mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-26 07:25:56 +03:00
[GH-ISSUE #270] Unable to enqueue new tasks in queue because worker blocking the execution. #1119
Originally created by @shubhgangrade on GitHub (May 5, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/270
Hi hibiken,
Thanks for the asynq task/queue library. I have been stuck on the issue below for the last few days.
I have been using this task queue for feed ingestion into a PostgreSQL database. The feed consists of thousands of programs. After unmarshalling the feed, I enqueue the programs one by one into the "feed.tasks" queue. Then I start the worker to dequeue each program and ingest its payload into the DB.
After the worker has finished processing all tasks from the "feed.tasks" queue, I want to call mergerClient, which fetches the newly added rows from the DB (those ingested by the worker) and, after doing some processing, enqueues new tasks into the "merger.tasks" queue.
The problem I'm facing is: the worker call blocks the execution of the program, so mergerClient is never called.
Workaround:
I used a goroutine to call the worker, without any channel. That way I'm able to execute mergerClient, but now the worker and mergerClient run in parallel, and mergerClient fetches incomplete data from the DB because the worker is still processing tasks from the "feed.tasks" queue.
In simple words: I just want to ingest all programs into the DB, and after that fetch all the newly added rows, do some processing, and enqueue the resulting data into another queue.
Some help would be great!
@hibiken commented on GitHub (May 5, 2021):
@shubhgangrade Thank you for opening this issue!
Let me ask you a few clarifying questions so that I can better understand your issue:
Could you clarify what you mean by "program"? You said your feed consists of thousands of "programs", so I'm guessing it's an entity that your application deals with?
It sounds like you have two phases in your pipeline:
1. The first phase takes tasks from the "feed.tasks" queue and writes the data to Postgres.
2. The second takes tasks from the "merger.tasks" queue, reads data from Postgres, and does some processing.
Do you need all tasks from the first phase to be completed before moving on to the second phase? Or, once data is written to Postgres, can the second phase start for that data?
@shubhgangrade commented on GitHub (May 5, 2021):
Thanks for the reply @hibiken.
Below are the answers for your queries:
Yes, the feed is a JSON file containing a "Programs" array, and there are thousands of "Program" objects in that array.
Yes, I have two phases in the pipeline:
a. Yes, you're right. It takes each "Program" and writes data to Postgres.
b. No, let me clarify this one: I just want to fetch the newly added rows (the ones just inserted by the workerHandler for "feed.tasks") from Postgres, based on time.Now() and the created_at attribute. After doing some processing on this data, I enqueue the resulting merger tasks into the "merger.tasks" queue.
@hibiken commented on GitHub (May 5, 2021):
Ok, thanks for clarifying.
Sounds like you need a way to signal when it is safe to enqueue the merger task.
Unfortunately, asynq currently doesn't support this type of workflow out of the box, so you need to write that logic yourself. The feature is planned for the future (#244).
A solution I can think of:
If you have an ID for each feed, you can simply keep track of the number of completed tasks for the feed. For example, if your FeedA generates 200 "feed.tasks" tasks, you can set a counter in Redis to 200 before enqueuing them. Then, within the Handler for "feed.tasks", decrement the counter once a task has been successfully written to the DB and check whether the counter has reached 0; only then do you enqueue the merger task.
This is just one solution I can think of. There are probably other (maybe better) ways to solve this.
@shubhgangrade commented on GitHub (May 8, 2021):
Thanks for the suggestion!
@hibiken when are you planning to release the stable v1? We're planning to use this library in a production environment.
@hibiken commented on GitHub (May 8, 2021):
That's great to hear!
I have listed the features/changes I want to implement before releasing v1.0 here: #242
I've been steadily making progress towards the release, but I don't have a definite timeline just yet (I need to balance this with my full-time job, family time, etc.). You can subscribe to the issue above and you'll be notified of any updates 👍
@hibiken commented on GitHub (May 12, 2021):
Closing this issue in favor of #244