[GH-ISSUE #339] [FEATURE REQUEST] Group tasks by a specific key #2174

Closed
opened 2026-03-15 19:33:19 +03:00 by kerem · 8 comments
Owner

Originally created by @krhubert on GitHub (Nov 8, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/339

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
I have a service that sends emails to users, and sometimes there are more than 20 emails of the same type to send. They are not the same email, which is the key difference and why I can't use the unique task option. Let me give an example of such a situation:

There's an email to accept an invitation to edit file A, B, C, ... They all come from different places in the system, so the queue will hold 3 emails (the same type `id-send-invitation`) but with different data (filename: `A`, ...). I want to detect this state, take these 3 emails from the queue, and send only one email: `You have an invitation to many files (A, B, ...); go to the dashboard and accept them`
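As a rough illustration of the desired end result, here is a sketch that merges several per-file invitation payloads into the single summary email described above. The `combineInvitations` function and the payload shape are hypothetical, not part of asynq:

```go
package main

import (
	"fmt"
	"strings"
)

// combineInvitations merges several per-file invitation payloads into one
// summary message, as described in the example above.
func combineInvitations(filenames []string) string {
	if len(filenames) == 1 {
		return fmt.Sprintf("You have an invitation to edit %s", filenames[0])
	}
	return fmt.Sprintf(
		"You have invitations to %d files (%s); go to your dashboard to accept them",
		len(filenames), strings.Join(filenames, ", "))
}

func main() {
	fmt.Println(combineInvitations([]string{"A", "B", "C"}))
}
```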

Describe the solution you'd like
A few options come to mind:

  1. Allow creating queues at runtime, like `email:invitation:id:receiver@email.com`, and allow flushing them every X seconds. In that case, the task handler receives a []Task slice and can implement its own logic for each task.
  2. Allow passing a group function that tells a queue how to group tasks into buckets. It could look like:

func group(t *asynq.Task) (bucket string, windowToWait time.Duration) {
  // hypothetical payload fields, for illustration
  return t.Payload.ID + t.Payload.Email, time.Minute
}
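The bucketing idea in option 2 can be sketched with plain Go. The `Task` struct here is a minimal stand-in for asynq.Task, and its fields are hypothetical:

```go
package main

import "fmt"

// Task is a minimal stand-in for asynq.Task, just enough to illustrate
// grouping; the ID and Email fields are hypothetical.
type Task struct {
	ID    string
	Email string
}

// bucketKey mirrors the group function sketched above: tasks that share
// a key end up in the same bucket.
func bucketKey(t Task) string {
	return t.ID + ":" + t.Email
}

// groupTasks collects tasks into buckets keyed by bucketKey; each bucket
// would then be flushed to a handler as a []Task after its wait window.
func groupTasks(tasks []Task) map[string][]Task {
	buckets := make(map[string][]Task)
	for _, t := range tasks {
		buckets[bucketKey(t)] = append(buckets[bucketKey(t)], t)
	}
	return buckets
}

func main() {
	tasks := []Task{
		{ID: "invite", Email: "a@example.com"},
		{ID: "invite", Email: "a@example.com"},
		{ID: "invite", Email: "b@example.com"},
	}
	for key, bucket := range groupTasks(tasks) {
		fmt.Printf("%s: %d task(s)\n", key, len(bucket))
	}
}
```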

Describe alternatives you've considered
Use some other services/tools/message brokers to achieve that

Additional context

kerem 2026-03-15 19:33:19 +03:00
Author
Owner

@hibiken commented on GitHub (Nov 9, 2021):

@krhubert Thank you for opening an issue!
This is a great feature request and I think both of your suggestions make sense. This requires more thinking and planning on my side, and I'll update the thread when I have a more concrete plan!


@hibiken commented on GitHub (Nov 10, 2021):

I'm having trouble coming up with a good solution to this; it may take more time to design a solution that's satisfactory from both the package users' and the implementer's perspectives.

But in the meantime, I have a workaround suggestion:
Essentially, we can create a queue to buffer all the tasks we want to aggregate, and make sure not to configure any server to consume tasks directly from this queue. Then we enqueue another task (let's call this an "aggregator task") to aggregate all the tasks from the buffer queue (we can include the queue name in the payload). Configure the server to process the aggregator task, and within the Handler, read all tasks from the buffer queue and enqueue a combined task.

// On the client side, enqueue two tasks:
// 1) a task to send a notification to a user
// 2) a task to aggregate all notification tasks in a queue into one task

userID := 123
qname := fmt.Sprintf("notifications:%d", userID)
task1 := NewNotificationTask(userID)
_, err := client.Enqueue(task1, asynq.Queue(qname)) // Enqueue the notification to the queue "notifications:123"
// check err

task2 := NewAggregatorTask(qname)
// Enqueue an aggregator task; the TaskID option ensures that only one aggregator task is present in the queue.
_, err = client.Enqueue(task2, asynq.TaskID(qname))
// check err

In the handler, we handle task2, which has the qname in its payload, so we can inspect all tasks in that queue and create a single task that combines them.

var inspector *asynq.Inspector
var client *asynq.Client

func HandleAggregatorTask(ctx context.Context, task *asynq.Task) error {
     qname := GetQueueNameFromPayload(task)
     tasks, err := inspector.ListPendingTasks(qname)
     if err != nil {
           return err
     }
     if len(tasks) == 0 {
           return nil // No tasks to aggregate (most likely another aggregator task ran first)
     }
     combined := CombineTasks(tasks)
     if _, err := client.Enqueue(combined, asynq.Queue("notifications")); err != nil {
           return err
     }
     // Make sure to delete the tasks from the buffer queue.
     for _, t := range tasks {
          if err := inspector.DeleteTask(qname, t.ID); err != nil {
                log.Printf("Failed to delete task %s from queue %q", t.ID, qname)
          }
     }
     return nil
}

Let me know if this makes sense, or if you have any questions on this.

This is not an ideal solution but I think it's a good enough workaround while we come up with a solution to this :)


Update

Actually, we may not need to enqueue another task in HandleAggregatorTask; we can simply handle the tasks within the handler itself (just make sure to delete them once we are done with them!)


@hibiken commented on GitHub (Nov 11, 2021):

FYI: Found an interesting blog post from Sidekiq author: https://www.mikeperham.com/2020/12/14/grouping-events-for-later-processing/


@krhubert commented on GitHub (Nov 12, 2021):

The solution you wrote makes sense, but I was thinking about my use case, and it's even a bit more complex than what I originally described. I think what I really need is one more feature: a cooldown period before firing a task. Let me demonstrate with an example of an email with a file invitation. Imagine two situations.

  1. user Alice receives an invitation email every 0.5 seconds for 60 seconds, so she gets 120 emails in total that can be grouped.
  2. user Bob receives only one email.

Now the problem is to deliver their emails as quickly as possible, while also grouping as many emails as possible. A simple solution that flushes every, say, 30 seconds will trigger:

  1. 2 tasks for Alice
  2. 1 task for Bob after 30 sec

So Alice will receive 2 emails, and Bob one, but only after 30 seconds. Now consider a solution with a cooldown period. How should it work? Fire a task when no new events have arrived in the queue for X seconds. Using 3 seconds as the cooldown period, we get:

  1. Alice will receive one email after 63 seconds
  2. Bob will receive one email after 3 seconds.

I'm not sure I have seen something like that in any message broker or task queue system.


@hibiken commented on GitHub (Nov 13, 2021):

@krhubert The use case you described totally makes sense. And I can see that it'd be useful to others as well, wherever you have to batch multiple successive operations into one (e.g. batching notifications to a user).

I'm currently designing the API, as well as the semantics around task options (e.g. retries, timeouts, etc.).

This is my preliminary API proposal:

On the client side, the user can specify a group key for a task.

// Enqueue with the Group option. Tasks in the same queue with the same group key will be grouped.
client.Enqueue(task, asynq.Queue("notifications"), asynq.Group("notification:user1"))

On the worker (i.e. server) side, the user can specify a few options in the config:

  • GroupGracePeriod: The grace period is renewed whenever a task with the same group key is added to the group.
  • GroupMaxDelay: The grace period has a configurable upper bound; the user can optionally set a maximum delay, after which Asynq will deliver the tasks to the Handler regardless of the remaining grace period.
  • GroupMaxSize: The user can also set an upper bound on the group size; if the group size reaches this number, the tasks will be delivered to the Handler.
  • GroupFunc: The user can provide a function to combine the grouped tasks into one task.
srv := asynq.NewServer(
     asynq.RedisClientOpt{Addr: ":6379"},
     asynq.Config{
          Queues: map[string]int{
                "notifications": 1,
          },
          GroupGracePeriod: 1 * time.Minute,
          GroupMaxDelay:    10 * time.Minute,
          GroupMaxSize:     20,
          GroupFunc: func(groupName string, tasks []*asynq.Task) *asynq.Task {
                 // ... combine all tasks into one.
                 // By default, it should use groupName as the task typename,
                 // and use JSON to encode all tasks into the payload.
                 // The JSON would look something like:
                 // {"tasks": [{"type": "notification", "payload": "xxx"}, {"type": "notification", "payload": "yyy"}]}
          },
     },
)
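The JSON shape hinted at in the GroupFunc comment could be sketched with the standard library as follows. The struct names and exact wire format are assumptions based on the comment above, not the actual asynq API:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskJSON is a guess at the per-task wire shape from the comment above:
// {"type": ..., "payload": ...}. Names are illustrative.
type taskJSON struct {
	Type    string `json:"type"`
	Payload string `json:"payload"`
}

// groupPayload wraps the grouped tasks: {"tasks": [...]}.
type groupPayload struct {
	Tasks []taskJSON `json:"tasks"`
}

// encodeGroup combines several tasks into the single payload a default
// GroupFunc might produce, per the proposal above.
func encodeGroup(tasks []taskJSON) ([]byte, error) {
	return json.Marshal(groupPayload{Tasks: tasks})
}

func main() {
	b, err := encodeGroup([]taskJSON{
		{Type: "notification", Payload: "xxx"},
		{Type: "notification", Payload: "yyy"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```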

As for task options, I think we should take the minimum across the grouped tasks for:

  • Retry count
  • Timeout
  • Deadline
  • Retention

I'll formalize this a bit more before starting to implement this feature.


@pawelmitka commented on GitHub (Dec 3, 2021):

@hibiken We would really like to have task batching on the consumer side so we can push data in batches instead of as individual tasks. Let me know if you need any support implementing this solution.


@zhaolion commented on GitHub (Jan 21, 2022):

@hibiken The author of Sidekiq has developed a new repository that implements batch tasks, which may give you some ideas: https://github.com/contribsys/faktory/wiki/Ent-Batches


@hibiken commented on GitHub (Jan 22, 2022):

@zhaolion Thanks for pointing that out. But it seems like Sidekiq's (and Faktory's) "batch" feature is more like a workflow feature, which we are planning to implement and which is tracked as a separate issue, #244.

This feature request is slightly different: we want to aggregate (or group) multiple tasks into one before invoking the Handler. The initial comment in this thread gives a good example :)
