[GH-ISSUE #213] [QUESTION] Is it possible to have multiple servers serving different queues? #2098

Closed
opened 2026-03-15 19:09:17 +03:00 by kerem · 9 comments
Owner

Originally created by @sujit-baniya on GitHub (Dec 2, 2020).
Original GitHub issue: https://github.com/hibiken/asynq/issues/213

Originally assigned to: @hibiken on GitHub.

It may sound stupid to ask this question :)
Is it possible to have multiple asynq servers serving different queues?

My use case:
Based on the plan each user is assigned to, I want to assign queues and a number of workers per user, each as a separate asynq server within the application.

This would also let me define different queues across multiple asynq servers.

In the current implementation, I can define concurrency at the server level but can't limit the number of workers per queue.

kerem closed this issue and added the `question` label (2026-03-15 19:09:17 +03:00).

@hibiken commented on GitHub (Dec 2, 2020):

Hi @sujit-baniya , thanks for the question! (This reminds me that I should create a FAQ section in the doc 😄 )

Yes, you can have multiple asynq servers serve different queues.
You can also run multiple asynq servers that handle tasks from the same queue, to load-balance across them.

Let me know if you still have more questions!

<!-- gh-comment-id:737299722 -->

@sujit-baniya commented on GitHub (Dec 2, 2020):

Thanks for the quick reply.
Actually, I've cloned your repo and changed most of it to match my requirements.
One of the changes was to add queues and per-queue handlers dynamically.

Most of the changes are working great, but I'm stuck here.

```go
func (srv *Server) AddQueue(queueName string, priority int) error {
	srv.heartbeater.queues[queueName] = priority
	srv.forwarder.queues = append(srv.forwarder.queues, queueName)
	srv.processor.queueConfig[queueName] = priority
	srv.recoverer.queues = append(srv.recoverer.queues, queueName)
	return nil
}

func (srv *Server) RemoveQueue(queueName string) error {
	q := make(map[string]int)
	var qu []string
	for que, t := range srv.heartbeater.queues {
		if que != queueName {
			q[que] = t
			qu = append(qu, que)
		}
	}
	srv.heartbeater.queues = q
	srv.forwarder.queues = qu
	srv.processor.queueConfig = q
	srv.recoverer.queues = qu
	return nil
}
```

I created two servers and assigned different queues to each, but both servers ended up with all queues.

```go
services.InitRedis()

srv1 := asynq.NewServer(services.RC, asynq.Config{
	Concurrency: 100,
})
srv2 := asynq.NewServer(services.RC, asynq.Config{
	Concurrency: 100,
})

srv1.AddQueue("srv1-queue", 1)
srv2.AddQueue("srv2-queue", 1)
srv1.Handler.HandleFunc("srv1", func(ctx context.Context, task *asynq.Task) error {
	fmt.Println("srv1")
	return nil
})
srv2.Handler.HandleFunc("srv2", func(ctx context.Context, task *asynq.Task) error {
	fmt.Println("srv2")
	return nil
})
srv1.Start("127.0.0.1")
srv2.Start("127.0.0.1")
```

Here, srv1 and srv2 are both assigned the queues `srv1-queue` and `srv2-queue`.

Is there anything I'm missing here? Any suggestions would be appreciated.

<!-- gh-comment-id:737315108 -->

@jiangdi0924 commented on GitHub (Jul 5, 2021):

@sujit-baniya Hi, did you fix it? I want to do the same thing you did, but I haven't worked out a mature approach yet.

<!-- gh-comment-id:873746402 -->

@sujit-baniya commented on GitHub (Jul 5, 2021):

@jiangdi0924 Yes, I've changed the API and fixed it as per my needs, and it's working fine so far :)

<!-- gh-comment-id:873748828 -->

@jiangdi0924 commented on GitHub (Jul 5, 2021):

@sujit-baniya That's awesome. Can you share some code showing how to define multiple servers and insert a job into each server? Thank you very much.

<!-- gh-comment-id:873752291 -->

@sujit-baniya commented on GitHub (Jul 5, 2021):

@jiangdi0924 I've done something like this:


