Table of Contents
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
Overview
You can run a Scheduler alongside with the Server to process tasks periodically. Scheduler enqueues tasks at regular intervals, that are then executed by available worker servers in the cluster.
You have to ensure only a single scheduler is running for a schedule at a time, otherwise you’d end up with duplicate tasks. Using a centralized approach means the schedule doesn’t have to be synchronized, and the service can operate without using locks.
If you need to dynamically add and remove periodic tasks, use PeriodicTaskManager instead of using Scheduler directly. See this wiki page for more details.
Time Zones
The periodic task schedules uses the UTC time zone by default, but you can change the time zone used using the SchedulerOpts.
// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
Entries
To enqueue a task periodically you have to register an entry with the scheduler.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// You can use cron spec string to specify the schedule.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// You can use "@every <duration>" to specify the interval.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// You can also pass options.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
Run the Scheduler
To start the scheduler, call Run on the scheduler.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... Register tasks
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
The call to Run will wait for TERM or INT signal (e.g. Ctrl-C keypress).
Error Handling
You can provide a function to handle an error if the scheduler could not enqueue a task.
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// your error handling logic
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
Inspection via CLI
The CLI has a subcommand cron to inspect scheduler entries.
To see all entries from the currently running scheduler, you can run:
asynq cron ls
This command will output a list of entries each with its IDs, Schedule Spec, Next enqueue time, Previous enqueue time.
You can also see a history of each entry by running:
asynq cron history <entryID>
Basics
Advanced Topics
just a test