[GH-ISSUE #265] [FEATURE REQUEST] Processed job's payload and result #1116

Closed
opened 2026-03-07 22:05:56 +03:00 by kerem · 15 comments
Owner

Originally created by @unicod3 on GitHub (Apr 24, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/265

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
Asynq currently removes successfully processed jobs from Redis.
That means information about the processed jobs also vanishes, such as:

  • When the client pushed the job onto the queue
  • When the job completed
  • Whether a given job ID was processed successfully

Describe the solution you'd like
It would be nice to keep processed jobs around for a given TTL and return a basic struct that contains fields like id, queued_at, completed_at.
I think the first step toward that is to generate a unique ID for each task and keep them under keys like asynq:{default}:processed:{job_id}

Describe alternatives you've considered
Python RQ or Laravel Horizon might be good examples to take a look at.

kerem 2026-03-07 22:05:56 +03:00
Author
Owner

@hibiken commented on GitHub (Apr 24, 2021):

@unicod3 Thank you for opening this issue!

Yes, I have been thinking about a feature like the one you described. I really like the idea of assigning a TTL to completed tasks to keep them around. We can expose that option as a new Option type.

Example:

client.Enqueue(task, asynq.ResultTTL(30 * time.Minute))

In addition to those timestamps you mentioned, it'd be nice (in some scenarios) to allow users to store "result" associated with the task. (We need to be careful what we store in redis, however, since it may use up too much memory).
Currently I'm thinking about exposing a type called ResultWriter through context to write task's result.

Example:

// In Handler code
func myHandler(ctx context.Context, task *asynq.Task) error {
    res, err := DoSomeWork(task)
    if err != nil {
        return err
    }
    if _, err := asynq.GetResultWriter(ctx).Write(res); err != nil {
        return err
    }
    return nil
}

I'm in the process of re-designing the Redis keys so that each task gets its own key in Redis with its values stored as a HASH, so it'd be quite easy to implement something like this.

I'll update this thread when I have a concrete plan to go forward with this feature 👍

Author
Owner

@unicod3 commented on GitHub (Apr 24, 2021):

That sounds nice; if you could create something like a milestone or a task list, I would also love to participate in the implementation.

Author
Owner

@hibiken commented on GitHub (Sep 9, 2021):

I'm revisiting this thread together with #322.

My current (tentative) plan is to make the following changes:

  • Add a ResultTTL(time.Duration) Option helper to specify the TTL after successful processing
  • (internal) Add an asynq:{qname}:completed ZSET whose score is the TTL expiry timestamp
  • Add a ResultWriter type that writes the result for a given task, and a GetResultWriter(ctx) ResultWriter helper to get the writer
  • (internal) Add a "result" field under the task key asynq:{qname}:t:{task_id}

Also, I'm considering changing the NewTask function to take Options as variadic arguments.
Example:


// update function signature to take opts
func NewTask(typename string, payload []byte, opts ...Option) Task

// example
task := asynq.NewTask("my_task", []byte("my_payload"), asynq.ResultTTL(30*time.Minute), asynq.Queue("my_queue"))
Author
Owner

@hibiken commented on GitHub (Nov 6, 2021):

This feature is now available as of version 0.19.

Author
Owner

@tpoxa commented on GitHub (Nov 20, 2021):

Thanks, @hibiken. It would be great to have a usage example.

Author
Owner

@tpoxa commented on GitHub (Nov 20, 2021):

I keep receiving an empty result in my taskInfo.

Author
Owner

@hibiken commented on GitHub (Nov 21, 2021):

@tpoxa Thank you for the comment! I'll make sure to add some examples to the godoc and wiki (#354).

But in the meantime, this is a simple example.

// make sure to set the retention option, otherwise the task will be deleted on completion.
info, err := client.Enqueue(task, asynq.Retention(2 * time.Hour))

// In handler code
func HandleMyTask(ctx context.Context, task *asynq.Task) error {
    res, err := DoStuff(ctx, task)
    if err != nil {
        return err
    }
    if _, err := task.ResultWriter().Write(res); err != nil {
        log.Printf("Failed to write task result: %v", err)
        return err
    }
    return nil
}

Then you should be able to see the result payload in the Web UI, or read the result bytes programmatically by calling Inspector.GetTaskInfo. Let me know if you have further questions. Also make sure that you are using asynq v0.19 👍

Author
Owner

@tpoxa commented on GitHub (Nov 21, 2021):

Sorry, it does not work. Looking at https://github.com/hibiken/asynq/blob/9f2c321e9804901f06b58e82f9e0ce4125d80b74/client.go#L379, is it expected that the Result returned is always nil?

Author
Owner

@hibiken commented on GitHub (Nov 21, 2021):

I think there's some misunderstanding here. The TaskInfo returned from Client.Enqueue will always have a nil result, because you've just enqueued the task and it hasn't been handled by your Handler yet.

The result will be available after you write it in your handler. Just make sure you set the Retention option so that tasks don't get deleted once they are processed successfully.

Author
Owner

@tpoxa commented on GitHub (Nov 21, 2021):

Yes, it might be a misunderstanding.
I did set the retention time, so how can I get the actual result?
I thought I was enqueuing tasks synchronously, so I assumed I'd have a result in the task info.

How does the enqueuer know that a task is accomplished?

Thank you!
Sorry for the silly questions.

Author
Owner

@hibiken commented on GitHub (Nov 21, 2021):

I thought I enqueue tasks synchronously so I must have a result in task info.

Enqueue only puts the given task on a queue to be handled asynchronously by workers (are you running the worker server process with your Handler?). The result becomes available only after the Handler processes the task (assuming you write the result using the ResultWriter; see the example above).

How does the enqueuer know that a task is accomplished?

Unfortunately, there's no out-of-the-box way to listen for an event when the task completes. You need to query the task using Inspector.GetTaskInfo if you want to do this programmatically.

If you don't mind, could you describe your use case? The main motivation behind implementing this feature was to prepare for the #244 feature, which we'll implement in the future.

Author
Owner

@tpoxa commented on GitHub (Nov 21, 2021):

OK, here's an example: a web form submits a task to take a website's screenshot. The user submits the URL and waits a few seconds (up to 30) until it's done. As the result, the website shows a screenshot URL.

One of the workers gets the payload (the website URL), takes the screenshot, publishes it to a bucket, and returns the URL of the uploaded image.

Author
Owner

@tpoxa commented on GitHub (Nov 21, 2021):

So ideally (IMHO) it would be something like this:

taskInfo->wait(ctx)
fmt.Println(taskInfo.Result)
Author
Owner

@crossworth commented on GitHub (Nov 21, 2021):

So ideal (IMHO) it would be to have something like this:

taskInfo->wait(ctx)
fmt.Println(taskInfo.Result)

Hello, I don't think this would work. The wait would have to call Inspector.GetTaskInfo in a loop with a timer to know when the task is finished; in that case, I think using goroutines and channels would be better, or, if you really need to wait, doing it synchronously without a goroutine should work as well.

Maybe you should create the task with a known ID and add a route that uses Inspector.GetTaskInfo, then check it periodically from the frontend?

Since you are taking website screenshots, you could use the URL as the key.

Route that enqueues the job

  • Enqueues a job with task ID screenshot:https//google.com.br (you could add timeout and unique options as well; this way, if two requests are made for the same URL, they would generate the same key and you would only need to process it once)
  • Returns the task ID: screenshot:https//google.com.br

Route that checks the job result

  • Accepts a task ID and uses Inspector.GetTaskInfo to check the task result
  • Returns the task result

Your frontend could use JavaScript to query the result-checking route every 5 seconds.

Author
Owner

@tpoxa commented on GitHub (Nov 21, 2021):

OK, thanks guys. I came up with something like this:

conn := asynq.RedisClientOpt{Addr: redisAddr, Password: "password123"}
client := asynq.NewClient(conn)
defer client.Close()

inspector := asynq.NewInspector(conn)
defer inspector.Close()

task, err := tasks.NewWebsiteScreenshotTask("https://eples2.com")
if err != nil {
    log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task,
    asynq.Unique(time.Minute*5),
    asynq.Timeout(time.Minute),
    asynq.Retention(2*time.Minute),
    asynq.Queue("default"),
)
if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
}

// @todo use HTTP request's context
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

res, err := waitForResult(ctx, inspector, "default", info.ID)
if err != nil {
    log.Fatalf("unable to wait for result: %v", err)
}
var result tasks.WebsiteScreenshotResult
err = json.Unmarshal(res.Result, &result)
if err != nil {
    log.Fatalf("could not decode result: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s result=%v", info.ID, info.Queue, result)

Where the wait function is as follows:


func waitForResult(ctx context.Context, i *asynq.Inspector, queue, taskID string) (*asynq.TaskInfo, error) {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			taskInfo, err := i.GetTaskInfo(queue, taskID)
			if err != nil {
				return nil, err
			}
			if taskInfo.CompletedAt.IsZero() {
				continue
			}
			return taskInfo, nil
		case <-ctx.Done():
			return nil, fmt.Errorf("context closed")
		}
	}
}
