mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-25 23:15:51 +03:00
[GH-ISSUE #328] [FEATURE REQUEST] Distributed concurrency support per queue / taskType #2167
Originally created by @ajatprabha on GitHub (Sep 18, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/328
Originally assigned to: @hibiken on GitHub.
Is your feature request related to a problem? Please describe.
N/A
Describe the solution you'd like
Is it possible to set concurrency individually for every queue, as is supported here?
Describe alternatives you've considered
Currently I tried to wire up the server in a way where I calculate the total concurrency and set max priority, assuming that the combination will provide effects similar to gocraft/work: storing the maxConcurrency and priority info per queue and using it to initialise the server when starting the adapter (a simple wrapper over asynq).
Additional context
Ref: Priority Sampler from gocraft/work
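For reference, the workaround described above (a process-wide concurrency limit plus per-queue priority weights) maps onto asynq's existing server configuration. A minimal sketch, assuming a local Redis; the queue names and weights are illustrative:

```go
package main

import (
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	// Concurrency caps the whole worker process; Queues assigns
	// priority weights per queue. Note there is no per-queue
	// concurrency cap -- that is what this issue asks for.
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"}, // assumed local Redis
		asynq.Config{
			Concurrency: 10,
			Queues: map[string]int{
				"critical": 6, // weights are illustrative
				"default":  3,
				"low":      1,
			},
		},
	)
	_ = srv // srv.Run(mux) would start processing
	log.Println("server configured")
}
```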
@hibiken commented on GitHub (Sep 20, 2021):
@ajatprabha Thank you for the question.
Currently Asynq doesn't support per-queue concurrency rate limiting out of the box. However, it's possible to configure this via the pattern described in the Rate Limiting wiki page.
If you only need to limit the max concurrency of a certain task type within a single process, I think you can create a counting semaphore and check it before you handle the task. If a token cannot be acquired (i.e. the task type is already being processed at max concurrency), you can return an error indicating that the task was not processed due to rate limiting, and configure the IsFailure and RetryDelayFunc functions to suit your needs (see the Rate Limiting wiki page, and the example below).
If you need to restrict the max concurrency of a certain task type across multiple processes, you'd need a more sophisticated way to limit the processing (e.g. a distributed semaphore).
Example: Limit concurrent processing of task X.
With this setup, there will be up to three task X's active at any given time. Let me know if you have more questions or feedback!
@ajatprabha commented on GitHub (Sep 21, 2021):
Thank you for the detailed answer, the approach makes total sense.
However, in gocraft/work, per-queue concurrency rate limiting is distributed in nature out of the box (since it acquires locks using Redis itself). Do you see such a feature getting added to asynq as an opt-in addition (not the default), maybe?
I would like to understand the pros and cons of doing such a thing in terms of performance, API changes and effort to implement. It's fine if the decision is not to add it.
@hibiken commented on GitHub (Sep 23, 2021):
I think we can provide a rate limiter implementation using redis.
A usage example would look similar to the other rate-limiting examples. I'm currently thinking something like the following:
Let me know if this solution works for you, or please let me know if you have any feedback :)
@ajatprabha commented on GitHub (Sep 25, 2021):
This looks good. I like that this is still opt-in!
@ajatprabha commented on GitHub (Oct 18, 2021):
Hey @hibiken, any updates on this? Is it on the roadmap anytime soon?
I can help with the implementation if you'd like; I don't have experience working with module/x/abc-type experimental packages though.
@hibiken commented on GitHub (Oct 18, 2021):
I'm currently working on #265 and it may take another week or so to get to another feature. I can prioritize this feature request if needed.
But if you are interested, please feel free to open a PR for this one. I'm happy with the API described in the comment above.
Since Redis executes commands/Lua scripts in a single-threaded fashion, we can simply increment/decrement the counter to acquire/release tokens for a given semaphore (I think).
Me neither, but we can figure this out together 👍
@ajatprabha commented on GitHub (Oct 18, 2021):
Sure, I'll pick this up. I might have to go through the style/structure of the asynq codebase first. Do share any pointers to keep in mind while working on this PR, like the scripting style in the rdb package.
@ajatprabha commented on GitHub (Oct 18, 2021):
Turns out x in x/abc denotes external and not experimental. Let me know your thoughts on where we should put experimental stuff.
@hibiken commented on GitHub (Oct 18, 2021):
I think that's fine! I've seen things like /contrib and /experimental, but I like the short path name /x, so let's go with the github.com/hibiken/asynq/x directory for external/experimental packages 👍