[GH-ISSUE #214] [QUESTION] Follow-up question: Is it possible to have multiple servers serving different queues? #1088

Closed
opened 2026-03-07 22:05:30 +03:00 by kerem · 3 comments

Originally created by @sujit-baniya on GitHub (Dec 2, 2020).
Original GitHub issue: https://github.com/hibiken/asynq/issues/214

Originally assigned to: @hibiken on GitHub.

Continued from: https://github.com/hibiken/asynq/issues/213

Thanks for the quick reply.
I've cloned your repo and changed most things to match my requirements.
One of the changes I made was to add queues, and handlers per queue, dynamically.

Most of the changes are working great, but I'm stuck here.


```go
// AddQueue registers queueName with the given priority on every
// internal component of the server that tracks queues.
func (srv *Server) AddQueue(queueName string, priority int) error {
	srv.heartbeater.queues[queueName] = priority
	srv.forwarder.queues = append(srv.forwarder.queues, queueName)
	srv.processor.queueConfig[queueName] = priority
	srv.recoverer.queues = append(srv.recoverer.queues, queueName)
	return nil
}

// RemoveQueue rebuilds the queue map and the queue name list without
// queueName and hands the new values to every component.
func (srv *Server) RemoveQueue(queueName string) error {
	q := make(map[string]int)
	var qu []string
	for que, t := range srv.heartbeater.queues {
		if que != queueName {
			q[que] = t
			qu = append(qu, que)
		}
	}
	srv.heartbeater.queues = q
	srv.forwarder.queues = qu
	srv.processor.queueConfig = q
	srv.recoverer.queues = qu
	return nil
}
```

I created two servers and assigned a different queue to each, but both servers end up with all of the queues.

```go
services.InitRedis()
srv1 := asynq.NewServer(services.RC, asynq.Config{
	Concurrency: 100,
})
srv2 := asynq.NewServer(services.RC, asynq.Config{
	Concurrency: 100,
})

srv1.AddQueue("srv1-queue", 1)
srv2.AddQueue("srv2-queue", 1)
srv1.Handler.HandleFunc("srv1", func(ctx context.Context, task *asynq.Task) error {
	fmt.Println("srv1")
	return nil
})
srv2.Handler.HandleFunc("srv2", func(ctx context.Context, task *asynq.Task) error {
	fmt.Println("srv2")
	return nil
})
srv1.Start("127.0.0.1")
srv2.Start("127.0.0.1")
```

Here, `srv1` and `srv2` are each assigned both queues, `srv1-queue` and `srv2-queue`.

Is there anything I'm missing here? Any suggestions would be appreciated.

I have some more questions about this tool. Is there another way I could contact you, or are GitHub issues fine?
Basically, I'm changing this tool to do the following things dynamically:

  1. Add, Remove, Pause, Resume, Assign Servers to Queue
  2. Assign Queue with Handlers to Server

I can do the following operations via REST on a single asynq server:

```go
app.Get("/queue/:queue/tasks", GetTasks)
app.Get("/queue/:queue/start-bulk", StartBulkTasks)
app.Get("/queue/:queue/pause", PauseQueue)
app.Get("/queue/:queue/resume", ResumeQueue)
app.Get("/queue/:queue/send", SingleTask)
app.Get("/queue/:queue/servers", ServerListByQueue)
app.Get("/queue/:queue/assign-server", AssignServer)
app.Get("/queue/:queue/stop", StopQueue)
app.Get("/queue/:queue/status", QueueStatus)
app.Get("/queue/:queue/history/:days", QueueHistory)
app.Get("/cron/:queue/add-task", AddTaskToCron)
app.Get("/servers/start", StartServer)
app.Get("/servers", ServerList)
app.Get("/queues", QueueList)
```

but this does not work with multiple server instances (the operations above need to be handled per server).


@sujit-baniya commented on GitHub (Dec 2, 2020):

https://www.loom.com/share/c0b9ad054a734eb780341fcb49af2be5


@sujit-baniya commented on GitHub (Dec 3, 2020):

Also, since we can have multiple servers, is it possible to include server clustering and load balancing?
For example, if I bring up 3 servers with the same queues:

  1. I could choose which server to send a task to
  2. Weighted strategy (Least job priority)
  3. Failover

@hibiken commented on GitHub (Feb 1, 2021):

Sorry for the delayed response. Yes, it's possible to have multiple servers serving different queues.
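A possible explanation for the behavior in the question, sketched under a stated assumption: if the fork's `NewServer` falls back to one package-level default queue map whenever `Config.Queues` is empty, then every `AddQueue` call writes into a map that all such servers share. The `Config`, `Server`, and `NewServer` below are simplified stand-ins written for this example, not asynq's real types or internals.

```go
package main

import "fmt"

// defaultQueues stands in for a package-level default queue configuration.
// Assumption: the forked NewServer falls back to one shared map like this
// when Config.Queues is empty.
var defaultQueues = map[string]int{"default": 1}

// Config and Server are simplified stand-ins, not asynq's real types.
type Config struct {
	Queues map[string]int
}

type Server struct {
	queues map[string]int
}

func NewServer(cfg Config) *Server {
	if len(cfg.Queues) == 0 {
		// Every server built without explicit queues shares this one map.
		return &Server{queues: defaultQueues}
	}
	// Copy so the server owns its queue map.
	own := make(map[string]int, len(cfg.Queues))
	for name, p := range cfg.Queues {
		own[name] = p
	}
	return &Server{queues: own}
}

func (s *Server) AddQueue(name string, priority int) {
	s.queues[name] = priority
}

func main() {
	// Shared default: each AddQueue call is visible to both servers.
	a, b := NewServer(Config{}), NewServer(Config{})
	a.AddQueue("srv1-queue", 1)
	b.AddQueue("srv2-queue", 1)
	fmt.Println(len(a.queues), len(b.queues)) // 3 3

	// Explicit per-server queues: the maps stay independent.
	c := NewServer(Config{Queues: map[string]int{"srv1-queue": 1}})
	d := NewServer(Config{Queues: map[string]int{"srv2-queue": 1}})
	fmt.Println(len(c.queues), len(d.queues)) // 1 1
}
```

In upstream asynq, `asynq.Config` has a `Queues` field mapping queue names to priorities, so giving each `asynq.NewServer` call its own `Queues` map is the usual way to have servers consume different queues without modifying the library.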
