[GH-ISSUE #194] [FEATURE REQUEST] Implement some feature if not already exists #61

Closed
opened 2026-03-02 05:18:21 +03:00 by kerem · 6 comments

Originally created by @sujit-baniya on GitHub (Sep 15, 2020).
Original GitHub issue: https://github.com/hibiken/asynq/issues/194

Originally assigned to: @hibiken on GitHub.

Thanks for this great library.

I was wondering whether the following features already exist in this library:

  • Filter or intercept the task on queue and do operations like Stop/Pause/Resume tasks
  • Move tasks based on filters or intercept to other queue
  • Create queue and workers dynamically
  • Increase or decrease worker concurrency at runtime
  • Add/Remove workers on different servers from a Queue

Basically, I'm trying to use this library in one of my projects, where I need to manage tasks, queues, and worker pools.


@hibiken commented on GitHub (Sep 15, 2020):

@itsursujit Thanks for opening this issue! These are all very interesting features.


> Filter or intercept the task on queue and do operations like Stop/Pause/Resume tasks

We have PauseQueue and UnpauseQueue methods on Inspector. Also, the CLI has the pause and unpause commands.
Not sure if this is what you need. What do you mean by "Filter or intercept the task"?


> Move tasks based on filters or intercept to other queue

We don't support a "move" operation (moving a task from one queue to another). This is mainly due to how we deal with multiple keys in Redis Cluster. However, you can simulate a move by first deleting the task and then enqueueing the same task to another queue. You can delete a task by calling DeleteTaskByKey or by using the delete command in the CLI.
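The delete-then-enqueue workaround can be sketched with an in-memory stand-in. This is a minimal illustration only: `Store`, `Task`, `Enqueue`, `Delete`, and `Move` are hypothetical names invented for this sketch and are not asynq's API, and a real setup would go through the Inspector and Client against Redis instead.

```go
package main

import "errors"

// Task is a hypothetical stand-in for a queued task; it is not asynq's Task type.
type Task struct {
	ID      string
	Payload map[string]string
}

// Store is a hypothetical in-memory set of named queues.
type Store struct {
	queues map[string][]Task
}

// NewStore returns an empty store.
func NewStore() *Store {
	return &Store{queues: make(map[string][]Task)}
}

// Enqueue appends a task to a queue, creating the queue on first use.
func (s *Store) Enqueue(queue string, t Task) {
	s.queues[queue] = append(s.queues[queue], t)
}

// Delete removes the task with the given ID from a queue and returns it.
func (s *Store) Delete(queue, id string) (Task, error) {
	tasks := s.queues[queue]
	for i, t := range tasks {
		if t.ID == id {
			s.queues[queue] = append(tasks[:i], tasks[i+1:]...)
			return t, nil
		}
	}
	return Task{}, errors.New("task not found")
}

// Move simulates a move as delete-then-enqueue. The two steps are not atomic:
// if the process stops between them, the task is gone from both queues.
func (s *Store) Move(from, to, id string) error {
	t, err := s.Delete(from, id)
	if err != nil {
		return err
	}
	s.Enqueue(to, t)
	return nil
}
```

Because the two steps are separate, a caller that cannot afford to lose a task may prefer to keep its own copy of the payload until the enqueue has succeeded.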


> Create queue and workers dynamically

Queues are created dynamically. If you enqueue a task with a queue name that doesn't already exist, the queue will be created for that task.
Example:

```go
// This will create a queue called "foo" if it doesn't already exist.
client.Enqueue(task, asynq.Queue("foo"))
```

On the consumer side, we currently don't support dynamic queue configuration. The list of queues to monitor is specified at initialization time with Config.

```go
srv := asynq.NewServer(redisConnOpt, asynq.Config{
    // List of queues to monitor with priority.
    Queues: map[string]int{
        "foo": 1,
        "bar": 2,
    },
})
```

I think we can add a feature to support more dynamic queue configuration. Do you have suggestions?


> Increase or decrease worker concurrency at runtime

No, we currently don't have this. But I think this is definitely feasible and we can add this feature.
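As a rough idea of what runtime-adjustable concurrency could look like, here is a minimal sketch of a limiter whose cap can be changed while tasks are running. It uses only the Go standard library. `Limiter` and its methods are hypothetical names for illustration and say nothing about how asynq would implement the feature.

```go
package main

import "sync"

// Limiter caps the number of tasks running at once; the cap can change at runtime.
// It is a hypothetical helper, not part of asynq.
type Limiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	limit   int
	running int
}

// NewLimiter returns a limiter that allows up to limit concurrent tasks.
func NewLimiter(limit int) *Limiter {
	l := &Limiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Acquire blocks until a slot is free under the current limit.
func (l *Limiter) Acquire() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.running >= l.limit {
		l.cond.Wait()
	}
	l.running++
}

// TryAcquire takes a slot without blocking and reports whether it succeeded.
func (l *Limiter) TryAcquire() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running >= l.limit {
		return false
	}
	l.running++
	return true
}

// Release frees a slot and wakes blocked callers so they can re-check the limit.
func (l *Limiter) Release() {
	l.mu.Lock()
	l.running--
	l.mu.Unlock()
	l.cond.Broadcast()
}

// SetLimit changes the cap. Raising it wakes blocked callers; lowering it
// takes effect as running tasks finish, without interrupting them.
func (l *Limiter) SetLimit(n int) {
	l.mu.Lock()
	l.limit = n
	l.mu.Unlock()
	l.cond.Broadcast()
}
```

A worker loop would call `Acquire` before starting each task and `Release` when it finishes, while an admin endpoint or signal handler calls `SetLimit`. Lowering the limit in this sketch never cancels work already in progress.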


> Add/Remove workers on different servers from a Queue

I'm not quite sure what you mean by this. Would you mind explaining more?


@sujit-baniya commented on GitHub (Sep 16, 2020):

Sure,

> We have PauseQueue and UnpauseQueue methods on Inspector. Also, the CLI has the pause and unpause commands.
> Not sure if this is what you need. What do you mean by "Filter or intercept the task"?

By "filter or intercept the task", I mean that before or after a task is put on the queue, while the consumer has not yet consumed it, I could filter the list of tasks by payload and perform operations on the matching tasks, such as delaying, stopping, pausing, or resuming them.

Use-Case:
Users are sending tasks to a dynamically created queue. As an admin, I could filter and group the list of tasks by any field in the payload and view stats for the tasks on the queue. This would help me control tasks per user or per payload.
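The filtering and grouping described in this use case could look roughly like the sketch below. It assumes the pending tasks have already been listed into a slice and that payloads are flat string maps. `PendingTask`, `Filter`, `GroupByField`, and the `user_id` field mentioned afterwards are all hypothetical and not part of asynq.

```go
package main

// PendingTask is a hypothetical pending task with a flat string payload.
type PendingTask struct {
	ID      string
	Queue   string
	Payload map[string]string
}

// Filter returns the tasks for which keep reports true, preserving order.
func Filter(tasks []PendingTask, keep func(PendingTask) bool) []PendingTask {
	var out []PendingTask
	for _, t := range tasks {
		if keep(t) {
			out = append(out, t)
		}
	}
	return out
}

// GroupByField buckets tasks by the value of one payload field.
// Tasks that lack the field are grouped under the empty string.
func GroupByField(tasks []PendingTask, field string) map[string][]PendingTask {
	groups := make(map[string][]PendingTask)
	for _, t := range tasks {
		key := t.Payload[field]
		groups[key] = append(groups[key], t)
	}
	return groups
}
```

For example, `GroupByField(tasks, "user_id")` would give per-user buckets whose lengths serve as simple per-user stats, and `Filter` could select the tasks to delay or delete.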

> I think we can add a feature to support more dynamic queue configuration. Do you have suggestions?

Use-Case:
Users are sending tasks to a queue with one worker, and they start sending in high volume. As an admin, I could attach more workers to that queue, on the same server or on different servers. This would help users with a high volume of tasks.

> I'm not quite sure what you mean by this. Would you mind explaining more?

Since we're using Redis for the queue, as an admin I would add the IP addresses of the servers where I've deployed the application. The applications start without initializing any workers. By default, the queue worker runs on ServerA. I would add an entry in Redis for a queue worker on ServerB. The application on ServerB would pick up this entry and create workers for that queue on ServerB.


@sujit-baniya commented on GitHub (Sep 16, 2020):

Another important use case could be shared vs. dedicated queues/workers:

Shared Queue Workers:

  • As a developer, I would define all required queues (with priorities) and the number of workers required for each queue.
  • Apply different strategies to dispatch to queues with appropriate priorities:
    ROUND-ROBIN strategy - Send to whichever queue is free, irrespective of queue priorities.
    WEIGHTED strategy - Send to the appropriate queue based on the messages. High-volume messages would go to the high-priority queue.
    CUSTOM strategy - The admin defines the strategy per user. If userA is set to high priority, their messages go to the high-priority queue.

Probably making use of mux.Handle("queue", handler)
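One way to picture pluggable dispatch strategies is as an interface that picks a queue name for each message on the producer side. The sketch below covers two of the three strategies listed above: a plain round-robin cycle (simpler than "whichever queue is free") and the admin-defined per-user mapping. `Message`, `Strategy`, `RoundRobin`, and `PerUser` are hypothetical names for illustration only, not asynq types.

```go
package main

import "sync"

// Message is a hypothetical unit of work submitted by a user.
type Message struct {
	UserID string
	Body   string
}

// Strategy picks the destination queue name for a message.
type Strategy interface {
	Pick(m Message) string
}

// RoundRobin cycles through the queues in order, ignoring priorities.
// The queues slice must not be empty.
type RoundRobin struct {
	mu     sync.Mutex
	queues []string
	next   int
}

// Pick returns the next queue in the cycle.
func (r *RoundRobin) Pick(_ Message) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	q := r.queues[r.next]
	r.next = (r.next + 1) % len(r.queues)
	return q
}

// PerUser is the CUSTOM strategy: an admin-maintained map from user ID to
// queue name, with a fallback queue for users that have no entry.
type PerUser struct {
	Assigned map[string]string
	Default  string
}

// Pick returns the user's assigned queue, or the default queue.
func (p PerUser) Pick(m Message) string {
	if q, ok := p.Assigned[m.UserID]; ok {
		return q
	}
	return p.Default
}
```

The chosen name would then be passed as the queue option when enqueueing, as in the `asynq.Queue("foo")` example earlier in the thread.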

Dedicated Queue Workers:

  • Everything from Shared Queue Workers.
  • Queues and workers are created per user and dedicated to that user only.

Probably making use of mux.HandleFunc("queue:12", handler)


@hibiken commented on GitHub (Sep 16, 2020):

It's difficult to discuss multiple features in one thread 😞 Would you mind opening an issue per feature request?
That way I can tackle each issue one at a time 👍


@sujit-baniya commented on GitHub (Sep 17, 2020):

Sure


@hibiken commented on GitHub (Sep 20, 2020):

Tracking this issue with #195, #196 and #197 instead.
