[GH-ISSUE #663] [FEATURE REQUEST] Queue Adapters (eg: SQS, GCP Pub/Sub etc) #1347

Open
opened 2026-03-07 22:08:49 +03:00 by kerem · 8 comments

Originally created by @yoshdog on GitHub (May 26, 2023).
Original GitHub issue: https://github.com/hibiken/asynq/issues/663

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
Would be great if we could configure different queue backends like SQS, GCP Pub/Sub etc...

Describe the solution you'd like
Ability to use a queue backend other than Redis.

Additional context
Alternative packages that support this: https://github.com/RichardKnop/machinery


@anupriya17 commented on GitHub (Jun 19, 2023):

A PubSub interface like this would help. I just copied the Redis pub/sub methods:

type Message struct {
	Channel string
	Payload string
}

type PubSub interface {
	Subscribe(ctx context.Context, channels ...string) error
	PSubscribe(ctx context.Context, patterns ...string) error
	SSubscribe(ctx context.Context, channels ...string) error
	Unsubscribe(ctx context.Context, channels ...string) error
	PUnsubscribe(ctx context.Context, patterns ...string) error
	SUnsubscribe(ctx context.Context, channels ...string) error
	Ping(ctx context.Context, payload ...string) error
	ReceiveTimeout(ctx context.Context, timeout time.Duration) (interface{}, error)
	Receive(ctx context.Context) (interface{}, error)
	ReceiveMessage(ctx context.Context) (*Message, error)
	Channel(opts ...ChannelOption) <-chan *Message
	Close() error
}

type ChannelOption func(*channelOptions)

type channelOptions struct {
	size        int
	maxInterval int64
}

func WithChannelSize(size int) ChannelOption {
	return func(opts *channelOptions) {
		opts.size = size
	}
}

func WithMaxInterval(maxInterval int64) ChannelOption {
	return func(opts *channelOptions) {
		opts.maxInterval = maxInterval
	}
}

@yoshdog commented on GitHub (Jul 11, 2023):

OK, it seems there is already a Broker interface:
https://github.com/hibiken/asynq/blob/master/internal/base/base.go#L715-L755

So we would just need to implement it for each backend, e.g. SQS, RabbitMQ, Pub/Sub, etc.


@anupriya17 commented on GitHub (Jul 11, 2023):

Thanks. The Broker interface currently depends on Redis pub/sub:
https://github.com/hibiken/asynq/blob/master/internal/base/base.go#L751

IMO we should decouple pub/sub into its own interface. We don't need all the methods.


@anhoder commented on GitHub (Jul 18, 2023):

Looking forward to this feature👀


@bowenli86 commented on GitHub (Oct 12, 2023):

+1


@zandwang commented on GitHub (Feb 29, 2024):

It looks like there are already some relevant PRs?
https://github.com/hibiken/asynq/pull/577


@djragsdale commented on GitHub (Apr 28, 2025):

Based on #577 and #667, it looks like PRs for other brokers are being declined. What doesn't get asked is why one would choose asynq over "other libraries out there that allow you to bring your own broker for lesser features". I'll summarize the reasons I was hoping to use asynq, even though I would prefer to use SQS as the broker:

  • Some alternatives, such as https://github.com/RichardKnop/machinery, are not actively maintained. It had releases in March 2024, but before that had not been updated since June 2021. And per the author in https://github.com/RichardKnop/machinery/issues/790#issuecomment-1620468456: "It is not actively maintained anymore."
  • Many alternatives do not offer retries. This is an incredible feature, and IMO a must-have.
  • Many alternatives do not bring a Web UI. Of course it is possible to develop a Web UI that can read and write to the message broker, but this could be 10x the code written for just the workers.
  • Many alternatives do not support queue priorities. For example, I have jobs that are actively created by users, jobs which are scheduled to run overnight, and jobs that involve data cleanup/synchronization. Those are perfect fits for Critical, Default, and Low priority queues.
  • Many alternatives do not support timeouts. This feels simple, but a surprising number of options lack this necessary feature. It is really important for ensuring that one job doesn't lock up an entire queue, or continue to hold onto memory after an OOM or overflow error.

I honestly haven't seen any option except Asynq that checks all those boxes.

The natural next question is "How does a maintainer with limited availability support other brokers?" Simple: they don't. Decouple. If there is an Asynq feature that relies on Redis and can't be achieved with SQS, GCP Pub/Sub, or other brokers, I haven't seen the technical limitation. If there is one, please communicate it. Otherwise, I would happily contribute to the project, similar to the aforementioned PRs, so that bring-your-own-broker is achievable.


@arjendevos commented on GitHub (Jul 17, 2025):

+1
