[GH-ISSUE #396] [QUESTION] Resuming tasks on worker restart #2201

Closed
opened 2026-03-15 19:39:33 +03:00 by kerem · 5 comments
Owner

Originally created by @hibiken on GitHub (Feb 4, 2022).
Original GitHub issue: https://github.com/hibiken/asynq/issues/396

Our use case is as follows:

  • We receive the task and the worker processes it.
  • The worker creates a docker container for each task.
  • When the worker server is restarted during the execution of the task, it's still able to recover the docker container and continue the task.
  • However, when the process finishes, the task is not moved from the Active state to Completed.

Here's some mock code:

func main() {
  mux := asynq.NewServeMux()

  go func() {
    for _, taskId := range staleTaskIds {
      info, _ := inspector().GetTaskInfo(queueName, taskId)
      mux.ProcessTask(context.Background(), asynq.NewTask(info.Type, info.Payload))
    }
  }()

  mux.HandleFunc(/* function to handle process */)
  server.Run(mux)
}

Now this is mock code, but basically we retrieve the task ids and reprocess them. The handler receives the reprocessed task but does not update the state to Completed afterwards. I tried to manually cancel the task by using `inspector.CancelProcessing` but it doesn't work.

Any idea how I can resume a task when the worker server restarts?

Originally posted by @svedova in https://github.com/hibiken/asynq/issues/334#issuecomment-1029913693

kerem closed this issue 2026-03-15 19:39:39 +03:00
Author
Owner

@hibiken commented on GitHub (Feb 4, 2022):

@svedova Thank you for commenting with a question (I moved to a new issue since it's slightly different from the original issue).

Where are you getting this staleTaskIds? Is this a channel or a slice?

Author
Owner

@svedova commented on GitHub (Feb 7, 2022):

TL;DR:

`staleTaskIds` is populated by looking at the filesystem. It's a slice of strings. We store the task id on the filesystem when a job starts processing and delete it when the job finishes. When the server restarts, we check the filesystem; if there is anything there, it means the task was incomplete and the server crashed while processing it.

=====

Longer version:

@hibiken so this is our use case at Stormkit (https://www.stormkit.io):

  1. We build and deploy users' applications. Each time they hit deploy, we create a deployment task.
  2. Once the worker server receives the task, we store the task id in badger (https://github.com/dgraph-io/badger). This keeps the task id on the filesystem. The task id is removed from the filesystem when the task is processed.
  3. If the server crashes, the task id is still on the filesystem, so the task is re-picked and reprocessed when the server restarts.

This is the code we use for this operation:

mux.HandleFunc(tasks.DeploymentStart, func(ctx context.Context, t *asynq.Task) error {
    payload := t.Payload()
    message := messageFromPayload(payload)

    // This is the part where we store the stale IDs
    // Once the deployment is over, the task is considered done and we remove 
    // the task id from the filesystem
    db.Write(message.TaskID(), payload)
    defer db.Delete(message.TaskID())

    ProcessDeployment(message)
    return nil
})

For some reason, the next time the task is picked up (in case the server restarts), the task is processed but it's not moved to the Completed tab. We use the same task id, as we assign a custom id to the task.

Author
Owner

@hibiken commented on GitHub (Feb 8, 2022):

@svedova Thanks for providing more detailed info.

> We store the task id on the filesystem when a job starts processing and delete it when the job finishes. When the server restarts, we check the filesystem; if there is anything there, it means the task was incomplete and the server crashed while processing it.

You don't have to do this yourself, let the library handle this.

If a worker server crashes and leaves tasks in the active state (i.e. stale/orphaned tasks), they'll be recovered automatically by another server listening to the same queue (this other server could be a restarted process); see recoverer.go: https://github.com/hibiken/asynq/blob/master/recoverer.go

So this code is not necessary:

  go func() {
    for _, taskId := range staleTaskIds {
      info, _ := inspector().GetTaskInfo(queueName, taskId)
      mux.ProcessTask(context.Background(), asynq.NewTask(info.Type, info.Payload))
    }
  }()

The current implementation (v0.21.0) will recover an orphaned task once its deadline has been exceeded (FYI: we have an open issue, #334, to make the recovery quicker). If you want these tasks to be recovered quickly, the recommendation is to specify an appropriate timeout. For example, if your task should be handled within 1m, you'd specify that via the `Timeout` option.

info, err := client.Enqueue(task, asynq.Timeout(1 * time.Minute))

With this, if a server crashes while handling this task, the task's deadline will be reached within the next 1m, and another server listening to the same queue can recover the task (by moving it from the active state to the retry state).

Let me know if you have more questions! I'm happy to assist :)

Author
Owner

@svedova commented on GitHub (Feb 8, 2022):

@hibiken thanks a lot for the answer! I guess I'll use it that way but it would be very nice to have the ability to pick it up immediately. Our timeout is 10 minutes and if the server crashes during the first minute the user will have to wait approximately 10 minutes for the task to be picked up. So I guess this becomes a feature request :)

Author
Owner

@hibiken commented on GitHub (Feb 9, 2022):

Totally agree with your concern. I'm actively working on this right now and it'll be part of the next version.
Please feel free to subscribe to #334, so that you'll be notified when that issue is closed.

Duplicate of #334.
