[GH-ISSUE #850] [FEATURE REQUEST] Support changing priority queues at runtime #2439

Open
opened 2026-03-15 20:30:26 +03:00 by kerem · 9 comments
Owner

Originally created by @linhbkhn95 on GitHub (Mar 26, 2024).
Original GitHub issue: https://github.com/hibiken/asynq/issues/850

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
I use asynq at my company and rely on the frequency of the priority queue to determine task ordering. However, there are instances where this order needs to be adjusted.

 Queues: map[string]int{
        "group1": 6,
        "group2": 4,
    },

->

 Queues: map[string]int{
        "group1": 4,
        "group2": 6,
    },

However, I cannot change it at runtime; I have to restart the server (pods).
Moreover, my company uses a "Dynamic Config Loader" to update applications at runtime, so this feature would be very useful to us.
Describe the solution you'd like

  • asynq should provide a way to change queue priorities at runtime, as in the example below
func main() {

	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: redisAddr,
		},
		asynq.Config{
			// Specify how many concurrent workers to use
			Concurrency: 10,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			ErrorHandler:   asynq.ErrorHandlerFunc(HandleErrorFunc),
			StrictPriority: true,
		},
	)

	// mux maps a type to a handler
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeEmailDelivery, HandleStartPullTask)

	// Reload dynamic config and set the queue config.
	// cfg is the application config:
	//   asynq:
	//     strictPriority: true
	//     queues:
	//       map[string]int{
	//         "critical": 3,
	//         "default":  6,
	//         "low":      1,
	//       }
	//
	// SetQueues is the proposed method; it does not exist in asynq yet.
	srv.SetQueues(cfg.asynq.queues, cfg.asynq.strictPriority)

	// mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
	// ...register other handlers...
	//
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

Describe alternatives you've considered

Additional context
You can see a draft solution in my repo:
https://github.com/linhbkhn95/asynq/pull/6/files
@hibiken @kamikazechaser What do you think?


@kamikazechaser commented on GitHub (Mar 26, 2024):

I can see such a feature being added if the underlying p.queueConfig map is replaced/updated in a safe way. However, this could set a precedent to update almost every underlying config at runtime 😄.
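
One way to read "replaced/updated in a safe way" is to never mutate the map that readers see, and instead publish a fresh copy atomically. The following is only a minimal sketch of that idea using the standard library; `queueStore`, `Replace`, and `Priority` are hypothetical names for illustration and are not asynq's actual types or API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// queueStore is a hypothetical stand-in for whatever component owns the
// queue priority map. It is not asynq's real type.
type queueStore struct {
	cfg atomic.Pointer[map[string]int] // requires Go 1.19 or newer
}

func newQueueStore(initial map[string]int) *queueStore {
	s := &queueStore{}
	s.Replace(initial)
	return s
}

// Replace stores a private copy of next, so later changes the caller makes
// to its own map cannot race with readers.
func (s *queueStore) Replace(next map[string]int) {
	cp := make(map[string]int, len(next))
	for name, prio := range next {
		cp[name] = prio
	}
	s.cfg.Store(&cp)
}

// Priority reads from the current snapshot. A snapshot is never modified
// after it has been stored, so no lock is needed here.
func (s *queueStore) Priority(name string) (int, bool) {
	p, ok := (*s.cfg.Load())[name]
	return p, ok
}

func main() {
	s := newQueueStore(map[string]int{"group1": 6, "group2": 4})
	s.Replace(map[string]int{"group1": 4, "group2": 6})
	p, _ := s.Priority("group2")
	fmt.Println("group2 priority:", p)
}
```

The trade-off in this sketch is that every update allocates a new map, which seems acceptable for a config that changes rarely and is read often.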


@linhbkhn95 commented on GitHub (Mar 26, 2024):

@kamikazechaser It is a good fit for companies that use a dynamic config loader to apply changes without restarting the app.
I also use it for feature flags, to experiment with new features.
https://github.com/uber/piranha


@linhbkhn95 commented on GitHub (Mar 26, 2024):

Hi @kamikazechaser, do you use Telegram? I was just out of work, and I want to contribute more to open-source projects like this.


@kamikazechaser commented on GitHub (Mar 26, 2024):

> Hi @kamikazechaser, do you use Telegram? I was just out of work, and I want to contribute more to open-source projects like this.

Yes. On the same username.


@linhbkhn95 commented on GitHub (Mar 26, 2024):

I can't find it. My Telegram username is linhbkhn95. Can you message me?


@kamikazechaser commented on GitHub (Mar 26, 2024):

> I can't find it. My Telegram username is linhbkhn95. Can you message me?

Sent you a message from my alt, my main is kc^.


@linhbkhn95 commented on GitHub (Mar 27, 2024):

Furthermore, the same pattern already works for loggers: with a dynamic config loader, we can change the log level at runtime through the SetLevel method, without restarting the pod (server).

// SetLevel alters the logging level.
func (lvl AtomicLevel) SetLevel(l zapcore.Level) {
	lvl.l.Store(int32(l))
}

https://github.com/uber-go/zap/blob/master/level.go#L122
@kamikazechaser


@linhbkhn95 commented on GitHub (May 6, 2024):

Hi @hibiken @kamikazechaser,
Can you tell me what the next step should be?


@kamikazechaser commented on GitHub (May 7, 2024):

There are a lot of requests for such a feature, so why not. Something like Server.SetQueuesPriority(map[string]int) error would work. It would need to check whether the new map has the same keys as the old one, whether strictPriority is enabled in the current config, etc., with all of this behind a mutex guard.

Do you have something similar in mind?
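
To make the proposal concrete, here is a rough sketch of what a setter with the suggested key check and mutex guard might look like. It is an illustration only: the `server` type, `newServer`, and `Priority` are hypothetical and do not reflect asynq's internals, the rule that priorities must be positive is an assumption added for the example, and the strictPriority check is left out because the thread does not say what it should enforce.

```go
package main

import (
	"fmt"
	"sync"
)

// server is a hypothetical stand-in for the type that owns the queue config.
type server struct {
	mu     sync.RWMutex
	queues map[string]int
}

func newServer(queues map[string]int) *server {
	cp := make(map[string]int, len(queues))
	for name, prio := range queues {
		cp[name] = prio
	}
	return &server{queues: cp}
}

// SetQueuesPriority swaps in new priorities. It rejects the update unless
// next has exactly the same queue names as the current config.
func (s *server) SetQueuesPriority(next map[string]int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if len(next) != len(s.queues) {
		return fmt.Errorf("expected %d queues, got %d", len(s.queues), len(next))
	}
	cp := make(map[string]int, len(next))
	for name, prio := range next {
		if _, ok := s.queues[name]; !ok {
			return fmt.Errorf("unknown queue %q", name)
		}
		if prio <= 0 { // assumption for this sketch: priorities must be positive
			return fmt.Errorf("queue %q: invalid priority %d", name, prio)
		}
		cp[name] = prio
	}
	s.queues = cp // applied only after every entry has been validated
	return nil
}

// Priority returns the current priority of a queue (0 if it is unknown).
func (s *server) Priority(name string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.queues[name]
}

func main() {
	s := newServer(map[string]int{"critical": 6, "default": 3, "low": 1})
	err := s.SetQueuesPriority(map[string]int{"critical": 3, "default": 6, "low": 1})
	fmt.Println(err, s.Priority("default"))
}
```

Because the new map is built and validated before it replaces the old one, a rejected update leaves the existing priorities untouched.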