```go
type handlerMap map[string]func(ctx context.Context, t *asynq.Task) error

var (
	Inspector  *asynq.Inspector
	RC         = asynq.RedisClientOpt{}
	Scheduler  *asynq.Scheduler
	HandlerMap handlerMap
	Client     *asynq.Client
	mu         sync.RWMutex
)

type ServerList struct {
	Server map[string]*asynq.Server
	mu     sync.RWMutex
}

var Servers = ServerList{
	Server: make(map[string]*asynq.Server),
}

func Init(redisUri string) {
	if Inspector != nil && Scheduler != nil {
		return
	}
	Inspector = NewInitiator(redisUri, 0)
}

func NewInitiator(addr string, db int, password ...string) *asynq.Inspector {
	pwd := ""
	if len(password) > 0 {
		pwd = password[0]
	}
	connOpt := asynq.RedisClientOpt{
		Addr:     addr,
		DB:       db,
		Password: pwd,
	}
	RC = connOpt
	Client = asynq.NewClient(RC)
	StartScheduler(RC)
	return asynq.NewInspector(connOpt)
}

func AddServer(concurrency int) (*asynq.Server, error) {
	srv := asynq.NewServer(RC, asynq.Config{
		Concurrency:    concurrency,
		StrictPriority: true,
	})
	srv.AddHandler(asynq.NewServeMux())
	if err := srv.Start(); err != nil {
		return nil, err
	}
	Servers.mu.Lock()
	defer Servers.mu.Unlock()
	Servers.Server[srv.ServerID] = srv
	return srv, nil
}

func RemoveServer(srvID string) error {
	if srv, ok := Servers.Server[srvID]; ok {
		srv.Stop()
		delete(Servers.Server, srvID)
		return nil
	}
	return errors.New("server doesn't exist with the provided ID")
}
```

PLEASE NOTE: the API has changed, so you'll have to adjust accordingly.

<!-- gh-comment-id:873756139 -->

@sahilsk11 commented on GitHub (Jul 19, 2021):

Hi, I wanted to re-ask the question here. I have a client service pushing jobs into a queue. I spin up one worker service and it pulls tasks off just fine, but when I run a second worker service, it seems to replace the first.

Example:

  • I have one worker service initialized with a mux that handles a "SEND_EMAIL" job. Jobs are being enqueued and executed just fine.
  • Then I create a second worker service initialized with a mux that handles a "REFRESH" job. As soon as I launch this service and a new send-email job is enqueued, I see errors saying "no handler for SEND_EMAIL."

The expected behavior is that these two worker services run in parallel and dequeue jobs from the same queue, but they seem to replace each other on launch. I don't see any error messages in the original worker's logs.

> Hi @sujit-baniya , thanks for the question! (This reminds me that I should create a FAQ section in the doc 😄 )
>
> Yes, you can have multiple asynq servers serve different queues.
> You can also run multiple asynq servers to handle tasks from the same queue to load balance as well.
>
> Let me know if you still have more questions!

@hibiken, based on this response, it seems I should be able to run both of these worker services together? I'm using the default configuration outlined in the Getting Started doc.

Appreciate the help :)

<!-- gh-comment-id:882188928 -->

@hibiken commented on GitHub (Jul 19, 2021):

@sahilsk11 Thanks for the question! I think this is a common misunderstanding (indicates that we need better documentation).

Based on what you shared, I'm assuming both "SEND_EMAIL" and "REFRESH" tasks are enqueued to the same queue (probably the "default" queue, unless you specified one with the `Queue` option).

Your workers dequeue tasks from queues based on the `Config` you passed when initializing the server.

You have two worker services: worker1 and worker2.
I'm assuming the following:

  • worker1 only knows how to process "SEND_EMAIL" tasks
  • worker2 only knows how to process "REFRESH" tasks

but both worker1 and worker2 are configured to consume tasks from the "default" queue. So inevitably, when worker1 dequeues a "REFRESH" task, it doesn't know how to handle it and fails. The same goes for worker2 and "SEND_EMAIL" tasks.

If you want to dedicate each worker to a specific task type, I recommend enqueuing each task type to its own queue.

Example:

```go
// Enqueue "SEND_EMAIL" task to "email" queue
info, err := client.Enqueue(sendEmailTask, asynq.Queue("email"))

// Enqueue "REFRESH" task to "refresh" queue
info, err := client.Enqueue(refreshTask, asynq.Queue("refresh"))
```

And configure each worker to consume from each queue:

```go
// worker1 consumes tasks from "email" queue
srv1 := asynq.NewServer(redisConnOpt, asynq.Config{
    Queues: map[string]int{"email": 1},
})

// worker2 consumes tasks from "refresh" queue.
srv2 := asynq.NewServer(redisConnOpt, asynq.Config{
    Queues: map[string]int{"refresh": 1},
})
```
<!-- gh-comment-id:882196994 -->

@sahilsk11 commented on GitHub (Jul 19, 2021):

Wow, @hibiken thanks for the quick turnaround and explanation! I spun it up in a test repository and was able to enqueue and run both jobs in parallel just fine.

Link to that if anyone else is interested: https://github.com/sahilsk11/asynq

Thanks again for the incredible tool!

<!-- gh-comment-id:882205434 -->