[GH-ISSUE #322] [FEATURE REQUEST] Support for ULID in TaskMessage ID field #1153

Closed
opened 2026-03-07 22:06:31 +03:00 by kerem · 9 comments
Owner

Originally created by @dhh93 on GitHub (Sep 5, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/322

Originally assigned to: @hibiken on GitHub.

Hey there,

Was wondering if we could add support for ULID in the TaskMessage ID field. The first 10 characters of each ULID string encode timestamp information, which would enable the sorting of tasks by ID.

There is another library (albeit 3rd party) that implements ULIDs with a similar API to the UUID library, though it would require a little work to to make the two interchangeable within asynq.

  • Client.go L:299 calls uuid.New, which takes no arguments, while ulid.New needs a uint64 timestamp and an io.Reader as an entropy source. time.Now() and rand.Reader (or nil) seem like sensible defaults.
  • Base.go L:264 calls uuid.MustParse. which returns a UUID type while ulid.MustParse returns a ULID type.

Given the two differences above, it looks like a custom type would have to be implemented to support both, but it shouldn't be too much of a hassle.

Happy to submit a PR to help implement if needed.

Cheers.

Originally created by @dhh93 on GitHub (Sep 5, 2021). Original GitHub issue: https://github.com/hibiken/asynq/issues/322 Originally assigned to: @hibiken on GitHub. Hey there, Was wondering if we could add support for [ULID](https://github.com/ulid/spec) in the `TaskMessage` ID field. The first 10 characters of each ULID string encode timestamp information, which would enable the sorting of tasks by ID. There is another [library](https://github.com/oklog/ulid) (albeit 3rd party) that implements ULIDs with a similar API to the UUID library, though it would require a little work to to make the two interchangeable within asynq. - [Client.go L:299](https://github.com/hibiken/asynq/blob/05534c6f248799ed3c01717f1076df522e03bd52/client.go#L299) calls `uuid.New`, which takes no arguments, while `ulid.New` needs a `uint64` timestamp and an `io.Reader` as an entropy source. `time.Now()` and `rand.Reader` (or `nil`) seem like sensible defaults. - [Base.go L:264](https://github.com/hibiken/asynq/blob/05534c6f248799ed3c01717f1076df522e03bd52/internal/base/base.go#L264) calls `uuid.MustParse`. which returns a `UUID` type while `ulid.MustParse` returns a `ULID` type. Given the two differences above, it looks like a custom type would have to be implemented to support both, but it shouldn't be too much of a hassle. Happy to submit a PR to help implement if needed. Cheers.
kerem 2026-03-07 22:06:31 +03:00
Author
Owner

@hibiken commented on GitHub (Sep 7, 2021):

@dhh93 Thank you for creating this issue!

To better understand the motivation behind the change; why do you need to sort tasks by the timestamp?
I'm guessing the timestamp is the time the task was enqueued or created?

<!-- gh-comment-id:914335654 --> @hibiken commented on GitHub (Sep 7, 2021): @dhh93 Thank you for creating this issue! To better understand the motivation behind the change; why do you need to sort tasks by the timestamp? I'm guessing the timestamp is the time the task was enqueued or created?
Author
Owner

@ryan961 commented on GitHub (Sep 8, 2021):

@hibiken hi😊,I have a similar need. First, I really liked the project and put it into production, although it's still in beta.I use it as a cache synchronization queue. I update the data in Redis and equeue the update operation to the asynq queue (schedule queue). The asynq server pulls tasks off queues and synchronize update operations to the database. To reduce database stress, I pass asynq.ProcessIn(300*time.Second) options to tune task processing behavior at enqueue time,
if the same operation happening in the 5 minutes,I will re-equeue this update operation task and also pass asynq.ProcessIn(300*time.Second) options . So I need to set the same task ID to override the last task. I add taskIdOption.

// TaskId returns an option to enqueue a task by the given task uniqueKey.
func TaskId(taskId string) Option {
	return taskIdOption(taskId)
}

func (t taskIdOption) String() string     { return fmt.Sprintf("TaskId(%q)", string(t)) }
func (t taskIdOption) Type() OptionType   { return TaskIdOpt }
func (t taskIdOption) Value() interface{} { return string(t) }
func composeOptions(opts ...Option) (option, error) {
	res := option{
		retry:     defaultMaxRetry,
		queue:     base.DefaultQueueName,
		timeout:   0, // do not set to deafultTimeout here
		deadline:  time.Time{},
		processAt: time.Now(),
	}
	for _, opt := range opts {
		switch opt := opt.(type) {
		case retryOption:
			res.retry = int(opt)
		case queueOption:
			qname := string(opt)
			if err := base.ValidateQueueName(qname); err != nil {
				return option{}, err
			}
			res.queue = qname
		case timeoutOption:
			res.timeout = time.Duration(opt)
		case deadlineOption:
			res.deadline = time.Time(opt)
		case uniqueOption:
			res.uniqueTTL = time.Duration(opt)
		case processAtOption:
			res.processAt = time.Time(opt)
		case processInOption:
			res.processAt = time.Now().Add(time.Duration(opt))
		case taskIdOption:
			id := uuid.NewMD5(uuid.UUID{}, []byte(string(opt)))
			res.taskId = &id
		default:
			// ignore unexpected option
		}
	}

	if res.taskId == nil {
		id := uuid.New()
		res.taskId = &id
	}
	return res, nil
}
msg := &base.TaskMessage{
		ID:        *opt.taskId,
		Type:      task.Type(),
		Payload:   task.Payload(),
		Queue:     opt.queue,
		Retry:     opt.retry,
		Deadline:  deadline.Unix(),
		Timeout:   int64(timeout.Seconds()),
		UniqueKey: uniqueKey,
	}
<!-- gh-comment-id:914871989 --> @ryan961 commented on GitHub (Sep 8, 2021): @hibiken hi😊,I have a similar need. First, I really liked the project and put it into production, although it's still in beta.I use it as a cache synchronization queue. I update the data in Redis and equeue the update operation to the asynq queue (schedule queue). The asynq server pulls tasks off queues and synchronize update operations to the database. To reduce database stress, I pass `asynq.ProcessIn(300*time.Second)` options to tune task processing behavior at enqueue time, if the same operation happening in the 5 minutes,I will re-equeue this update operation task and also pass `asynq.ProcessIn(300*time.Second)` options . So I need to set the same task ID to override the last task. I add `taskIdOption`. ``` // TaskId returns an option to enqueue a task by the given task uniqueKey. func TaskId(taskId string) Option { return taskIdOption(taskId) } func (t taskIdOption) String() string { return fmt.Sprintf("TaskId(%q)", string(t)) } func (t taskIdOption) Type() OptionType { return TaskIdOpt } func (t taskIdOption) Value() interface{} { return string(t) } ``` ``` func composeOptions(opts ...Option) (option, error) { res := option{ retry: defaultMaxRetry, queue: base.DefaultQueueName, timeout: 0, // do not set to deafultTimeout here deadline: time.Time{}, processAt: time.Now(), } for _, opt := range opts { switch opt := opt.(type) { case retryOption: res.retry = int(opt) case queueOption: qname := string(opt) if err := base.ValidateQueueName(qname); err != nil { return option{}, err } res.queue = qname case timeoutOption: res.timeout = time.Duration(opt) case deadlineOption: res.deadline = time.Time(opt) case uniqueOption: res.uniqueTTL = time.Duration(opt) case processAtOption: res.processAt = time.Time(opt) case processInOption: res.processAt = time.Now().Add(time.Duration(opt)) case taskIdOption: id := uuid.NewMD5(uuid.UUID{}, []byte(string(opt))) res.taskId = &id default: // ignore unexpected option } } if res.taskId == nil { id := uuid.New() res.taskId = &id } return res, nil } ``` ``` msg := &base.TaskMessage{ ID: *opt.taskId, Type: task.Type(), Payload: task.Payload(), Queue: opt.queue, Retry: opt.retry, Deadline: deadline.Unix(), Timeout: int64(timeout.Seconds()), UniqueKey: uniqueKey, } ```
Author
Owner

@hibiken commented on GitHub (Sep 8, 2021):

@Ryalu you should probably achieve similar effect by passing Unique(ttl) option when enqueueing the task.

Example:

// passing Unique option to avoid creating redundant tasks.
info, err := client.Enqueue(task, asynq.ProcessIn(300*time.Second), asynq.Unique(300*time.Second))
if err != nil {
    if errors.Is(err, asynq.ErrDuplicateTask) {
         // the task already exist
    } else {
        // other error
    }
}
<!-- gh-comment-id:914888128 --> @hibiken commented on GitHub (Sep 8, 2021): @Ryalu you should probably achieve similar effect by passing `Unique(ttl)` option when enqueueing the task. Example: ```go // passing Unique option to avoid creating redundant tasks. info, err := client.Enqueue(task, asynq.ProcessIn(300*time.Second), asynq.Unique(300*time.Second)) if err != nil { if errors.Is(err, asynq.ErrDuplicateTask) { // the task already exist } else { // other error } } ```
Author
Owner

@ryan961 commented on GitHub (Sep 8, 2021):

@Ryalu you should probably achieve similar effect by passing Unique(ttl) option when enqueueing the task.

Example:

// passing Unique option to avoid creating redundant tasks.
info, err := client.Enqueue(task, asynq.ProcessIn(300*time.Second), asynq.Unique(300*time.Second))
if err != nil {
    if errors.Is(err, asynq.ErrDuplicateTask) {
         // the task already exist
    } else {
        // other error
    }
}

Thank you for replying.
It can ensure that tasks are not executed repeatedly, but can't override the same task. such as:
I create a task to update item (UPDATE itemsSETnumber=500 WHERE customerId = 3 AND item = 2) and enqueue
client.Enqueue(task, asynq.TaskId("customerId:3:item:2"), asynq.ProcessIn(300*time.Second)). After the 30 seconds, I need to re-enqueue task("customerId:3:item:2")(UPDATE itemsSETnumber=1000 WHERE customerId = 3 AND item = 2) again. I also enqueue client.Enqueue(task, asynq.TaskId("customerId:3:item:2"), asynq.ProcessIn(300*time.Second)). For mysql, it only execute UPDATE itemsSETnumber=1000 WHERE customerId = 3 AND item = 2.

<!-- gh-comment-id:914901580 --> @ryan961 commented on GitHub (Sep 8, 2021): > @Ryalu you should probably achieve similar effect by passing `Unique(ttl)` option when enqueueing the task. > > Example: > > ```go > // passing Unique option to avoid creating redundant tasks. > info, err := client.Enqueue(task, asynq.ProcessIn(300*time.Second), asynq.Unique(300*time.Second)) > if err != nil { > if errors.Is(err, asynq.ErrDuplicateTask) { > // the task already exist > } else { > // other error > } > } > ``` Thank you for replying. It can ensure that tasks are not executed repeatedly, but can't override the same task. such as: I create a task to update item (`UPDATE `items` SET `number`=500 WHERE customerId = 3 AND item = 2`) and enqueue `client.Enqueue(task, asynq.TaskId("customerId:3:item:2"), asynq.ProcessIn(300*time.Second))`. After the 30 seconds, I need to re-enqueue task("customerId:3:item:2")(`UPDATE `items` SET `number`=1000 WHERE customerId = 3 AND item = 2`) again. I also enqueue `client.Enqueue(task, asynq.TaskId("customerId:3:item:2"), asynq.ProcessIn(300*time.Second))`. For mysql, it only execute `UPDATE `items` SET `number`=1000 WHERE customerId = 3 AND item = 2`.
Author
Owner

@hibiken commented on GitHub (Sep 8, 2021):

Ok thank you for clarifying!

To address the initial feature request by @dhh93 and follow up request by @Ryalu, I think we can provide TaskID(string) option when enqueueing tasks to allow users to provide a custom task ID.

@Ryalu I think Client.Enqueue should return ErrDuplicateTask if task with the given ID already exists. I think we can add a method to Inspector to update an existing task with the given ID.

Example:

info, err := client.Enqueue(task, asynq.TaskID("custom_id"), asynq.ProcessIn(5*time.Minute))
if err != nil {
    if errors.Is(err, asynq.ErrDuplicateTask) {
         // task already exists, update the existing task with new payload.
         info, err = inspector.UpdateTask("custom_id", task.Payload())
         // ... 
    } else {
         // handle other errors
    }
}

So my proposed changes are the following:

  • Add func TaskID(custom_id string) Option to allow custom task IDs
  • Add (i *Inspector) UpdateTask(id string, payload []byte) (*TaskInfo, error) to allow updating existing task

@dhh93 @Ryalu Let me know if you have any feedback/thoughts!

<!-- gh-comment-id:915221184 --> @hibiken commented on GitHub (Sep 8, 2021): Ok thank you for clarifying! To address the initial feature request by @dhh93 and follow up request by @Ryalu, I think we can provide `TaskID(string)` option when enqueueing tasks to allow users to provide a custom task ID. @Ryalu I think `Client.Enqueue` should return `ErrDuplicateTask` if task with the given ID already exists. I think we can add a method to `Inspector` to update an existing task with the given ID. Example: ```go info, err := client.Enqueue(task, asynq.TaskID("custom_id"), asynq.ProcessIn(5*time.Minute)) if err != nil { if errors.Is(err, asynq.ErrDuplicateTask) { // task already exists, update the existing task with new payload. info, err = inspector.UpdateTask("custom_id", task.Payload()) // ... } else { // handle other errors } } ``` So my proposed changes are the following: - Add `func TaskID(custom_id string) Option` to allow custom task IDs - Add `(i *Inspector) UpdateTask(id string, payload []byte) (*TaskInfo, error)` to allow updating existing task @dhh93 @Ryalu Let me know if you have any feedback/thoughts!
Author
Owner

@dhh93 commented on GitHub (Sep 8, 2021):

Appreciate the response, @hibiken. With regards to your initial question, I wanted to incorporate ULIDs to help with logging/tracing; I think this somewhat ties in to Issue #321 as well. Having sortable, unique IDs really helps with parsing through aggregated logs/requests across an entire system.

The proposed changes look good -- allowing for custom IDs in general is nice since users can work with the Inspector API to manage tasks without keeping a separate mapping of the asynq specific Task ID & their own systems'.

I think the library should stick with UUID as the default to minimize disruption to those who rely on it. Having the functional option that you suggested is probably the best way to implement custom user IDs, though the DecodeMessage function may have to be changed to accommodate a string instead of unmarshaling to a UUID like it is currently doing now.

github.com/hibiken/asynq@05534c6f24/internal/base/base.go (L264)

<!-- gh-comment-id:915390532 --> @dhh93 commented on GitHub (Sep 8, 2021): Appreciate the response, @hibiken. With regards to your initial question, I wanted to incorporate ULIDs to help with logging/tracing; I think this somewhat ties in to [Issue #321](https://github.com/hibiken/asynq/issues/321) as well. Having sortable, unique IDs really helps with parsing through aggregated logs/requests across an entire system. The proposed changes look good -- allowing for custom IDs in general is nice since users can work with the `Inspector` API to manage tasks without keeping a separate mapping of the `asynq` specific Task ID & their own systems'. I think the library should stick with `UUID` as the default to minimize disruption to those who rely on it. Having the functional option that you suggested is probably the best way to implement custom user IDs, though the `DecodeMessage` function may have to be changed to accommodate a string instead of unmarshaling to a `UUID` like it is currently doing now. https://github.com/hibiken/asynq/blob/05534c6f248799ed3c01717f1076df522e03bd52/internal/base/base.go#L264
Author
Owner

@crossworth commented on GitHub (Sep 24, 2021):

@hibiken this can be closed I guess.
Seems that the PR failed to close this issue when was merged.

<!-- gh-comment-id:926587846 --> @crossworth commented on GitHub (Sep 24, 2021): @hibiken this can be closed I guess. Seems that the PR failed to close this issue when was merged.
Author
Owner

@hibiken commented on GitHub (Sep 30, 2021):

@crossworth The change is not in master yet. The branch was merged into next branch which I will merge once I'm ready to release a new version :)

<!-- gh-comment-id:931322865 --> @hibiken commented on GitHub (Sep 30, 2021): @crossworth The change is not in master yet. The branch was merged into `next` branch which I will merge once I'm ready to release a new version :)
Author
Owner

@hibiken commented on GitHub (Nov 6, 2021):

This has landed in master now and available since version 0.19

<!-- gh-comment-id:962518203 --> @hibiken commented on GitHub (Nov 6, 2021): This has landed in master now and available since version 0.19
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#1153
No description provided.