[GH-ISSUE #250] [FEATURE REQUEST] Ability to add workers dynamically on a server #1107

Open
opened 2026-03-07 22:05:46 +03:00 by kerem · 12 comments
Owner

Originally created by @sujit-baniya on GitHub (Mar 11, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/250

Originally assigned to: @hibiken on GitHub.

Currently, whenever a task is enqueued, its queue is created dynamically. But for a server to consume from that queue, the server has to be stopped, the queue and its handlers have to be added to the Mux manually, and then the server has to be started again.

In my view, the ability to create workers dynamically (as in RabbitMQ), without manually interrupting server operation, is an important feature.


@hibiken commented on GitHub (Mar 11, 2021):

@sujit-baniya thank you for creating this issue!

I think this is a commonly requested feature. I'll come up with a design/proposal and update this thread again 👍


@sujit-baniya commented on GitHub (Apr 5, 2022):

@hibiken I'm not sure if this is the correct way of adding handlers dynamically, but it has been working for me so far:


```go
// Checks server state and returns an error if the pre-condition is not met.
// Otherwise it sets the server state to active.
func (srv *Server) start() error {
	srv.state.mu.Lock()
	defer srv.state.mu.Unlock()
	switch srv.state.value {
	case srvStateActive:
		return fmt.Errorf("asynq: the server is already running")
	case srvStateStopped:
		return fmt.Errorf("asynq: the server is in the stopped state, waiting for shutdown")
	case srvStateClosed:
		return ErrServerClosed
	}
	srv.state.value = srvStateActive
	return nil
}

// AddHandler replaces the server's handler.
func (srv *Server) AddHandler(handler *ServeMux) {
	srv.handler = handler
}

// AddQueueHandler registers a handler on the mux for the given queue.
func (srv *Server) AddQueueHandler(queue string, handler func(ctx context.Context, t *Task) error) {
	srv.handler.HandleFunc(queue, handler)
}

// AddQueues replaces the queue set. Note: these setters mutate shared
// server state without locking, so the caller must coordinate with a
// running server.
func (srv *Server) AddQueues(queues map[string]int) {
	srv.queues = queues
	for queue := range srv.queues {
		srv.forwarder.queues = append(srv.forwarder.queues, queue)
		srv.recoverer.queues = append(srv.recoverer.queues, queue)
	}

	srv.heartbeater.queues = srv.queues
	srv.processor.queueConfig = srv.queues
	ques, orderedQueues := prepareQueues(srv.processor.queueConfig, srv.strictPriority)
	srv.processor.queueConfig = ques
	srv.processor.orderedQueues = orderedQueues
}

// AddQueue registers a single queue with an optional priority (default 0).
func (srv *Server) AddQueue(queue string, prio ...int) {
	priority := 0
	if len(prio) > 0 {
		priority = prio[0]
	}
	srv.queues[queue] = priority
	srv.heartbeater.queues = srv.queues
	srv.forwarder.queues = append(srv.forwarder.queues, queue)
	srv.processor.queueConfig[queue] = priority
	queues, orderedQueues := prepareQueues(srv.processor.queueConfig, srv.strictPriority)
	srv.processor.queueConfig = queues
	srv.processor.orderedQueues = orderedQueues
	srv.recoverer.queues = append(srv.recoverer.queues, queue)
}

// RemoveQueue deletes a queue and rebuilds the per-component queue lists.
func (srv *Server) RemoveQueue(queue string) {
	var qNames []string
	delete(srv.queues, queue)
	for q := range srv.queues {
		qNames = append(qNames, q)
	}
	srv.heartbeater.queues = srv.queues
	srv.forwarder.queues = qNames
	srv.processor.queueConfig = srv.queues
	queues, orderedQueues := prepareQueues(srv.processor.queueConfig, srv.strictPriority)
	srv.processor.queueConfig = queues
	srv.processor.orderedQueues = orderedQueues
	srv.recoverer.queues = qNames
}

// HasQueue reports whether the server is consuming from the given queue.
func (srv *Server) HasQueue(queueName string) bool {
	for _, que := range srv.forwarder.queues {
		if que == queueName { // the original compared with != and returned true on the first non-matching queue
			return true
		}
	}
	return false
}

// Tune changes the worker concurrency. Replacing the semaphore while
// workers hold slots from the old one can temporarily over-admit tasks.
func (srv *Server) Tune(concurrency int) {
	srv.concurrency = concurrency
	srv.heartbeater.concurrency = concurrency
	srv.processor.sema = make(chan struct{}, concurrency)
}

func (srv *Server) IsRunning() bool {
	return srv.state.value == srvStateActive
}

func (srv *Server) IsStopped() bool {
	return srv.state.value == srvStateStopped
}

func (srv *Server) IsClosed() bool {
	return srv.state.value == srvStateClosed
}
```

@realmicro commented on GitHub (Apr 12, 2022):

@sujit-baniya
Could you share the complete code?
Thanks


@sujit-baniya commented on GitHub (Apr 16, 2022):

@realmicro Here is the full code for server.go:

https://gist.github.com/sujit-baniya/4d7145c1adb2f97d80955fb31858716e


@ghstahl commented on GitHub (Apr 27, 2022):

Another approach, related to https://github.com/hibiken/asynq/issues/439, is to dynamically shut down the whole server and restart it. The theory is that the server must be in a certain state, and the simplest thing for me was to shut it down and bring it back up with the new settings.

You have to manage the goroutine that the asynq server runs in, but that isn't much code.

A scenario with a single app hosting two asynq servers:

  1. A first goroutine hosts an entire asynq server with all of its settings (queues and handlers from config 1).
  2. A second goroutine hosts an entire asynq server with all of its settings (queues and handlers from config 2).

Again, it was much simpler to just shut the server down and restart a given goroutine with new settings.


@archit-harness commented on GitHub (Sep 16, 2022):

@sujit-baniya @hibiken were we able to solve this, so that we can dequeue from any given queue name?


@sujit-baniya commented on GitHub (Sep 16, 2022):

@archit-harness I've been using https://gist.github.com/sujit-baniya/4d7145c1adb2f97d80955fb31858716e to solve this. It's working fine for my needs. @hibiken might work on it later, depending on his priorities.


@archit-harness commented on GitHub (Sep 16, 2022):

Thanks @sujit-baniya for the quick response. I see many differences between the new server.go file and the file you shared, but I think the main difference is the new functions added above, right? https://github.com/hibiken/asynq/issues/250#issuecomment-1088649898

So basically our workers can now start and stop listening to queues.

A few questions:

  1. Did you see any performance issues with it?
  2. How many queues have you tried with it? My use-case is to create a queue for each user, up to a maximum of 50K. Do you foresee any issues with that?

On the first point, here is what we tried to benchmark: 1k queues with 10 items each and the server consuming from all 1k queues, versus all items in a single queue with one consumer reading from it. The latter performed much better.


@archit-harness commented on GitHub (Sep 16, 2022):

But for our use-case we cannot club everything into a single queue, as we want to allow the worker to run only X tasks per user.


@archit-harness commented on GitHub (Sep 19, 2022):

@sujit-baniya @hibiken did you get a chance to look at my questions?


@sujit-baniya commented on GitHub (Sep 19, 2022):

@archit-harness I've not done any benchmarks for my use-case, but I'm using these changes to asynq in one of my products, https://oarkflow.com, and it has been working well so far. Multiple servers and queues are created on demand and destroyed as required.

> but I think the main difference is the new functions added above, right?

  1. New functions were added.
  2. The Server API changed as well, to handle the "Handler".

@gebv commented on GitHub (Aug 1, 2023):

@hibiken Hi! We need functionality related to dynamic queues, with a quality of service (QoS) of 1 for each queue. Is there a branch where this work is being carried out? I would like to join the implementation or contribute in any other way to this feature.
