[GH-ISSUE #730] [BUG] Archived tasks that are trimmed are not deleted #1373

Open
opened 2026-03-07 22:09:02 +03:00 by kerem · 1 comment
Owner

Originally created by @Harrison-Miller on GitHub (Aug 31, 2023).
Original GitHub issue: https://github.com/hibiken/asynq/issues/730

Originally assigned to: @hibiken on GitHub.

Describe the bug
Currently the keys of archived tasks are stored in a sorted set: asynq:{qname}:archived
These keys are trimmed from the sorted set every time a task is archived; however, the corresponding task key itself is never deleted.
This leads to steadily growing Redis memory usage once the archive reaches maxArchiveSize (10k), because every further archive leaves another orphaned key behind.

Trimmed keys are orphaned: they are no longer referenced by that set, so they cannot be deleted with inspector.DeleteAllArchivedTasks.
You can confirm that these orphaned archived tasks exist by running the following on a Redis/asynq server that is not currently receiving tasks:
KEYS asynq:{qname}:t*
and comparing the count to
ZCARD asynq:{qname}:archived

Other issues have already pointed out the lack of configuration support for:
maxArchiveSize
archivedExpirationInDays
I believe that being able to configure these settings is crucial for production environments. That said, the current defaults would be fine if the intended behavior worked.

However, this bug is a very serious issue: it can cause Redis clusters with limited memory to crash.

To Reproduce

  1. Submit 10k tasks
  2. Archive the tasks
  3. Submit and archive 1 more task
  4. Observe that the number of entries in asynq:{qname}:archived (ZCARD) is not equal to the total number of task keys (asynq:{qname}:t*) in Redis

Test Case

// TestArchiveTrim reproduces hibiken/asynq#730: when Archive trims the
// archived sorted set to keep it within maxArchiveSize, the task keys of the
// evicted entries must be deleted as well; otherwise they are orphaned in Redis
// and can no longer be reached through the archived set.
//
// The test seeds maxArchiveSize-1 archived tasks and then archives one more.
// It expects that the sorted set AND the task keys under the queue's task-key
// prefix both end up with exactly maxArchiveSize-1 items.
func TestArchiveTrim(t *testing.T) {
	r := setup(t)
	defer r.Close()
	ctx := context.Background()
	now := time.Now()
	r.SetClock(timeutil.NewSimulatedClock(now))

	const qname = "default"
	errMsg := "SMTP server not responding"

	// Seed maxArchiveSize-1 archived tasks. Archiving one more below brings the
	// set to maxArchiveSize, which the trim is expected to cut back to taskCount.
	taskCount := maxArchiveSize - 1
	archivedTasks := make([]base.Z, 0, taskCount)
	for i := 0; i < taskCount; i++ {
		task := base.TaskMessage{
			ID:      uuid.NewString(),
			Type:    "send_email",
			Payload: nil,
			Queue:   qname,
		}
		// Score seeded tasks an hour in the past so that the target archived
		// below is expected to sort after all of them.
		archivedTasks = append(archivedTasks, base.Z{
			Message: h.TaskMessageWithError(task, errMsg, now),
			Score:   now.Add(-1 * time.Hour).Unix(),
		})
	}

	h.FlushDB(t, r.client) // start from an empty database
	h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{
		qname: archivedTasks,
	})

	if got := len(h.GetArchivedEntries(t, r.client, qname)); got != taskCount {
		t.Fatalf("len of archived entries before = %v, want %v", got, taskCount)
	}

	// Seed the target into the active list and the lease set; this is the
	// state from which this test calls Archive.
	target := &base.TaskMessage{
		ID:      uuid.NewString(),
		Type:    "send_email",
		Payload: nil,
		Queue:   qname,
	}
	h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{
		qname: {target},
	})
	h.SeedAllLease(t, r.client, map[string][]base.Z{
		qname: {{Message: target, Score: now.Add(10 * time.Second).Unix()}},
	})

	if err := r.Archive(ctx, target, errMsg); err != nil {
		t.Fatalf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
	}

	// Archiving one more task must have trimmed the set back to taskCount.
	archivedEntries := h.GetArchivedEntries(t, r.client, qname)
	if len(archivedEntries) != taskCount {
		t.Fatalf("len of archived entries = %v, want %v", len(archivedEntries), taskCount)
	}

	// Assumes GetArchivedEntries returns entries in ascending score order, so
	// the freshly archived target should be last and must have survived the trim.
	if newest := archivedEntries[len(archivedEntries)-1].Message; newest.ID != target.ID {
		t.Fatalf("newest task in archive set = %v, want %v", newest.ID, target.ID)
	}

	// The trimmed entry's task key must be gone too: the number of task keys
	// under the queue prefix has to match the size of the archived set.
	keys, err := r.client.Keys(ctx, base.TaskKeyPrefix(qname)+"*").Result()
	if err != nil {
		t.Fatalf("listing task keys: %v", err)
	}
	if len(keys) != taskCount {
		t.Fatalf("len of archived keys = %v, want %v", len(keys), taskCount)
	}
}
Originally created by @Harrison-Miller on GitHub (Aug 31, 2023). Original GitHub issue: https://github.com/hibiken/asynq/issues/730 Originally assigned to: @hibiken on GitHub. **Describe the bug** Currently the keys of archived tasks are stored in a sorted set: asynq:{qname}:archived These keys are trimmed from the sorted set every time a task is archived; however, the actual key is never deleted. This leads to exploding memory usage from Redis if you reach maxArchiveSize (10k). Keys that are trimmed are left orphaned from that set and can't be deleted using inspector.DeleteAllArchivedTasks. You can see that these orphaned archived tasks exist by running the following on a Redis/asynq server that is not currently receiving tasks: KEYS asynq:{qname}:t* and comparing the count to ZCARD asynq:{qname}:archived Other issues have already pointed out the lack of configuration support for: maxArchiveSize archivedExpirationInDays I believe that being able to configure these settings is crucial for production environments. That said, these would be fine default settings if the intended behavior worked. However, this bug is a very serious issue that can cause Redis clusters to crash if they have limited memory. **To Reproduce** 1. Submit 10k tasks 2. Archive the tasks 3. Submit and archive 1 more task 4. 
Observe that the number of tasks in asynq:{qname}:archived is not equal to the total number of task keys in redis **Test Case** ``` func TestArchiveTrim(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() r.SetClock(timeutil.NewSimulatedClock(now)) errMsg := "SMTP server not responding" // create 10k archived tasks taskCount := maxArchiveSize - 1 archivedTasks := make([]base.Z, 0) for i := 0; i < taskCount; i++ { id := uuid.NewString() task := base.TaskMessage{ ID: id, Type: "send_email", Payload: nil, Queue: "default", } archivedTasks = append(archivedTasks, base.Z{ Message: h.TaskMessageWithError(task, errMsg, now), Score: now.Add(-1 * time.Hour).Unix(), }) } h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{ "default": archivedTasks, }) archivedEntriesBefore := h.GetArchivedEntries(t, r.client, "default") if len(archivedEntriesBefore) != taskCount { t.Errorf("len of archived entries before = %v, want %v", len(archivedEntriesBefore), maxArchiveSize-1) return } // set up task that will cause archive queue to be trimmed id := uuid.NewString() target := &base.TaskMessage{ ID: id, Type: "send_email", Payload: nil, Queue: "default", } h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{ "default": {target}, }) h.SeedAllLease(t, r.client, map[string][]base.Z{ "default": {{Message: target, Score: now.Add(10 * time.Second).Unix()}}, }) err := r.Archive(context.Background(), target, errMsg) if err != nil { t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err) return } archivedEntriesInSet := h.GetArchivedEntries(t, r.client, "default") if len(archivedEntriesInSet) != taskCount { t.Errorf("len of archived entries = %v, want %v", len(archivedEntriesInSet), taskCount) return } // check that the target task is where we expect it newestTask := archivedEntriesInSet[len(archivedEntriesInSet)-1].Message if newestTask.ID != target.ID { t.Errorf("newest task in archive set 
= %v, want %v", newestTask.ID, target.ID) return } // now check if trim actually deleted the keys see if it's equal to taskCount vals := r.client.Keys(context.Background(), base.TaskKeyPrefix("default")+"*").Val() if len(vals) != taskCount { t.Errorf("len of archived keys = %v, want %v", len(vals), taskCount) return } } ```
Author
Owner

@Harrison-Miller commented on GitHub (Sep 1, 2023):

I am working on a PR to fix this issue, will submit it for review in a day or two.

<!-- gh-comment-id:1702099499 --> @Harrison-Miller commented on GitHub (Sep 1, 2023): I am working on a PR to fix this issue, will submit it for review in a day or two.
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#1373
No description provided.