[GH-ISSUE #328] [FEATURE REQUEST] Distributed concurrency support per queue / taskType #1159

Closed
opened 2026-03-07 22:06:38 +03:00 by kerem · 9 comments
Owner

Originally created by @ajatprabha on GitHub (Sep 18, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/328

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
N/A

Describe the solution you'd like
Is it possible to set concurrency individually for every queue, like it is supported in gocraft/work (https://github.com/gocraft/work#job-concurrency)?

Describe alternatives you've considered
Currently I have tried to wire up the server so that I calculate the total concurrency and set queue priorities, assuming the combination provides an effect similar to gocraft/work: I store the maxConcurrency and priority info per queue and use it to initialise the server when starting the adapter (a simple wrapper over asynq).

type Adapter struct {
	redisConnOpt  asynq.RedisConnOpt
	taskConfigMap map[string]taskConfig
	tcMutex       sync.RWMutex
}

type taskConfig struct {
	priority       int
	maxConcurrency int
}

func (a *Adapter) server() *asynq.Server {
	return asynq.NewServer(a.redisConnOpt, asynq.Config{
		Concurrency:    a.concurrency(),
		Queues:         a.queuePriorityMap(),
	})
}

func (a *Adapter) queuePriorityMap() map[string]int {
	a.tcMutex.RLock()
	defer a.tcMutex.RUnlock()

	m := make(map[string]int)

	for tn, cfg := range a.taskConfigMap {
		if cfg.priority == 0 {
			m[tn] = 1
			continue
		}

		m[tn] = cfg.priority
	}

	return m
}

func (a *Adapter) concurrency() int {
	a.tcMutex.RLock()
	defer a.tcMutex.RUnlock()

	concurrency := 0
	for _, cfg := range a.taskConfigMap {
		if cfg.maxConcurrency == 0 {
			concurrency++
			continue
		}

		concurrency += cfg.maxConcurrency
	}

	if rtcpu := runtime.NumCPU(); concurrency < rtcpu {
		return rtcpu
	}

	return concurrency
}

Additional context
Ref: Priority Sampler from gocraft/work (https://github.com/gocraft/work/blob/5959e69ad211c5ca37ffdf3ede02e35a5ae41d98/priority_sampler.go#L21)

kerem 2026-03-07 22:06:38 +03:00

@hibiken commented on GitHub (Sep 20, 2021):

@ajatprabha Thank you for the question.

Currently Asynq doesn't support per-queue concurrency rate limiting out of the box. However, it's possible to configure this with the pattern described in the Rate Limiting wiki page (https://github.com/hibiken/asynq/wiki/Rate-Limiting).
If you only need to limit the max concurrency of a certain task type within a single process, I think you can create a counting semaphore and check before you handle the task. If the token cannot be acquired (i.e. currently processing at max concurrency), you can return an error indicating that this task was not processed due to rate limiting and configure IsFailure and RetryDelayFunc functions to suit your needs (See the Rate Limit wiki page, and the example below).

If you need to restrict the max concurrency of a certain task type across multiple processes, you'd need a more sophisticated way to limit the processing (e.g. a distributed semaphore).

Example: Limit concurrent processing of task X.

// Counting semaphore to limit the number of concurrent processing of task X.
var sema chan struct{}

// Allow up to three concurrent processing of task X.
const concurrency = 3

func init() {
	sema = make(chan struct{}, concurrency)
}

func HandleTaskX(ctx context.Context, task *asynq.Task) error {
	// Check if we can process the task.
	select {
	case sema <- struct{}{}:
		// Token acquired; defer releasing of the token.
		defer func() { <-sema }()
	default:
		// Cannot acquire token; retry the task later.
		return &RateLimitError{RetryIn: 30 * time.Second}
	}

	// Processing logic here.
	return nil
}

// RateLimitError indicates the task was not processed due to rate limiting.
type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string { return "rate limited" }

func IsRateLimitError(err error) bool {
	_, ok := err.(*RateLimitError)
	return ok
}

func retryDelayFunc(n int, err error, task *asynq.Task) time.Duration {
	if rateLimitErr, ok := err.(*RateLimitError); ok {
		return rateLimitErr.RetryIn
	}
	return asynq.DefaultRetryDelayFunc(n, err, task)
}

func main() {
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		RetryDelayFunc: retryDelayFunc,
		IsFailure:      func(err error) bool { return !IsRateLimitError(err) },
		Concurrency:    10, // this is server-level concurrency
	})
	// ... set up handler and run the server
}

With this setup, there will be at most three instances of task X active at any given time. Let me know if you have more questions or feedback!


@ajatprabha commented on GitHub (Sep 21, 2021):

Thank you for the detailed answer, the approach makes total sense.

However, in gocraft/work, per-queue concurrency rate limiting is distributed in nature out of the box (since it acquires a lock using Redis itself). Do you see such a feature being added to asynq as an addition (not the default), maybe?

I would like to understand the pros and cons of doing such a thing in terms of performance, API changes and effort to implement. It's fine if the decision is not to add it.


@hibiken commented on GitHub (Sep 23, 2021):

I think we can provide a rate limiter implementation using redis.

A usage example would look similar to the other rate-limiting examples. I'm currently thinking something like the following:

// Export this type from an experimental package directory
import "github.com/hibiken/asynq/x/rate"

var redisConnOpt = asynq.RedisClientOpt{Addr: ":6379"}
// Second argument is the name of this semaphore, it could be scoped to a queue, task-type, etc
var sema = rate.NewSemaphore(redisConnOpt, "my_queue", /* max_concurrency= */ 10)

func HandleTaskX(ctx context.Context, task *asynq.Task) error {
    // Check if we can acquire a token; if not, return a RateLimitError.
    if !sema.Acquire(ctx) {
        return &RateLimitError{RetryIn: 30 * time.Second}
    }
    // Make sure to release the token once we're done.
    defer sema.Release(ctx)
    // ... handle task
}

Let me know if this solution works for you, or please let me know if you have any feedback :)


@ajatprabha commented on GitHub (Sep 25, 2021):

This looks good. I like that this is still opt-in!


@ajatprabha commented on GitHub (Oct 18, 2021):

Hey @hibiken, any updates on this? Is it on the roadmap anytime soon?
I can help with the implementation if you'd like; I don't have experience working with module/x/abc-style experimental packages, though.


@hibiken commented on GitHub (Oct 18, 2021):

I'm currently working on #265 and it may take another week or so to get to another feature. I can prioritize this feature request if needed.

But if you are interested, please feel free to open a PR for this one. I'm happy with the API described in the comment above.

Since Redis executes commands and Lua scripts in a single-threaded fashion, we can simply increment/decrement a counter to acquire/release tokens for a given semaphore (I think).
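To make the increment/decrement idea concrete, here is a minimal sketch. This is not asynq's actual implementation: the Lua script and key layout are hypothetical, and tryAcquire/release mirror the script's semantics in-process so the logic can be checked without a Redis server.

```go
package main

import "fmt"

// Hypothetical Lua script illustrating the counter idea: because Redis runs
// a script atomically, the read-check-increment below cannot race.
// KEYS[1] = semaphore counter key, ARGV[1] = max concurrency.
const acquireScript = `
local current = tonumber(redis.call("GET", KEYS[1]) or "0")
if current < tonumber(ARGV[1]) then
    redis.call("INCR", KEYS[1])
    return 1
end
return 0
`

// tryAcquire mirrors the script's semantics in-process: take a token only
// while the counter is below the limit.
func tryAcquire(counter *int, max int) bool {
	if *counter < max {
		*counter++
		return true
	}
	return false
}

// release returns a token (a DECR in the Redis version).
func release(counter *int) {
	if *counter > 0 {
		*counter--
	}
}

func main() {
	_ = acquireScript // would be loaded via EVAL/EVALSHA against Redis
	n := 0
	fmt.Println(tryAcquire(&n, 2)) // true
	fmt.Println(tryAcquire(&n, 2)) // true
	fmt.Println(tryAcquire(&n, 2)) // false: at max concurrency
	release(&n)
	fmt.Println(tryAcquire(&n, 2)) // true again after a release
}
```

Release must also run when the handler fails, which is why the earlier in-process example defers it; the Redis variant would need the same guarantee (plus some expiry story for crashed workers).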

> I don't have experience working with module/x/abc type of experimental packages though.

Me neither, but we can figure this out together 👍


@ajatprabha commented on GitHub (Oct 18, 2021):

Sure, I'll pick this up. I might have to go through the style/structure of the asynq codebase first. Please share any pointers to keep in mind while working on this PR, like the scripting style in the rdb package.


@ajatprabha commented on GitHub (Oct 18, 2021):

Turns out (https://groups.google.com/g/golang-nuts/c/eD8dh3T9yyA/m/l5Ail-xfMiAJ) the x in x/abc denotes external, not experimental. Let me know your thoughts on where we should put experimental stuff.


@hibiken commented on GitHub (Oct 18, 2021):

I think that's fine! I've seen things like /contrib, /experimental, but I like the short path name /x so let's go with github.com/hibiken/asynq/x directory for external/experimental packages 👍
