[GH-ISSUE #622] [FEATURE REQUEST] fetch metadata of task from within the message #303

Open
opened 2026-03-02 05:20:19 +03:00 by kerem · 1 comment

Originally created by @nikhilsaraf on GitHub (Feb 27, 2023).
Original GitHub issue: https://github.com/hibiken/asynq/issues/622

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
From within my messageHandler, I am trying to enqueue a chained message with the same parameters as the originating message (retention policy, max retry).

Today, this is not easy to do because I cannot access the taskInfo from within the message directly.
The inspector can fetch the taskInfo given (queueName, taskID), but inside the handler I only have the taskID from the asynq.Task, not the queueName.

Describe the solution you'd like

  • I would like asynq.Task to include the queueName. Then I can directly get the metadata using the inspector.
  • If you are willing to put the taskInfo directly on the asynq.Task then that would be even better and I can access that directly without having to fetch it from the inspector.

Describe alternatives you've considered
To solve this today, I'm currently getting a list of all the queues in the system and cycling through them to find the associated task for the given taskID. Using taskInfo I can get the required metadata (retention policy, max retry).

However, this can be problematic because:
- if two queues have different messages with the same taskID then it could end up selecting the incorrect message
- this approach is just an inefficient way to fetch the metadata for a given message
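To make the two problems above concrete, here is a minimal sketch that uses an in-memory map in place of Redis. The names `taskKey`, `taskMeta`, `lookup`, and `scanByID` are hypothetical and are not part of asynq; the sketch only assumes that a task is identified by its queue and its ID together.

```go
package main

import "fmt"

// taskKey and taskMeta are hypothetical stand-ins for this sketch;
// they are not asynq types.
type taskKey struct {
	queue string
	id    string
}

type taskMeta struct {
	maxRetry int
}

// lookup resolves a task with a single access when the queue name is known.
func lookup(store map[taskKey]taskMeta, queue, id string) (taskMeta, bool) {
	m, ok := store[taskKey{queue: queue, id: id}]
	return m, ok
}

// scanByID probes every queue for the ID (one lookup per queue) and
// returns the names of all queues that contain it.
func scanByID(store map[taskKey]taskMeta, queues []string, id string) []string {
	var hits []string
	for _, q := range queues {
		if _, ok := store[taskKey{queue: q, id: id}]; ok {
			hits = append(hits, q)
		}
	}
	return hits
}

func main() {
	// The same task ID exists in two different queues.
	store := map[taskKey]taskMeta{
		{queue: "critical", id: "abc"}: {maxRetry: 10},
		{queue: "low", id: "abc"}:      {maxRetry: 2},
	}
	queues := []string{"critical", "default", "low"}

	// Scanning by ID alone finds more than one candidate.
	fmt.Println("queues containing abc:", scanByID(store, queues, "abc"))

	// With the queue name, the lookup is direct and unambiguous.
	if m, ok := lookup(store, "low", "abc"); ok {
		fmt.Println("maxRetry for low/abc:", m.maxRetry)
	}
}
```

With only the ID, the caller has to either guess among the hits or give up; with the queue name, no scan is needed at all.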

Additional context
This is the function I wrote to fetch the metadata:

func fetchTaskInfo(taskID string) (*asynq.TaskInfo, error) {
	asynqInspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: GlobalRedisAddrString})
	defer asynqInspector.Close()

	queues, e := asynqInspector.Queues()
	if e != nil {
		return nil, fmt.Errorf("failed to get queues from asynqInspector: %v", e)
	}

	taskInfoMap := map[string]*asynq.TaskInfo{}
	for _, queue := range queues {
		taskInfo, e := asynqInspector.GetTaskInfo(queue, taskID)
		if e != nil {
			if errors.Is(e, asynq.ErrTaskNotFound) {
				// task not found in this queue, try the next one
				continue
			}
			return nil, fmt.Errorf("failed to get task info for task with taskID=%s from queue=%s: %w", taskID, queue, e)
		}
		// don't break here because we want to check all queues to make sure the task is not in multiple queues
		taskInfoMap[queue] = taskInfo
	}

	if len(taskInfoMap) == 0 {
		return nil, fmt.Errorf("no task info found for task with taskID=%s", taskID)
	}
	if len(taskInfoMap) > 1 {
		return nil, fmt.Errorf("found task with taskID=%s in multiple queues: %s", taskID, GlobalMapKeysFn(taskInfoMap))
	}

	for _, taskInfo := range taskInfoMap {
		// we know there is only one taskInfo in the map
		return taskInfo, nil
	}
	return nil, fmt.Errorf("reached end of fetchTask function and failed to get task info and queue for task with taskID=%s", taskID)
}

@rclark commented on GitHub (Feb 26, 2025):

I would go a step further than just exposing the queue name, and just ask that the TaskInfo data ought to be made available via some method of the Task that each iteration of the handler receives. Just like we can ask for task.Payload() to find the job's inputs, we should be able to ask for something like task.Info() to learn about its state, number of failures, and so on.
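As a rough illustration of the shape being asked for here, the sketch below models a task that exposes its metadata through an `Info()` accessor next to `Payload()`, and uses it to build a chained task with the same settings. This is not asynq's API: the `Task` and `TaskInfo` types, their fields, and the `chainFrom` helper are all hypothetical and defined locally for the example.

```go
package main

import (
	"fmt"
	"time"
)

// TaskInfo is a hypothetical metadata struct for this sketch,
// not asynq's TaskInfo.
type TaskInfo struct {
	Queue     string
	MaxRetry  int
	Retried   int
	Retention time.Duration
}

// Task is a hypothetical task type that carries its own metadata.
type Task struct {
	typename string
	payload  []byte
	info     TaskInfo
}

// Payload returns the task's input data.
func (t *Task) Payload() []byte { return t.payload }

// Info returns a copy of the task's metadata (the accessor proposed above).
func (t *Task) Info() TaskInfo { return t.info }

// chainFrom builds a follow-up task that inherits the parent's queue,
// retry limit, and retention. The retry counter starts again at zero.
func chainFrom(parent *Task, typename string, payload []byte) *Task {
	info := parent.Info()
	info.Retried = 0
	return &Task{typename: typename, payload: payload, info: info}
}

func main() {
	parent := &Task{
		typename: "email:send",
		payload:  []byte(`{"user_id":42}`),
		info: TaskInfo{
			Queue:     "critical",
			MaxRetry:  5,
			Retried:   2,
			Retention: 24 * time.Hour,
		},
	}

	// Inside a handler, the chained task can be derived without an inspector.
	next := chainFrom(parent, "email:confirm", parent.Payload())
	fmt.Printf("%+v\n", next.Info())
}
```

Because `Info()` returns a copy in this sketch, resetting the retry counter for the chained task does not modify the parent.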
