[GH-ISSUE #493] [FEATURE REQUEST] Multiple queues with different workers #224

Closed
opened 2026-03-02 05:19:45 +03:00 by kerem · 3 comments

Originally created by @zerozh on GitHub (Jun 18, 2022).
Original GitHub issue: https://github.com/hibiken/asynq/issues/493

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
As the documentation describes, the queue concept in asynq is really a priority. Tasks in all queues share the same pool of workers specified in Config.

Describe the solution you'd like
In my scenario, I want a queue to act as a namespace, or a group of tasks. For example, I have different types of tasks (order, sms, mail, etc.) and each type has its own workers (order: 10, sms: 2, mail: 2). These workers only consume the "queue" they are meant to deal with. That way, at most 2 workers run mail tasks in parallel, even under highly concurrent traffic.
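To make the requested behavior concrete, here is a minimal standard-library sketch of per-group concurrency limits. It is not asynq's API: `limits` and `runGroup` are hypothetical names, and a buffered channel stands in for each group's dedicated workers. Each group is exercised in turn to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limits mirrors the worker counts from the request (hypothetical name).
var limits = map[string]int{"order": 10, "sms": 2, "mail": 2}

// runGroup runs n tasks for one group with at most limit of them running
// at once. It returns the highest number of tasks seen running in parallel.
func runGroup(limit, n int, task func()) int {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit tasks are already running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot after the task is done
			cur := atomic.AddInt64(&running, 1)
			// Record a new peak if this task raised the running count.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			task()
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait()
	return int(peak)
}

func main() {
	for _, name := range []string{"order", "sms", "mail"} {
		peak := runGroup(limits[name], 50, func() { time.Sleep(time.Millisecond) })
		fmt.Printf("%s: limit=%d peak=%d\n", name, limits[name], peak)
	}
}
```

The observed peak for a group never exceeds its limit, which is the guarantee the request asks for: a burst of mail tasks can occupy at most 2 workers.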

Describe alternatives you've considered
I can start different executable programs with different configs.

  • A program that only handles the order queue with concurrency 10
  • A program that only handles the sms queue with concurrency 2
  • A program that only handles the mail queue with concurrency 2

With this alternative, I have to monitor several programs on my own. It would be great if asynq could provide this feature instead.

Additional context


@hibiken commented on GitHub (Jun 19, 2022):

@zerozh Thank you for creating this issue! I'll consider adding an option to limit the concurrency for each queue.

In the meantime, we could use the approach you suggested with a single executable:
You could create an asynq.Server object for each queue with the desired concurrency and start all servers in one main function.
Note: API is a little awkward (maybe we should consider moving Handler to be a part of Config), but this should work.

Example:

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/hibiken/asynq"
	"golang.org/x/sys/unix"
)

func main() {
	redisConnOpt := asynq.RedisClientOpt{
		Addr: ":6379",
	}
	orderSrv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 10,
		Queues: map[string]int{
			"order": 1,
		},
	})
	smsSrv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 2,
		Queues: map[string]int{
			"sms": 1,
		},
	})
	mailSrv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 2,
		Queues: map[string]int{
			"mail": 1,
		},
	})

	orderHandler := asynq.NewServeMux()
	smsHandler := asynq.NewServeMux()
	mailHandler := asynq.NewServeMux()

	// ... [omitted] handler registration

	if err := orderSrv.Start(orderHandler); err != nil {
		log.Fatal(err)
	}
	if err := smsSrv.Start(smsHandler); err != nil {
		log.Fatal(err)
	}
	if err := mailSrv.Start(mailHandler); err != nil {
		log.Fatal(err)
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
	// Handle SIGTERM, SIGINT to exit the program.
	// Handle SIGTSTP to stop processing new tasks.
	for {
		s := <-sigs
		if s == unix.SIGTSTP {
			orderSrv.Stop() // stop processing new tasks
			smsSrv.Stop()
			mailSrv.Stop()
			continue
		}
		break // received SIGTERM or SIGINT signal
	}

	orderSrv.Shutdown()
	smsSrv.Shutdown()
	mailSrv.Shutdown()
}

Let me know if you have feedback or questions.
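As a possible way to cut the repetition in the example above, the servers could be kept in a slice and stopped or shut down in a loop. The following is only a sketch: `stopper`, `stopAll`, `shutdownAll`, and `fakeServer` are hypothetical names, and `fakeServer` stands in for `*asynq.Server` so the code runs without Redis. It assumes only the `Stop` and `Shutdown` methods already used in the example.

```go
package main

import "fmt"

// stopper is the subset of server methods the shutdown code relies on.
// Assumption: the real servers provide Stop and Shutdown as used above.
type stopper interface {
	Stop()
	Shutdown()
}

// stopAll tells every server to stop picking up new tasks.
func stopAll(srvs []stopper) {
	for _, s := range srvs {
		s.Stop()
	}
}

// shutdownAll shuts the servers down one after another.
func shutdownAll(srvs []stopper) {
	for _, s := range srvs {
		s.Shutdown()
	}
}

// fakeServer records calls instead of talking to Redis (illustration only).
type fakeServer struct {
	name  string
	calls *[]string
}

func (f *fakeServer) Stop()     { *f.calls = append(*f.calls, f.name+":stop") }
func (f *fakeServer) Shutdown() { *f.calls = append(*f.calls, f.name+":shutdown") }

func main() {
	var calls []string
	srvs := []stopper{
		&fakeServer{name: "order", calls: &calls},
		&fakeServer{name: "sms", calls: &calls},
		&fakeServer{name: "mail", calls: &calls},
	}
	stopAll(srvs)     // corresponds to the SIGTSTP branch
	shutdownAll(srvs) // corresponds to the final shutdown calls
	fmt.Println(calls)
}
```

With real servers, the slice would hold `orderSrv`, `smsSrv`, and `mailSrv`, and the signal loop would call `stopAll` and `shutdownAll` instead of listing each server by hand.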


@zerozh commented on GitHub (Jun 21, 2022):

@hibiken Thanks for the code snippet! That's super helpful.


@xylomo commented on GitHub (Jun 27, 2022):

@hibiken was this actually completed, or are we expected to use the multiple-server approach?
