[GH-ISSUE #195] [FEATURE REQUEST] Create queue workers dynamically #1076

Closed
opened 2026-03-07 22:05:19 +03:00 by kerem · 8 comments

Originally created by @sujit-baniya on GitHub (Sep 17, 2020).
Original GitHub issue: https://github.com/hibiken/asynq/issues/195

Originally assigned to: @hibiken on GitHub.

Currently the system requires workers to be created and assigned to a queue at compile time. To add more workers to a queue, one has to bring them up for that queue manually, which in my view is not scalable.

In my opinion, we could have the following approach for creating workers and assigning them to queues:

  • Ability to bring up a cluster of empty workers (not assigned to any queue) on different servers
  • A Redis-stored config for workers, e.g.:

```json
[
    {
        "server": "10.1.10.1",
        "queue": "queue-name",
        "worker": 10
    }
]
```

  • Workers on each server poll Redis for the config above. If an entry's server IP matches the host, a worker is created with the configured concurrency for that queue and waits for tasks.
  • The same applies to removing workers from a queue: if an entry is no longer present in the config, the corresponding workers on that server are stopped.
  • This also allows increasing or decreasing worker concurrency for a specific queue and server.
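The polling step described above could be sketched as follows. This is a minimal illustration, not an asynq API: the struct fields mirror the example config, and the raw JSON is assumed to have already been fetched from Redis (e.g. with a plain GET on an agreed key).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workerConfig mirrors the JSON schema proposed above; the field names
// ("server", "queue", "worker") are taken from the example config.
type workerConfig struct {
	Server string `json:"server"`
	Queue  string `json:"queue"`
	Worker int    `json:"worker"`
}

// configsForHost parses the raw config and returns the entries whose
// "server" field matches the local host's IP. In the proposed design
// the raw JSON would come from a periodic poll of Redis; here it is
// passed in directly to keep the sketch self-contained.
func configsForHost(raw []byte, hostIP string) ([]workerConfig, error) {
	var all []workerConfig
	if err := json.Unmarshal(raw, &all); err != nil {
		return nil, err
	}
	var mine []workerConfig
	for _, c := range all {
		if c.Server == hostIP {
			mine = append(mine, c)
		}
	}
	return mine, nil
}

func main() {
	raw := []byte(`[{"server":"10.1.10.1","queue":"queue-name","worker":10}]`)
	mine, err := configsForHost(raw, "10.1.10.1")
	if err != nil {
		panic(err)
	}
	for _, c := range mine {
		fmt.Printf("start %d workers for queue %q\n", c.Worker, c.Queue)
	}
}
```

A host whose IP matches no entry would start nothing, and a poller could diff successive results to decide which workers to stop.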
kerem 2026-03-07 22:05:19 +03:00

@sujit-baniya commented on GitHub (Sep 17, 2020):

https://github.com/hibiken/asynq/issues/194


@sujit-baniya commented on GitHub (Sep 18, 2020):

I can't find a way to get the details of the workers attached to a queue.


@hibiken commented on GitHub (Sep 20, 2020):

Thanks for opening this issue. I agree with some of your points but I don't think I can modify the package to implement all of the features requested.

My current thinking is to add the following features:

  • Allow worker (asynq.Server) to update its concurrency at runtime.
  • Allow worker (asynq.Server) to update queue configs at runtime (add/remove queues from which to consume tasks)

To see which `asynq.Server`s are consuming from which queues, you can run the `asynq server ls` command.
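For reference, a runtime-adjustable concurrency limit is often built as a resizable semaphore: workers block while the active count is at the limit, and raising or lowering the limit takes effect immediately. This is a generic sketch of the technique, not asynq's actual internals.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a resizable concurrency limiter: Acquire blocks until the
// number of active workers is below the current limit, and SetLimit can
// raise or lower that limit at runtime.
type limiter struct {
	mu     sync.Mutex
	cond   *sync.Cond
	limit  int
	active int
}

func newLimiter(limit int) *limiter {
	l := &limiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

func (l *limiter) Acquire() {
	l.mu.Lock()
	for l.active >= l.limit {
		l.cond.Wait()
	}
	l.active++
	l.mu.Unlock()
}

func (l *limiter) Release() {
	l.mu.Lock()
	l.active--
	l.mu.Unlock()
	l.cond.Broadcast()
}

func (l *limiter) SetLimit(n int) {
	l.mu.Lock()
	l.limit = n
	l.mu.Unlock()
	l.cond.Broadcast() // wake waiters in case the limit was raised
}

func main() {
	l := newLimiter(2)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			l.Acquire()
			defer l.Release()
			fmt.Println("task", id, "running")
		}(i)
	}
	l.SetLimit(4) // tune concurrency while tasks are queued
	wg.Wait()
}
```

Lowering the limit does not preempt tasks already running; it only stops new acquisitions until enough workers release.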


@sujit-baniya commented on GitHub (Sep 20, 2020):

@hibiken I cloned the package and modified it to accept concurrency and queue changes at runtime.

I don't know if it's optimal, but it worked for me:


```go
func (srv *Server) Tune(noOfWorkers int) {
	srv.Concurrency = noOfWorkers
	srv.processor.Concurrency = noOfWorkers
	srv.heartbeater.Concurrency = noOfWorkers
}

func (srv *Server) AddQueue(queueName string, priority int) error {
	srv.heartbeater.queues[queueName] = priority
	srv.scheduler.queues = append(srv.scheduler.queues, queueName)
	srv.processor.queueConfig[queueName] = priority
	srv.recoverer.queues = append(srv.recoverer.queues, queueName)
	return nil
}

func (srv *Server) RemoveQueue(queueName string) error {
	q := make(map[string]int)
	var qu []string
	for que, t := range srv.heartbeater.queues {
		if que != queueName {
			q[que] = t
			qu = append(qu, que)
		}
	}
	srv.heartbeater.queues = q
	srv.scheduler.queues = qu
	srv.processor.queueConfig = q
	srv.recoverer.queues = qu
	return nil
}
```
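One caveat with mutating these fields directly: the processor, scheduler, and heartbeater read their queue config from their own goroutines, so unsynchronized writes while the server is running are a data race. A toy sketch of how a queue set could be guarded with a `sync.RWMutex` (the `queueSet` type here is hypothetical, not part of asynq):

```go
package main

import (
	"fmt"
	"sync"
)

// queueSet is a toy stand-in for a server's queue configuration,
// showing how runtime add/remove could be made safe for concurrent
// readers.
type queueSet struct {
	mu     sync.RWMutex
	queues map[string]int // queue name -> priority
}

func newQueueSet() *queueSet {
	return &queueSet{queues: make(map[string]int)}
}

func (s *queueSet) Add(name string, priority int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queues[name] = priority
}

func (s *queueSet) Remove(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.queues, name)
}

// Snapshot returns a copy for readers, so a poller never observes the
// map while it is being mutated.
func (s *queueSet) Snapshot() map[string]int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string]int, len(s.queues))
	for q, p := range s.queues {
		out[q] = p
	}
	return out
}

func main() {
	s := newQueueSet()
	s.Add("critical", 6)
	s.Add("default", 3)
	s.Remove("default")
	fmt.Println(s.Snapshot())
}
```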

Also I've updated the server to accept handlers at runtime

```go
func NewServer(r RedisConnOpt, cfg Config) *Server {
	n := cfg.Concurrency
	if n < 1 {
		n = runtime.NumCPU()
	}
	if cfg.Handler == nil {
		cfg.Handler = NewServeMux()
	}
	...
}
```

@sujit-baniya commented on GitHub (Sep 20, 2020):

Now I can do something like:

```go
srv := asynq.NewServer(RC, asynq.Config{
	Concurrency: 100,
})
srv.AddQueue(queue, 1)
srv.Tune(10)
if err := srv.Start(); err != nil {
	log.Fatal(err)
}
srv.Handler.HandleFunc(queue, HandleSmsSendingTask)
```

https://github.com/itsursujit/asynq/blob/master/examples/worker.go

is an example of what I need.
Again, thanks for this library! I'll try to modify and check accordingly.


@sujit-baniya commented on GitHub (Sep 20, 2020):

Is there any way to stop the server if it has been idle for 30, or to stop it immediately after n tasks?


@hibiken commented on GitHub (Feb 1, 2021):

Currently I'm not intending to support this feature. I'll close this issue for now.


@sujit-baniya commented on GitHub (Mar 11, 2021):

@hibiken Maybe you could rethink implementing this feature?

Just as queues are created dynamically for each task, it would be better if workers could also be created dynamically.
