mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-25 23:15:51 +03:00
[GH-ISSUE #339] [FEATURE REQUEST] Group tasks by a specific key #1164
Originally created by @krhubert on GitHub (Nov 8, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/339
Originally assigned to: @hibiken on GitHub.
Is your feature request related to a problem? Please describe.
I have a service that sends emails to users, and sometimes there may be more than 20 emails of the same type to send. They are not the same email, which is the key reason I can't use the unique task option. Let me give you an example of such a situation:
There's an email to accept an invitation to edit file A, B, C, ... They all come from different places in the system, so the queue will hold 3 emails of the same type (`id-send-invitation`) but with different data (`filename:A`, ...). I want to detect such a state and take these 3 emails from the queue in order to send only one email: `You have an invitation to many files (A, B, ...), go to the dashboard and accept them`.
Describe the solution you'd like
There are a few that come to my mind:
1. Group tasks by a key such as `email:invitation:id:receiver@email.com` and allow them to be flushed every X seconds. In such a case, the task handler receives a `[]Task` slice and can implement logic for what to do with each task.
2. A `group` method which will tell the queue how to group tasks into buckets. It could look like:
Describe alternatives you've considered
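To make the second option concrete, here is a minimal, self-contained Go sketch of bucketing tasks with a caller-supplied key function. The `Task` struct and `groupBy` helper are hypothetical stand-ins for illustration only, not asynq's API.

```go
package main

import "fmt"

// Task is a simplified stand-in for a queued task; the real asynq
// task carries a type name and a byte-slice payload.
type Task struct {
	Type    string
	Payload string
}

// groupBy buckets tasks using a caller-supplied key function -- the
// essence of the proposed "group method": the queue would use such a
// function to decide which tasks belong together.
func groupBy(tasks []Task, key func(Task) string) map[string][]Task {
	buckets := make(map[string][]Task)
	for _, t := range tasks {
		k := key(t)
		buckets[k] = append(buckets[k], t)
	}
	return buckets
}

func main() {
	tasks := []Task{
		{Type: "id-send-invitation", Payload: "filename:A"},
		{Type: "id-send-invitation", Payload: "filename:B"},
		{Type: "id-send-invitation", Payload: "filename:C"},
	}
	// Group all invitation emails for the same receiver into one bucket.
	buckets := groupBy(tasks, func(t Task) string {
		return "email:invitation:id:receiver@email.com"
	})
	for k, ts := range buckets {
		fmt.Printf("%s -> %d tasks\n", k, len(ts))
	}
}
```

A handler could then turn each bucket into a single combined email.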
Use some other services/tools/message brokers to achieve that
Additional context
@hibiken commented on GitHub (Nov 9, 2021):
@krhubert Thank you for opening an issue!
This is a great feature request, and I think both of your suggestions make sense. This requires more thinking and planning on my side, and I'll update the thread when I have a more concrete plan!
@hibiken commented on GitHub (Nov 10, 2021):
I'm having trouble coming up with a good solution to this; it may take more time to design one that's satisfying from both the package users' and the implementers' perspectives.
But in the meantime, I have a workaround suggestion:
Essentially, we can create a queue to buffer all the tasks we want to aggregate, and make sure not to configure any server to consume tasks directly from this queue. Then we enqueue another task (let's call this an "aggregator task") to aggregate all the tasks from the buffer queue (we can include the queue name in the payload). Configure a server to process the aggregator task, and within the Handler, read all tasks from the buffer queue and enqueue a combined task.
In the handler, we handle the aggregator task, which has the queue name in its payload, so that we can inspect all tasks from that queue and create one task that combines all of them.
Let me know if this makes sense, or if you have any questions on this.
This is not an ideal solution but I think it's a good enough workaround while we come up with a solution to this :)
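The shape of this workaround can be sketched with an in-memory stand-in for the buffer queue. In a real setup the buffer would be a named asynq queue read and cleared via asynq's inspector; `bufferQueue` and `handleAggregatorTask` below are illustrative assumptions, not asynq API.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// bufferQueue stands in for a queue that no server consumes directly;
// tasks accumulate here until the aggregator task runs.
type bufferQueue struct {
	mu    sync.Mutex
	tasks []string
}

func (q *bufferQueue) enqueue(payload string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.tasks = append(q.tasks, payload)
}

// drain removes and returns all buffered tasks, mirroring the
// handler's job of reading every task from the buffer queue and
// deleting them once done.
func (q *bufferQueue) drain() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	ts := q.tasks
	q.tasks = nil
	return ts
}

// handleAggregatorTask combines all buffered tasks into one unit of
// work -- here, a single summary email body.
func handleAggregatorTask(q *bufferQueue) string {
	files := q.drain()
	return "You have invitations to files: " + strings.Join(files, ", ")
}

func main() {
	q := &bufferQueue{}
	q.enqueue("A")
	q.enqueue("B")
	q.enqueue("C")
	fmt.Println(handleAggregatorTask(q))
}
```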
Update
Actually, we may not need to enqueue another task in the
`HandleAggregatorTask`; we can simply handle the tasks within the handler itself (just make sure to delete them once we are done with the tasks!)
@hibiken commented on GitHub (Nov 11, 2021):
FYI: Found an interesting blog post from Sidekiq author: https://www.mikeperham.com/2020/12/14/grouping-events-for-later-processing/
@krhubert commented on GitHub (Nov 12, 2021):
The solution you wrote makes sense, but I was thinking about my use case, and it's even a bit more complex than what I originally described. I think what I really need is one more feature: a cooldown period before firing a task. Let me demonstrate it with an example of an email with a file invitation. Imagine two situations.
Now the problem is to deliver the emails as quickly as possible, but also to group as many emails as possible. A simple solution that flushes every, let's say, 30 seconds will trigger:
So Alice will receive 2 emails, and Bob one, but only after 30 seconds. Now let's take a solution with a cooldown period. How should it work? Fire a task when no new events come to a queue for X seconds. Let's use 3 seconds as the cooldown period, and now we have:
I'm not sure if I have seen something like that in any message broker or task queue system.
@hibiken commented on GitHub (Nov 13, 2021):
@krhubert The use case you described totally makes sense, and I can see that it'd be useful to others as well, wherever you have to batch multiple successive operations into one (e.g. batching notifications to a user).
I'm currently designing the API, as well as the semantics around task options (e.g. retries, timeouts, etc.).
This is my preliminary API proposal:
On the client side, user can specify a group key for a task.
On the worker (i.e. server) side, user can specify a few options in the config:
- `GroupGracePeriod`: The grace period is renewed whenever a task with the same group key is added to the group
- `GroupMaxDelay`: The grace period has a configurable upper bound; the user can optionally set a maximum delay, after which Asynq will deliver the tasks to the Handler regardless of the remaining grace period
- `GroupMaxSize`: The user can also set an upper bound for the group size; if the group size reaches this number, the tasks will be delivered to the Handler
- `GroupFunc`: The user can provide a function to group the tasks into one task
As for task options, I think we should take the minimum number for
I'll formalize this a bit more before starting to implement this feature.
@pawelmitka commented on GitHub (Dec 3, 2021):
@hibiken We would really like to have batching of tasks on the consumer side, to push data in batches instead of as singular tasks. Let me know if you need any support in implementing this solution.
@zhaolion commented on GitHub (Jan 21, 2022):
@hibiken The author of Sidekiq has developed a new repository that implements batch tasks, which may give you some ideas: https://github.com/contribsys/faktory/wiki/Ent-Batches
@hibiken commented on GitHub (Jan 22, 2022):
@zhaolion Thanks for pointing that out. But it seems like Sidekiq's (and Faktory's) "batch" feature is more of a workflow feature, which we are planning to implement, tracked as a separate issue: #244.
This feature request is slightly different: we want to aggregate (or group) tasks into one before invoking the
Handler. The initial comment in this thread gives a good example :)