[PR #1066] Support Dynamic Queue Names with Wildcard Priorities config #3024

opened 2026-03-15 21:15:59 +03:00 by kerem · 0 comments

📋 Pull Request Information

Original PR: https://github.com/hibiken/asynq/pull/1066
Author: @arttor
Created: 8/1/2025
Status: 🔄 Open

Base: master ← Head: dynamic-queue


📝 Commits (1)

  • b2c47d0 support dynamic queue names

📊 Changes

21 files changed (+1464 additions, -228 deletions)

View changed files

📝 aggregator.go (+5 -5)
📝 aggregator_test.go (+1 -1)
📝 forwarder.go (+7 -8)
📝 forwarder_test.go (+6 -3)
📝 go.mod (+1 -1)
📝 heartbeat.go (+22 -23)
📝 heartbeat_test.go (+16 -18)
📝 inspector.go (+2 -0)
📝 internal/base/base.go (+2 -0)
📝 internal/rdb/rdb.go (+13 -0)
📝 internal/testbroker/testbroker.go (+9 -0)
📝 janitor.go (+5 -5)
📝 janitor_test.go (+6 -3)
📝 processor.go (+4 -40)
📝 processor_test.go (+114 -95)
➕ queue_manager.go (+356 -0)
➕ queue_manager_test.go (+803 -0)
📝 recoverer.go (+7 -6)
📝 recoverer_test.go (+1 -1)
📝 server.go (+63 -19)

...and 1 more file

📄 Description

Closes #856

Allows workers to process dynamic queue names. Dynamic queue priorities can be set using wildcard (*) patterns:

	srv := NewServer(redisConnOpt, Config{
		Queues: map[string]int{
			"critical":          10, // exact match
			"email:*":           5,  // wildcard suffix match
			"email:important:*": 6,  // wildcard longest prefix wins
			"*:transfer:*":      7,  // multiple wildcards
			"*":                 3,  // default
		},
		StrictPriority: true,
		DynamicQueues:  true,
	})

With this configuration:

  1. Tasks from the queue named critical are processed first (priority 10).
  2. Then tasks from queues whose names contain :transfer: (priority 7).
  3. Then tasks from queues whose names start with email:important: (priority 6).
  4. Then tasks from queues whose names start with email: (priority 5).
  5. Then tasks from all other queues (priority 3).
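
To make the ordering above concrete, here is a minimal sketch of how a queue name could be resolved to a priority from the wildcard map. It is not the code from this PR: `matchWildcard` and `resolvePriority` are hypothetical names, and the tie-breaking rule (an exact match wins, otherwise the matching pattern with the most literal characters wins) is an assumption chosen to reproduce the "longest prefix wins" behaviour described above.

```go
package main

import (
	"fmt"
	"strings"
)

// matchWildcard reports whether name matches pattern, where each "*" in
// pattern stands for any (possibly empty) run of characters.
func matchWildcard(pattern, name string) bool {
	parts := strings.Split(pattern, "*")
	if len(parts) == 1 {
		return pattern == name // no wildcard: exact comparison
	}
	first, last := parts[0], parts[len(parts)-1]
	if !strings.HasPrefix(name, first) {
		return false
	}
	name = name[len(first):]
	if !strings.HasSuffix(name, last) {
		return false
	}
	name = name[:len(name)-len(last)]
	// Literal segments between wildcards must appear in order.
	for _, seg := range parts[1 : len(parts)-1] {
		i := strings.Index(name, seg)
		if i < 0 {
			return false
		}
		name = name[i+len(seg):]
	}
	return true
}

// resolvePriority returns the priority for a queue name.
// ASSUMPTION: an exact match wins; otherwise the matching pattern with the
// most non-wildcard characters wins, and equal scores prefer the higher
// priority. The PR may resolve overlapping patterns differently.
func resolvePriority(patterns map[string]int, name string) (int, bool) {
	if p, ok := patterns[name]; ok {
		return p, true
	}
	best, bestLiteral, found := 0, -1, false
	for pattern, p := range patterns {
		if !matchWildcard(pattern, name) {
			continue
		}
		literal := len(pattern) - strings.Count(pattern, "*")
		if literal > bestLiteral || (literal == bestLiteral && p > best) {
			best, bestLiteral, found = p, literal, true
		}
	}
	return best, found
}

func main() {
	patterns := map[string]int{
		"critical":          10,
		"email:*":           5,
		"email:important:*": 6,
		"*:transfer:*":      7,
		"*":                 3,
	}
	queues := []string{"critical", "s3:transfer:bucket-a", "email:important:eu", "email:news", "reports"}
	for _, q := range queues {
		p, _ := resolvePriority(patterns, q)
		fmt.Printf("%-22s -> %d\n", q, p)
	}
}
```

A name such as email:transfer:x matches both "email:*" and "*:transfer:*". The description does not say which pattern wins in that case, so the rule used in this sketch is only one plausible choice.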

Implementation details

  • No client changes are needed.
  • No changes to the Redis data structures and no migration are needed.
  • For now, only strict priority is implemented for dynamic queues.
  • The PR adds a queueManager that periodically polls (every 5s by default) the list of all queues from the existing Redis SET AllQueues. The manager assigns priorities to newly discovered queues based on the wildcard configuration.
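
The polling behaviour described above can be sketched roughly as follows. This is an illustration under assumptions, not the queueManager from this PR: the `queueLister` interface, the `manager` type, and the `priority` callback are hypothetical stand-ins for the broker call that reads the AllQueues set and for the wildcard resolution. The sketch also only ever adds queues, since the description does not say how removed queues are handled.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"
)

// queueLister is a hypothetical stand-in for whatever reads the set of all
// known queue names from Redis.
type queueLister interface {
	ListQueues() ([]string, error)
}

// staticLister is a fixed in-memory lister used for the demo below.
type staticLister []string

func (s staticLister) ListQueues() ([]string, error) { return s, nil }

// manager keeps a priority-ordered view of the queues discovered so far.
type manager struct {
	lister   queueLister
	priority func(name string) (int, bool) // hypothetical wildcard resolver

	mu      sync.RWMutex
	known   map[string]int
	ordered []string
}

func newManager(l queueLister, priority func(string) (int, bool)) *manager {
	return &manager{lister: l, priority: priority, known: make(map[string]int)}
}

// refresh assigns priorities to queues seen for the first time and rebuilds
// the strict-priority order (highest priority first, then by name).
func (m *manager) refresh() error {
	names, err := m.lister.ListQueues()
	if err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, n := range names {
		if _, seen := m.known[n]; seen {
			continue
		}
		if p, ok := m.priority(n); ok {
			m.known[n] = p
		}
	}
	ordered := make([]string, 0, len(m.known))
	for n := range m.known {
		ordered = append(ordered, n)
	}
	sort.Slice(ordered, func(i, j int) bool {
		pi, pj := m.known[ordered[i]], m.known[ordered[j]]
		if pi != pj {
			return pi > pj
		}
		return ordered[i] < ordered[j]
	})
	m.ordered = ordered
	return nil
}

// run calls refresh on every tick until ctx is cancelled.
func (m *manager) run(ctx context.Context, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			_ = m.refresh() // a real implementation would log the error
		}
	}
}

// queues returns a copy of the current priority-ordered queue list.
func (m *manager) queues() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return append([]string(nil), m.ordered...)
}

func main() {
	m := newManager(staticLister{"email:a", "critical", "email:b"}, func(name string) (int, bool) {
		if name == "critical" {
			return 10, true
		}
		return 3, true
	})
	if err := m.refresh(); err != nil {
		fmt.Println("refresh failed:", err)
		return
	}
	fmt.Println(m.queues())
}
```

In a server, `run` would be started in its own goroutine with the polling interval (5s by default according to the description), and the processor would read `queues()` on each dequeue attempt.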

Motivation

The feature addresses the use case described in #856: creating queues dynamically at runtime, with queue names derived from domain objects (in our case, S3 bucket names for data migration). This is particularly useful for applications that need flexible queue management without predefined queue names.

Side notes

Apologies for submitting a large PR without a prior design proposal. The API change is minimal, and the implementation is best understood by reviewing the source code. Feedback is welcome!

Future work

This PR lays the groundwork for addressing #850, which proposes dynamic queue priority configuration. Potential next steps include:

  1. Replace the Redis SET AllQueues with a Redis sorted set, using scores to represent queue priorities.
  2. On the first enqueue, add the new queue to the sorted set with score -1.
  3. Update queueManager to periodically fetch new queues with score -1 and assign priorities based on the dynamic configuration.
  4. Add a method to the inspector to change a queue's priority directly in Redis, which will automatically affect all workers.
  5. Since queue priority would then be stored in Redis, the Dequeue logic can be optimised: looping over the queue list could be moved into the dequeue Lua script. This would reduce the number of requests to Redis from O(N) to O(1) per dequeue operation, where N is the number of queues in the config.
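
Steps 1 to 4 above can be modelled in memory to show the intended life cycle of a queue's score. This is only a sketch of the proposal, not Redis code: `queueScores` is a hypothetical map standing in for the sorted set, and it assumes real priorities are never negative, so that -1 can mark a queue as not yet assigned. Step 5 (the Lua dequeue script) is not covered.

```go
package main

import (
	"fmt"
	"sort"
)

// unassigned is the placeholder score proposed for queues without a priority.
const unassigned = -1

// queueScores is an in-memory stand-in for the proposed Redis sorted set
// (queue name -> score).
type queueScores map[string]int

// registerOnEnqueue mimics step 2: add the queue with score -1 only if it is
// not present yet (comparable to ZADD with the NX option).
func (s queueScores) registerOnEnqueue(name string) {
	if _, ok := s[name]; !ok {
		s[name] = unassigned
	}
}

// assignPending mimics step 3: every queue still at score -1 gets a priority
// from prio. It returns how many queues were updated.
func (s queueScores) assignPending(prio func(name string) int) int {
	updated := 0
	for name, score := range s {
		if score == unassigned {
			s[name] = prio(name)
			updated++
		}
	}
	return updated
}

// setPriority mimics step 4: an inspector-style override visible to all readers.
func (s queueScores) setPriority(name string, priority int) {
	s[name] = priority
}

// byPriority lists queues with an assigned priority, highest first.
func (s queueScores) byPriority() []string {
	names := make([]string, 0, len(s))
	for name, score := range s {
		if score != unassigned {
			names = append(names, name)
		}
	}
	sort.Slice(names, func(i, j int) bool {
		if s[names[i]] != s[names[j]] {
			return s[names[i]] > s[names[j]]
		}
		return names[i] < names[j]
	})
	return names
}

func main() {
	scores := queueScores{}
	scores.registerOnEnqueue("email:a")
	scores.registerOnEnqueue("critical")
	scores.assignPending(func(name string) int {
		if name == "critical" {
			return 10
		}
		return 3
	})
	fmt.Println(scores.byPriority())
	scores.setPriority("email:a", 20)
	fmt.Println(scores.byPriority())
}
```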

🔄 This issue represents a GitHub Pull Request. It cannot be merged through Gitea due to API limitations.
