[GH-ISSUE #334] [FEATURE REQUEST] Recover tasks immediately when server is crashed #1161

Closed
opened 2026-03-07 22:06:41 +03:00 by kerem · 3 comments
Owner

Originally created by @hawksheng on GitHub (Oct 20, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/334

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
Some tasks in my project are planned to run for several hours; however, if the asynq server crashes or hangs, they cannot be retried immediately, only after a timeout is reached.

Describe the solution you'd like

  • Add the worker ID to the task's hash when dequeueing it from a pending queue.
  • Have the recoverer periodically iterate over the tasks in each queue's active set and recover any task whose worker is offline.

Describe alternatives you've considered
N/A

Additional context
N/A

btw, you've really done a great job 👍

kerem 2026-03-07 22:06:41 +03:00
Author
Owner

@hibiken commented on GitHub (Oct 20, 2021):

@Chosokabeho thank you for creating an issue! This is great feedback.

I believe RabbitMQ does something similar.
Quote from their getting started guide:

> If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
>
> A timeout (30 minutes by default) is enforced on consumer delivery acknowledgement. This helps detect buggy (stuck) consumers that never acknowledge deliveries. You can increase this timeout as described in Delivery Acknowledgement Timeout.

We have the second part (the timeout) in Asynq, but as you pointed out, the first part is not implemented. I will look into the behavior of other systems (RabbitMQ, etc.) and come up with a sensible solution here.
I want to look at other systems because I'm not sure whether we should account for temporary network partitions and handle them gracefully. For example, if asynq server processes could not talk to Redis for some time due to a network partition, do we want to requeue their tasks immediately? Maybe we need some tolerance for network partitions (e.g., allow a worker to be offline for up to a certain duration).
I'll look into this more and find a solution.

Let me know if anyone has experience around this topic!

Author
Owner

@svedova commented on GitHub (Feb 4, 2022):

Our use case is as follows:

  • We receive the task and the worker processes it
  • The worker creates a Docker container for each task
  • When the worker server is restarted during the execution of a task, it is still able to recover the Docker container and continue the task
  • However, when the process finishes, the task is not moved from the Active state to Completed.

Here's some mock code:

func main() {
  mux := asynq.NewServeMux()

  // Mock code: inspector(), staleTaskIds, queueName, and server are
  // assumed to be defined elsewhere.
  go func() {
    for _, taskId := range staleTaskIds {
      info, _ := inspector().GetTaskInfo(queueName, taskId)
      mux.ProcessTask(context.Background(), asynq.NewTask(info.Type, info.Payload))
    }
  }()

  mux.HandleFunc(/* pattern and function to handle the task */)
  server.Run(mux)
}

Now this is mock code, but basically we retrieve the task IDs and reprocess them. The handler receives the reprocessed task but does not update its state to Completed afterwards. I tried to cancel the task manually using inspector.CancelProcessing, but it doesn't work.

Any idea how I can resume a task when the worker server restarts?

Author
Owner

@hibiken commented on GitHub (Feb 4, 2022):

@svedova thanks for commenting, I moved the discussion to #396 👍
