[GH-ISSUE #90] [BUG] Same task received/processed by more than one worker #1039

Closed
opened 2026-03-07 22:04:25 +03:00 by kerem · 4 comments

Originally created by @gunnsth on GitHub (Feb 18, 2020).
Original GitHub issue: https://github.com/hibiken/asynq/issues/90

Originally assigned to: @hibiken on GitHub.

Describe the bug
The problem is that I spawned a taskqueuer that queued tasks ranging from now - 10 minutes to now + 10 minutes. With 4 workers running (each at concurrency 1).

The output I got was:

taskrunner2_1  | Creating redis worker
taskrunner2_1  | asynq: pid=1 2020/02/18 22:46:41.351359 INFO: Starting processing
taskrunner_1   | Creating redis worker
taskrunner_1   | asynq: pid=1 2020/02/18 22:46:41.359319 INFO: Starting processing
taskrunner_1   | asynq: pid=1 2020/02/18 22:46:41.443077 INFO: Send signal TSTP to stop processing new tasks
taskrunner_1   | asynq: pid=1 2020/02/18 22:46:41.443085 INFO: Send signal TERM or INT to terminate the process
taskrunner_1   | Got task: task 3
taskrunner_1   | (*asynq.Task)(0xc00000e220)({
taskrunner_1   |  Type: (string) (len=6) "task 3",
taskrunner_1   |  Payload: (asynq.Payload) {
taskrunner_1   |   data: (map[string]interface {}) (len=1) {
taskrunner_1   |    (string) (len=1) "i": (float64) -8
taskrunner_1   |   }
taskrunner_1   |  }
taskrunner_1   | })
taskrunner_1   | Got task: task 4
taskrunner_1   | (*asynq.Task)(0xc00000e440)({
taskrunner_1   |  Type: (string) (len=6) "task 4",
taskrunner_1   |  Payload: (asynq.Payload) {
taskrunner_1   |   data: (map[string]interface {}) (len=1) {
taskrunner_1   |    (string) (len=1) "i": (float64) -7
taskrunner_1   |   }
taskrunner_1   |  }
taskrunner_1   | })
taskrunner2_1  | Got task: task 1
taskrunner2_1  | (*asynq.Task)(0xc00000e380)({
taskrunner2_1  |  Type: (string) (len=6) asynq: pid=1 2020/02/18 22:46:41.443166 INFO: Send signal TSTP to stop processing new tasks
taskrunner2_1  | asynq: pid=1 2020/02/18 22:46:41.443173 INFO: Send signal TERM or INT to terminate the process
taskrunner2_1  | "task 1",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -10
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 2
taskrunner2_1  | (*asynq.Task)(0xc00000e540)({
taskrunner2_1  |  Type: (string) (len=6) "task 2",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -9
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 3
taskrunner2_1  | (*asynq.Task)(0xc00007c900)({
taskrunner2_1  |  Type: (string) (len=6) "task 3",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -8
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 4
taskrunner2_1  | (*asynq.Task)(0xc00000e640)({
taskrunner2_1  |  Type: (string) (len=6) "task 4",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -7
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 6
taskrunner2_1  | (*asynq.Task)(0xc00007ca00)({
taskrunner2_1  |  Type: (string) (len=6) "task 6",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -5
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
taskrunner2_1  | Got task: task 7
taskrunner2_1  | (*asynq.Task)(0xc00000e740)({
taskrunner2_1  |  Type: (string) (len=6) "task 7",
taskrunner2_1  |  Payload: (asynq.Payload) {
taskrunner2_1  |   data: (map[string]interface {}) (len=1) {
taskrunner2_1  |    (string) (len=1) "i": (float64) -4
taskrunner2_1  |   }
taskrunner2_1  |  }
taskrunner2_1  | })
...

So tasks 3 and 4 were received twice, which could lead to problems, although I admit my use case is a bit unusual (scheduling tasks minutes in the past, etc.).

To Reproduce
Steps to reproduce the behavior (Code snippets if applicable):

  1. Setup background processing ...
    Start 4 taskrunners
func main() {
	// r is the Redis connection option from the surrounding setup (omitted here).
	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 1,
	})

	bg.Run(asynq.HandlerFunc(handler))
}

func handler(ctx context.Context, t *asynq.Task) error {
	fmt.Printf("Got task: %s\n", t.Type)
	spew.Dump(t)
	return nil
}
  2. Enqueue tasks ...
	fmt.Printf("Creating redis client\n")
	client := asynq.NewClient(r)

	n := 1
	for i := -10; i < 10; i++ {
		name := fmt.Sprintf("task %d", n)
		t := asynq.NewTask(name, map[string]interface{}{"i": i})
		err := client.Schedule(t, time.Now().Add(time.Duration(i)*time.Minute))
		if err != nil {
			fmt.Printf("Error scheduling\n")
			panic(err)
		}

		n++
	}
}
  3. See the error ...
    As per the output above, tasks 3 and 4 were received by two workers. It would be good if we could guarantee that each task is processed only once.

Note that, given the way I am spawning the workers and the queuer, tasks may be enqueued before some workers start, since everything starts at the same time. Ideally that would not matter.

Expected behavior
Expected that each task could only be received by one worker / processed once.

Screenshots
N/A

Environment (please complete the following information):

  • Running on Ubuntu Linux
  • Spawning Redis, the taskqueuer, and 4 taskrunners from a single docker-compose file.
  • asynq version 0.4

Additional context
If needed, I can clean up my docker compose environment and provide a fully contained example.

kerem 2026-03-07 22:04:25 +03:00
  • closed this issue
  • added the
    bug
    label

@hibiken commented on GitHub (Feb 19, 2020):

Thanks for filing this bug report!

I ran the same code on my machine with 4 worker processes reading from the same redis instance but could not reproduce the bug.

It could be that you've run the client code multiple times and there were duplicate tasks in Redis. Would you mind trying again with a clean Redis DB?

You can run asynqmon stats to make sure that there are no tasks in Redis.
You can flush Redis by running redis-cli flushdb.


@gunnsth commented on GitHub (Feb 19, 2020):

@hibiken I prepared an environment where this can be reproduced (uses docker, docker-compose):
https://github.com/hibiken/asynq/compare/master...gunnsth:issue90-reproduce?expand=1
To reproduce it:

$ cd testdata
$ docker-compose up

It does not always give exactly the same results; there seems to be some random factor that determines which taskrunner picks up which tasks.

Example outputs:

taskrunner_1   | Tasks processed
taskrunner_1   | (map[int]int) (len=5) {
taskrunner_1   |  (int) 10: (int) 1,
taskrunner_1   |  (int) 2: (int) 1,
taskrunner_1   |  (int) 4: (int) 1,
taskrunner_1   |  (int) 6: (int) 1,
taskrunner_1   |  (int) 8: (int) 1
taskrunner_1   | }
taskrunner3_1  | Tasks processed
taskrunner3_1  | (map[int]int) (len=3) {
taskrunner3_1  |  (int) 1: (int) 1,
taskrunner3_1  |  (int) 2: (int) 1,
taskrunner3_1  |  (int) 20: (int) 1
taskrunner3_1  | }
taskrunner2_1  | Tasks processed
taskrunner2_1  | (map[int]int) (len=7) {
taskrunner2_1  |  (int) 1: (int) 1,
taskrunner2_1  |  (int) 3: (int) 1,
taskrunner2_1  |  (int) 5: (int) 1,
taskrunner2_1  |  (int) 7: (int) 1,
taskrunner2_1  |  (int) 9: (int) 1,
taskrunner2_1  |  (int) 11: (int) 1,
taskrunner2_1  |  (int) 14: (int) 1
taskrunner2_1  | }

We see that taskrunner1 and taskrunner3 both processed task 2, and that taskrunner3 and taskrunner2 both processed task 1.

Is there any way we can ensure that this does not happen?
There should not be any duplicate tasks, since this setup creates a fresh Redis instance
(and to be sure, it is easy to remove all the Docker images).

Would it be possible to programmatically clear all tasks before starting to create them? Just to make sure?


@hibiken commented on GitHub (Feb 20, 2020):

@gunnsth
Could you try adding this to your taskqueue.go to flush the DB and start from a clean slate?

import (
    "log"

    "github.com/go-redis/redis/v7"
)

func main() {
    // Flush the DB first to start from a clean slate.
    rdb := redis.NewClient(&redis.Options{
        Addr:     "redis:6379",
        Password: "xxxx",
    })
    if err := rdb.FlushDB().Err(); err != nil {
        log.Fatalln(err)
    }

    // ... create asynq.Client and schedule tasks (your existing code)
}

Let me know if you are still seeing duplicate tasks.
Otherwise, we can close this issue 👍


@hibiken commented on GitHub (Jun 13, 2020):

@gunnsth I finally figured out what's causing this. Fix is in #170
