[GH-ISSUE #532] [BUG] Canceled task goes into retry state #2277

Open
opened 2026-03-15 19:55:01 +03:00 by kerem · 15 comments

Originally created by @patriceshot on GitHub (Sep 1, 2022).
Original GitHub issue: https://github.com/hibiken/asynq/issues/532

Originally assigned to: @hibiken on GitHub.

Describe the bug
Canceled task goes into retry state, and is consumed again.
If I return a SkipRetry error (asynq.SkipRetry), it is ignored.

To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):

  1. Start a worker
  2. Enqueue a task which starts a long running task
  3. Cancel the task (either with inspector or asynqmon)
  4. Task goes into retry state

Expected behavior
A canceled task should be archived or completed, we do not want it to be consumed again.

Code sample
Here's the code of my handler (the rest of the setup is standard):

func doWork(ctx context.Context, t *asynq.Task) error {
    for {
        select {
        case <-ctx.Done():
            log.Println("canceling inside DoWork")
            return nil
        default:
            time.Sleep(2 * time.Second)
            log.Printf("running... %v", time.Now())
        }
    }
}

func HandleStartPullTask(ctx context.Context, t *asynq.Task) error {
    c := make(chan error, 1)
    go func() {
        c <- doWork(ctx, t)
    }()
    select {
    case <-ctx.Done():
        // cancelation signal received, abandon this work.
        log.Println("canceling outside DoWork")
        return asynq.SkipRetry
    case res := <-c:
        return res
    }
}

Environment (please complete the following information):

  • OS: Linux
  • Version of asynq package: v0.23.0

Additional context
When I inspect the error in handleFailedMessage (processor.go), it is "context canceled": even if my task returns a SkipRetry error, it is ignored. Naively, I would add "context canceled" to the skip-retry condition, but perhaps I'm missing something?

@dxl3811051 commented on GitHub (Sep 14, 2022):

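I ran into this problem too; I thought I was the only one. I want to terminate the task, and I set the retry count to 0, but when I cancel the task it starts again, and the original task doesn't stop either, so in the end two tasks are running...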

@linhbkhn95 commented on GitHub (Sep 21, 2022):

As far as I can tell from the asynq code, when a message is assigned to a worker, a cancel() function is also created for it, as in this diagram:

[Diagram: "Untitled diagram - Page 3" (draw.io)]

In code:

ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
p.cancelations.Add(msg.ID, cancel)
defer func() {
  cancel()
  p.cancelations.Delete(msg.ID)
}()
....
// New returns a context and cancel function for a given task message.
func New(base context.Context, msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) {
  metadata := taskMetadata{
    id:         msg.ID,
    maxRetry:   msg.Retry,
    retryCount: msg.Retried,
    qname:      msg.Queue,
  }
  ctx := context.WithValue(base, metadataCtxKey, metadata)
  return context.WithDeadline(ctx, deadline)
}

When you cancel the task (with either the inspector or asynqmon), asynq publishes a message to the asynq:cancel channel:

// PublishCancelation publish cancelation message to all subscribers.
// The message is the ID for the task to be canceled.
func (r *RDB) PublishCancelation(id string) error {
	var op errors.Op = "rdb.PublishCancelation"
	ctx := context.Background()
	if err := r.client.Publish(ctx, base.CancelChannel, id).Err(); err != nil {
		return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err))
	}
	return nil
}

The subscriber receives the message and calls the matching cancel() function:

for {
	pubsub, err = s.broker.CancelationPubSub()
	if err != nil {
		s.logger.Errorf("cannot subscribe to cancelation channel: %v", err)
		select {
		case <-time.After(s.retryTimeout):
			continue
		case <-s.done:
			s.logger.Debug("Subscriber done")
			return
		}
	}
	break
}
cancelCh := pubsub.Channel()
for {
	select {
	case <-s.done:
		pubsub.Close()
		s.logger.Debug("Subscriber done")
		return
	case msg := <-cancelCh:
		cancel, ok := s.cancelations.Get(msg.Payload)
		if ok {
			cancel()
		}
	}
}

Therefore, asynq calls the cancel function and immediately reports context canceled instead of SkipRetry. In other words, the processor does not wait for your handler to return the SkipRetry error, so your message is not moved to the archived state.

func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
	if p.errHandler != nil {
		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
	}
	if !p.isFailureFunc(err) {
		// retry the task without marking it as failed
		p.retry(l, msg, err, false /*isFailure*/)
		return
	}
	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
		p.archive(l, msg, err)
	} else {
		p.retry(l, msg, err, true /*isFailure*/)
	}
}

I think this may be the author's intention.
@hibiken, please correct me if I've misunderstood.

@dxl3811051 commented on GitHub (Sep 21, 2022):

Why?
I terminated the task, but it ran again.

@linhbkhn95 commented on GitHub (Sep 21, 2022):

I think task canceling here is meant to stop the work for now so that it runs again at another time. That is, the status goes from Running -> Stopped, not from Running -> Archived.

@linhbkhn95 commented on GitHub (Sep 22, 2022):

If @dxl3811051 wants the behavior you expected, I can implement it in my repo, and you can use that for now. We would need the author's confirmation to merge it into this repo.

@dxl3811051 commented on GitHub (Sep 22, 2022):

If @dxl3811051 wants it as you expected, I can update it in my repo. Then you can use it for now. We need the author's confirmation to merge it into this repo.

Thanks.

@OAyomide commented on GitHub (Nov 29, 2022):

@linhbkhn95 How do you move it to the archived state, then? I tried canceling the task but it doesn't work. It says I should use CancelTask, and I cannot find a reference to this anywhere.

@linhbkhn95 commented on GitHub (Dec 3, 2022):

We cannot move a normal task directly to the archived state; that only happens once the number of retries reaches the limit. If you need it, I can implement it in my repo instead of this one. @OAyomide

@hungtcs commented on GitHub (Aug 9, 2023):

+1, same issue. Are there any existing solutions?

@agorman commented on GitHub (Feb 29, 2024):

I'm running into this issue as well. It would be trivial to write middleware that checks whether the context has been canceled and returns asynq.SkipRetry. However, the context may have been canceled for other reasons, for example a timeout or a deadline.

I propose that instead of using a context.CancelFunc to cancel the task internally, asynq use a context.CancelCauseFunc. That way we could use context.Cause(ctx) to see why it was canceled and return asynq.SkipRetry accordingly.

Thoughts?

@agorman commented on GitHub (Mar 1, 2024):

After more investigation, my solution above will not work. Asynq reports a context.Canceled error right away and does not wait for the task to finish or read its error. This was also explained above by @linhbkhn95, but I missed it.

I think it would be nice if we could have a mechanism to cancel and skip retry via the inspector.

@kamikazechaser commented on GitHub (Mar 3, 2024):

I don't think we can have a case where any context.Canceled error pushes a task into the archived state here (https://github.com/hibiken/asynq/blob/master/processor.go#L326). The potential side effects are just too many.

Ideally we want something in the handleFailedMessage that either reads the task message metadata (set immediately before signalling context cancellation) or another error type.

@handsomefox commented on GitHub (Jul 31, 2024):

As a temporary solution until the library itself solves this issue, I've ended up using this code to cancel tasks:

func (t *Tasker) CancelTask(queue, id string) error {
	if err := t.inspector.CancelProcessing(id); err != nil {
		return err
	}

	for range 100 {
		err := t.inspector.ArchiveTask(queue, id)
		if err == nil {
			break
		}
		if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) {
			break
		}
		time.Sleep(time.Millisecond * 100)
	}

	return nil
}

It might not work if you have no retry delay set, so use at your own risk.

@alexshopee commented on GitHub (Aug 24, 2024):

Suppose there is a long-running task that will not succeed anymore; then the best practice is to cancel it without any retries, to save worker resources.

@s-k-yx commented on GitHub (Sep 17, 2025):

		select {
		case <-p.abort:
			// time is up, push the message back to queue and quit this worker goroutine.
			p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
			p.requeue(lease, msg)
			return
		case <-lease.Done():
			cancel()
			p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
			return
		case <-ctx.Done():
			p.handleFailedMessage(ctx, lease, msg, ctx.Err())
			return
		case resErr := <-resCh:
			if resErr != nil {
				p.handleFailedMessage(ctx, lease, msg, resErr)
				return
			}
			p.handleSucceededMessage(lease, msg)
		}

https://github.com/hibiken/asynq/blob/c327bc40a28e4db45195cfe082d88faa808ce87d/processor.go#L236C1-L255C5
The problem is here. Users expect to handle the context on their own, so maybe the case <-ctx.Done(): branch is not needed; users should check the context in their own handler and return an appropriate error.
