[GH-ISSUE #379] [FEATURE REQUEST] Dynamic Register Periodic Tasks #2188

Closed
opened 2026-03-15 19:36:26 +03:00 by kerem · 6 comments
Owner

Originally created by @zhaolion on GitHub (Jan 13, 2022).
Original GitHub issue: https://github.com/hibiken/asynq/issues/379

Originally assigned to: @hibiken on GitHub.

feature request idea

Currently periodic tasks need to be bound to a period at runtime, is it possible to support that client can register a dynamic periodic tasks, similar to Register(task, ... options)

Describe alternatives you've considered
Some of our business scenarios will perform periodic tasks with different cycles based on some user data.

func HTTPHandle() {
   // schedule is different by user data
   schedule := "? ? * * *"
   // each task is periodic
   task := NewPeriodicTask(type +"{user_id}")
   asynq.Client.Register(task, schedule, start_at_date, until_at_date ...)
}

If this feature is supported, it will be very easy to use. 😆
I look forward to hearing from you for more contextual information and ideas. 😆

Originally created by @zhaolion on GitHub (Jan 13, 2022). Original GitHub issue: https://github.com/hibiken/asynq/issues/379 Originally assigned to: @hibiken on GitHub. **feature request idea** Currently periodic tasks need to be bound to a period at runtime, is it possible to support that client can register a dynamic periodic tasks, similar to `Register(task, ... options)` **Describe alternatives you've considered** Some of our business scenarios will perform periodic tasks with different cycles based on some user data. ``` func HTTPHandle() { // schedule is different by user data schedule := "? ? * * *" // each task is periodic task := NewPeriodicTask(type +"{user_id}") asynq.Client.Register(task, schedule, start_at_date, until_at_date ...) } ``` If this feature is supported, it will be very easy to use. 😆 I look forward to hearing from you for more contextual information and ideas. 😆
kerem 2026-03-15 19:36:26 +03:00
Author
Owner

@hibiken commented on GitHub (Jan 15, 2022):

Hi @zhaolion thank you for creating this feature request!

You can actually call (*Scheduler).Register dynamically. However, a problem with the current implementation of Scheduler is that it doesn't persist the registered tasks and their schedule in persistent store, so if that process is restarted, it doesn't know what tasks were registered before. (FYI: I'm planning on changing this in the future so that the call to Register writes that entry to redis so that it can survive restarts).

I'll look into this more. But unfortunately asynq doesn't offer any support for this use case, right now.
I found this distributed cron service (https://github.com/distribworks/dkron), but I never used it before and not sure if it helps with your use case.

<!-- gh-comment-id:1013570284 --> @hibiken commented on GitHub (Jan 15, 2022): Hi @zhaolion thank you for creating this feature request! You can actually call `(*Scheduler).Register` dynamically. However, a problem with the current implementation of Scheduler is that it doesn't persist the registered tasks and their schedule in persistent store, so if that process is restarted, it doesn't know what tasks were registered before. (FYI: I'm planning on changing this in the future so that the call to `Register` writes that entry to redis so that it can survive restarts). I'll look into this more. But unfortunately asynq doesn't offer any support for this use case, right now. I found this distributed cron service (https://github.com/distribworks/dkron), but I never used it before and not sure if it helps with your use case.
Author
Owner

@zhaolion commented on GitHub (Jan 16, 2022):

Thank you for your reply 🙌. I think what you are planning to do is a better realized idea. The ability for periodic tasks to be stored and updated is a very exciting feature.

Also thank you very much for some ideas and references to the repository - dkron, i will try to make some demos to see whether I can solve some of my case.

Finally, considering that you are already planning to do some related features, I would close this issue. Thanks again for your response. 👏

<!-- gh-comment-id:1013832556 --> @zhaolion commented on GitHub (Jan 16, 2022): > Thank you for your reply 🙌. I think what you are planning to do is a better realized idea. The ability for periodic tasks to be stored and updated is a very exciting feature. Also thank you very much for some ideas and references to the repository - dkron, i will try to make some demos to see whether I can solve some of my case. Finally, considering that you are already planning to do some related features, I would close this issue. Thanks again for your response. 👏
Author
Owner

@hibiken commented on GitHub (Jan 17, 2022):

Hi @zhaolion, I was doing a bit more research around this topic and I came across this page on sidekiq's Wiki (Sidekiq is a popular background job processing library in Ruby, and Asynq was heavily inspired by sidekiq).

Suggested solution in the wiki applies to users of Asynq as well.
Basic idea is to store dynamic periodic task configuration in a database; And set up a periodic task to query this database table to check whether we need to enqueue dynamic periodic tasks.

The configuration includes data such as:

  • TaskType (e.g type + user_id in the example above)
  • TaskPayload
  • Cronspec (e.g. "* * * * *")
  • StartDate
  • EndTime (you may want to delete a row, if current time is greater than this value)
  • NextRun ime (This field is important in the code below)

For example, you can store the above data in a table PeriodicTaskConfigs in a relational database. And set up a periodic task to query this table every minute and decide whether to enqueue a task. When you enqueue a task, you would update the NextRunTime value.

// Set up a task to check and enqueue dynamic tasks
scheduler.Register("* * * * *", asynq.NewTask("DynamicTask", nil))

// Handler for the "EnqueueDynamicPeriodicTask"
func HandleDynamicTask(ctx context.Context, task *asynq.Task) error {
    // NOTE: code below is for illustration purpose
    configs , err := db.Query("SELECT * FROM PeriodicTaskConfig AS c WHERE c.NextRunTime <= now())
    if err != nil {
           return fmt.Errorf("Failed to query PeriodicTaskConfig: %v", err)
     }
    // Each config data represents a dynamic periodic task that needs to be enqueued.
    for _, c := range configs {
        // Enqueue the task according to the specification in the config.
        t := asynq.NewTask(c.TaskType, c.Payload)
        if _, err := asynq.Client.Enqueue(t); err != nil {
             // ... handle error
        }
        // Make sure to advance the NextRunTime value according the the Cronspec.
        scheduler, err := cronParser.Parse(c.Cronspec)
        if err := nil {
              // ... handle error
        }
       err := db.Update("UPDATE PeriodicTaskConfigs AS c SET NextRunAt = %v WHERE c.ID = %v", scheduler.Next(), c.ID)
       if err != nil {
            // ... handle error
       }   
   }
}

Let me know if this makes sense. I may spend some time to update Asynq's Wiki page with this info 👍

<!-- gh-comment-id:1014127541 --> @hibiken commented on GitHub (Jan 17, 2022): Hi @zhaolion, I was doing a bit more research around this topic and I came across [this page](https://github.com/mperham/sidekiq/wiki/Ent-Periodic-Jobs#dynamic-jobs) on sidekiq's Wiki (Sidekiq is a popular background job processing library in Ruby, and Asynq was heavily inspired by sidekiq). Suggested solution in the wiki applies to users of Asynq as well. Basic idea is to store dynamic periodic task configuration in a database; And set up a periodic task to query this database table to check whether we need to enqueue dynamic periodic tasks. The configuration includes data such as: - TaskType (e.g `type + user_id` in the example above) - TaskPayload - Cronspec (e.g. "* * * * *") - StartDate - EndTime (you may want to delete a row, if current time is greater than this value) - NextRun ime (This field is important in the code below) For example, you can store the above data in a table `PeriodicTaskConfigs` in a relational database. And set up a periodic task to query this table every minute and decide whether to enqueue a task. When you enqueue a task, you would update the `NextRunTime` value. ```go // Set up a task to check and enqueue dynamic tasks scheduler.Register("* * * * *", asynq.NewTask("DynamicTask", nil)) // Handler for the "EnqueueDynamicPeriodicTask" func HandleDynamicTask(ctx context.Context, task *asynq.Task) error { // NOTE: code below is for illustration purpose configs , err := db.Query("SELECT * FROM PeriodicTaskConfig AS c WHERE c.NextRunTime <= now()) if err != nil { return fmt.Errorf("Failed to query PeriodicTaskConfig: %v", err) } // Each config data represents a dynamic periodic task that needs to be enqueued. for _, c := range configs { // Enqueue the task according to the specification in the config. t := asynq.NewTask(c.TaskType, c.Payload) if _, err := asynq.Client.Enqueue(t); err != nil { // ... handle error } // Make sure to advance the NextRunTime value according the the Cronspec. scheduler, err := cronParser.Parse(c.Cronspec) if err := nil { // ... handle error } err := db.Update("UPDATE PeriodicTaskConfigs AS c SET NextRunAt = %v WHERE c.ID = %v", scheduler.Next(), c.ID) if err != nil { // ... handle error } } } ``` Let me know if this makes sense. I may spend some time to update Asynq's Wiki page with this info 👍
Author
Owner

@zhaolion commented on GitHub (Jan 17, 2022):

Hi @zhaolion, I was doing a bit more research around this topic and I came across this page on sidekiq's Wiki (Sidekiq is a popular background job processing library in Ruby, and Asynq was heavily inspired by sidekiq).

Suggested solution in the wiki applies to users of Asynq as well. Basic idea is to store dynamic periodic task configuration in a database; And set up a periodic task to query this database table to check whether we need to enqueue dynamic periodic tasks.

The configuration includes data such as:

  • TaskType (e.g type + user_id in the example above)
  • TaskPayload
  • Cronspec (e.g. "* * * * *")
  • StartDate
  • EndTime (you may want to delete a row, if current time is greater than this value)
  • NextRun ime (This field is important in the code below)

For example, you can store the above data in a table PeriodicTaskConfigs in a relational database. And set up a periodic task to query this table every minute and decide whether to enqueue a task. When you enqueue a task, you would update the NextRunTime value.

// Set up a task to check and enqueue dynamic tasks
scheduler.Register("* * * * *", asynq.NewTask("DynamicTask", nil))

// Handler for the "EnqueueDynamicPeriodicTask"
func HandleDynamicTask(ctx context.Context, task *asynq.Task) error {
    // NOTE: code below is for illustration purpose
    configs , err := db.Query("SELECT * FROM PeriodicTaskConfig AS c WHERE c.NextRunTime <= now())
    if err != nil {
           return fmt.Errorf("Failed to query PeriodicTaskConfig: %v", err)
     }
    // Each config data represents a dynamic periodic task that needs to be enqueued.
    for _, c := range configs {
        // Enqueue the task according to the specification in the config.
        t := asynq.NewTask(c.TaskType, c.Payload)
        if _, err := asynq.Client.Enqueue(t); err != nil {
             // ... handle error
        }
        // Make sure to advance the NextRunTime value according the the Cronspec.
        scheduler, err := cronParser.Parse(c.Cronspec)
        if err := nil {
              // ... handle error
        }
       err := db.Update("UPDATE PeriodicTaskConfigs AS c SET NextRunAt = %v WHERE c.ID = %v", scheduler.Next(), c.ID)
       if err != nil {
            // ... handle error
       }   
   }
}

Let me know if this makes sense. I may spend some time to update Asynq's Wiki page with this info 👍

Hi @hibiken , Thank you so much for your research ! 👏

After reading your solutions, this is a better practice to solve my problems. I would very much like to see some updates on this topic.

<!-- gh-comment-id:1014143648 --> @zhaolion commented on GitHub (Jan 17, 2022): > Hi @zhaolion, I was doing a bit more research around this topic and I came across [this page](https://github.com/mperham/sidekiq/wiki/Ent-Periodic-Jobs#dynamic-jobs) on sidekiq's Wiki (Sidekiq is a popular background job processing library in Ruby, and Asynq was heavily inspired by sidekiq). > > Suggested solution in the wiki applies to users of Asynq as well. Basic idea is to store dynamic periodic task configuration in a database; And set up a periodic task to query this database table to check whether we need to enqueue dynamic periodic tasks. > > The configuration includes data such as: > > * TaskType (e.g `type + user_id` in the example above) > * TaskPayload > * Cronspec (e.g. "* * * * *") > * StartDate > * EndTime (you may want to delete a row, if current time is greater than this value) > * NextRun ime (This field is important in the code below) > > For example, you can store the above data in a table `PeriodicTaskConfigs` in a relational database. And set up a periodic task to query this table every minute and decide whether to enqueue a task. When you enqueue a task, you would update the `NextRunTime` value. > > ```go > // Set up a task to check and enqueue dynamic tasks > scheduler.Register("* * * * *", asynq.NewTask("DynamicTask", nil)) > > // Handler for the "EnqueueDynamicPeriodicTask" > func HandleDynamicTask(ctx context.Context, task *asynq.Task) error { > // NOTE: code below is for illustration purpose > configs , err := db.Query("SELECT * FROM PeriodicTaskConfig AS c WHERE c.NextRunTime <= now()) > if err != nil { > return fmt.Errorf("Failed to query PeriodicTaskConfig: %v", err) > } > // Each config data represents a dynamic periodic task that needs to be enqueued. > for _, c := range configs { > // Enqueue the task according to the specification in the config. > t := asynq.NewTask(c.TaskType, c.Payload) > if _, err := asynq.Client.Enqueue(t); err != nil { > // ... handle error > } > // Make sure to advance the NextRunTime value according the the Cronspec. > scheduler, err := cronParser.Parse(c.Cronspec) > if err := nil { > // ... handle error > } > err := db.Update("UPDATE PeriodicTaskConfigs AS c SET NextRunAt = %v WHERE c.ID = %v", scheduler.Next(), c.ID) > if err != nil { > // ... handle error > } > } > } > ``` > > Let me know if this makes sense. I may spend some time to update Asynq's Wiki page with this info 👍 Hi @hibiken , Thank you so much for your research ! 👏 After reading your solutions, this is a better practice to solve my problems. I would very much like to see some updates on this topic.
Author
Owner

@hibiken commented on GitHub (Jan 22, 2022):

FYI: Just released a new version v0.21 which includes PeriodicTaskManager, which allows you to dynamically add/remove periodic tasks. See this wiki page for an example.

This is a new feature so any feedback is appreciated!

<!-- gh-comment-id:1019284888 --> @hibiken commented on GitHub (Jan 22, 2022): FYI: Just released a new version v0.21 which includes `PeriodicTaskManager`, which allows you to dynamically add/remove periodic tasks. See [this wiki page](https://github.com/hibiken/asynq/wiki/Dynamic-Periodic-Task) for an example. This is a new feature so any feedback is appreciated!
Author
Owner

@christopherPMello commented on GitHub (Sep 29, 2024):

Hi @hibiken, thanks for your work on the PeriodicTaskManager feature. Does this feature support payloads?
something like:

configs:
  - cronspec: "* * * * *"
    task_type: "task_type_1"
    payload:
      param1: 123
<!-- gh-comment-id:2381635939 --> @christopherPMello commented on GitHub (Sep 29, 2024): Hi @hibiken, thanks for your work on the PeriodicTaskManager feature. Does this feature support payloads? something like: ``` configs: - cronspec: "* * * * *" task_type: "task_type_1" payload: param1: 123 ```
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#2188
No description provided.