[GH-ISSUE #516] [FEATURE REQUEST] Per-task type aggregation #1254

Open
opened 2026-03-07 22:08:01 +03:00 by kerem · 2 comments
Owner

Originally created by @stephen on GitHub (Jul 13, 2022).
Original GitHub issue: https://github.com/hibiken/asynq/issues/516

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
I'm trying out the new task batching/aggregation feature from https://github.com/hibiken/asynq/issues/339. I would like to be able to configure grouping parameters on a task-level basis instead of on the server.

For instance, if I have one kind of batched task that sends out high-urgency notifications (i.e. group any in the last minute) and also a low-urgency notification (i.e. group any in the last day).

Describe the solution you'd like
I'd like to be able to specify wait/maxwait/maxsize/aggregation function per task, maybe like:

mux.HandleFunc(
  taskName,
  func(...),
  mux.Batching(wait, maxWait, aggregationFunc, etc)
)

Describe alternatives you've considered
I think I could also achieve this by creating two separate servers (maybe even two asynq servers in the same process? I have not tested this yet).

Additional context
n/a

Originally created by @stephen on GitHub (Jul 13, 2022). Original GitHub issue: https://github.com/hibiken/asynq/issues/516 Originally assigned to: @hibiken on GitHub. **Is your feature request related to a problem? Please describe.** I'm trying out the new task batching/aggregation feature from https://github.com/hibiken/asynq/issues/339. I would like to be able to configure grouping parameters on a task-level basis instead of on the server. For instance, if I have one kind of batched task that sends out high-urgency notifications (i.e. group any in the last minute) and also a low-urgency notification (i.e. group any in the last day). **Describe the solution you'd like** I'd like to be able to specify wait/maxwait/maxsize/aggregation function per task, maybe like: ```golang mux.HandleFunc( taskName, func(...), mux.Batching(wait, maxWait, aggregationFunc, etc) ) ``` **Describe alternatives you've considered** I think I could also achieve this by creating two separate servers (maybe even two asynq servers in the same process? I have not tested this yet). **Additional context** n/a
Author
Owner

@stephen commented on GitHub (Jul 13, 2022):

(this may end up just becoming a thread of notes on the aggregation feature - let me know if I should split this out into different issues)

Another feedback might be to support returning errors from GroupAggregatorFunc. The example doesn't return an error, but you could imagine a more complex aggregation function that needs to deserialize the individual tasks to build the aggregated one:

func Aggregator(group string, tasks []*asynq.Task) *asynq.Task {
	payloads := make([]*Payload, 0, len(tasks))
	for _, t := range tasks {
		n := &notifications.Payload{}
		if err := serialization.Unmarshal(t.Payload(), n); err != nil {
			// what do we do with this error?
		}

		payloads = append(payloads, n.Notification)
	}

	combined := &BatchedPayload{
		Batched: payloads,
	}

	m, err := serialization.Marshal(combined)
	if err != nil {
		// same here?
	}

	return asynq.NewTask(tasks[0].Type(), m)
}
<!-- gh-comment-id:1183628814 --> @stephen commented on GitHub (Jul 13, 2022): (this may end up just becoming a thread of notes on the aggregation feature - let me know if I should split this out into different issues) Another feedback might be to support returning errors from `GroupAggregatorFunc`. The example doesn't return an error, but you could imagine a more complex aggregation function that needs to deserialize the individual tasks to build the aggregated one: ```golang func Aggregator(group string, tasks []*asynq.Task) *asynq.Task { payloads := make([]*Payload, 0, len(tasks)) for _, t := range tasks { n := &notifications.Payload{} if err := serialization.Unmarshal(t.Payload(), n); err != nil { // what do we do with this error? } payloads = append(payloads, n.Notification) } combined := &BatchedPayload{ Batched: payloads, } m, err := serialization.Marshal(combined) if err != nil { // same here? } return asynq.NewTask(tasks[0].Type(), m) } ```
Author
Owner

@hibiken commented on GitHub (Jul 19, 2022):

@stephen Thanks for this great feedback! Let me think about API changes to accommodate these use cases

<!-- gh-comment-id:1188519322 --> @hibiken commented on GitHub (Jul 19, 2022): @stephen Thanks for this great feedback! Let me think about API changes to accommodate these use cases
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#1254
No description provided.