[GH-ISSUE #1007] [BUG] Description of the bug #2508

Open
opened 2026-03-15 20:44:24 +03:00 by kerem · 0 comments
Owner

Originally created by @txbxxx on GitHub (Jan 15, 2025).
Original GitHub issue: https://github.com/hibiken/asynq/issues/1007

Originally assigned to: @hibiken, @kamikazechaser on GitHub.

I register these scheduled tasks with asynq and can see them with `asynq cron ls`, but why do they never show up as tasks in the queue?

package main

import (
	"flag"
	"time"

	"github.com/hibiken/asynq"
	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/logx"

	"violet/apps/gtask/internal/config"
	// "violet/apps/gtask/internal/task/test_gtask"
	"violet/apps/gtask/internal/task/transaction_task"
	"violet/apps/gtask/internal/types"
)

// configFile is the path of the configuration file, taken from the -f flag.
// It defaults to apps/stask/etc/stask.yaml.
var configFile = flag.String("f", "apps/stask/etc/stask.yaml", "the config file")

// init overwrites time.Local with time.UTC, so the process-wide local time
// zone is UTC.
func init() {
	time.Local = time.UTC
}

// main is the entry point of the scheduler process. It parses flags, loads
// the configuration named by -f, sets up logging, registers the periodic
// transaction tasks with an asynq Scheduler and then runs the scheduler.
//
// Any registration error, and any error returned by Run, is logged and then
// raised as a panic.
func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c, conf.UseEnv())
	logx.MustSetup(logx.LogConf{
		ServiceName: c.ServerName,
		Path:        c.LogPath,
		Mode:        c.LogMode,
		Encoding:    c.LogEncoding,
	})
	defer logx.Close()

	scheduler := asynq.NewScheduler(c.Redis.ToAsynqConffig(), &asynq.SchedulerOpts{
		// Report every failed enqueue (for example a task ID conflict) in the
		// application log, so a cron entry that never produces a task can be
		// diagnosed.
		EnqueueErrorHandler: func(task *asynq.Task, opts []asynq.Option, err error) {
			logx.Errorf("scheduler enqueue failed, type:%s, opts:%v, err:%v", task.Type(), opts, err)
		},
	})

	symbolTasks := transaction_task.NewTransactionTaskFactory()

	// Every entry below carries its own TaskID. Two entries sharing an ID in
	// the same queue would conflict whenever a task with that ID is still
	// present in the queue.

	// Pull all symbols.
	if _, err := scheduler.Register(
		"@every 60m",
		symbolTasks.CreatePullSymbolAllTask("test"),
		asynq.TaskID("symbol"),
		asynq.Queue(types.TransactionQueue),
	); err != nil {
		logx.Errorf("注册拉取品种任务失败,err:%v", err)
		panic(err)
	}

	// Pull real trading groups.
	if _, err := scheduler.Register(
		"@every 30s",
		symbolTasks.CreatePullRealGroupAllTask("test", true),
		asynq.TaskID("real_group"),
		asynq.Queue(types.TransactionQueue),
	); err != nil {
		logx.Errorf("注册任务失败,err:%v", err)
		panic(err)
	}

	// Pull non-real trading groups.
	if _, err := scheduler.Register(
		"@every 60m",
		symbolTasks.CreatePullNoRealGroupAllTask("test", false),
		asynq.TaskID("no_real_group"),
		asynq.Queue(types.TransactionQueue),
	); err != nil {
		logx.Errorf("注册任务失败,err:%v", err)
		panic(err)
	}

	// Pull symbol groups. This entry previously reused the "real_group" ID.
	if _, err := scheduler.Register(
		"@every 60m",
		symbolTasks.CreatePullSymbolGroupAllTask("test"),
		asynq.TaskID("symbol_group"),
		asynq.Queue(types.TransactionQueue),
	); err != nil {
		logx.Errorf("注册拉取品种组任务失败,err:%v", err)
		panic(err)
	}

	if err := scheduler.Run(); err != nil {
		logx.Error(err)
		panic(err)
	}
}

EntryID                               Spec        Type                                   Payload                             Options                 Next       Prev
-------                               ----        ----                                   -------                             -------                 ----       ----
55c65ea0-5cac-4962-8af3-ad3482393ca3  @every 1m   sandy:gtask:transaction_real_group     {"platform":"test","is_real":true}   [Queue("transaction")]  In 39s     21s ago
8bed2e24-5a5a-41fe-9dee-0815eb50d2bc  @every 60m  sandy:gtask:transaction_symbol_group   {"platform":"test","is_real":false}  [Queue("transaction")]  In 53m39s  N/A
3f28af36-767d-4275-a0bd-7b7641a30da5  @every 60m  sandy:gtask:transaction_no_real_group  {"platform":"test","is_real":false}  [Queue("transaction")]  In 53m39s  N/A
e036fb0e-25f3-4f58-a3d4-b13db098b7b6  @every 60m  sandy:gtask:transaction_symbol         {"platform":"test","is_real":false}  [Queue("transaction")]  In 53m39s  N/A

worker code:

package main

import (
	"flag"
	"time"

	"github.com/hibiken/asynq"
	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/logx"

	"violet/apps/gtask/internal/config"
	"violet/apps/gtask/internal/handler"
	"violet/apps/gtask/internal/svc"
)

// configFile is the path of the configuration file, taken from the -f flag.
// It defaults to apps/gtask/etc/gtask.yaml.
var configFile = flag.String("f", "apps/gtask/etc/gtask.yaml", "the config file")

// init overwrites time.Local with time.UTC, so the process-wide local time
// zone is UTC.
func init() {
	time.Local = time.UTC
}

// main is the entry point of the worker process. It parses flags, loads the
// configuration named by -f, sets up logging, builds an asynq server that
// consumes the "default" and "transaction" queues, registers the task
// handlers and runs the server. An error returned by Run is logged and then
// raised as a panic.
func main() {
	flag.Parse()

	var cfg config.Config
	conf.MustLoad(*configFile, &cfg, conf.UseEnv())

	logConf := logx.LogConf{
		ServiceName: cfg.ServerName,
		Path:        cfg.LogPath,
		Mode:        cfg.LogMode,
		Encoding:    cfg.LogEncoding,
	}
	logx.MustSetup(logConf)
	defer logx.Close()

	// Queue name -> value passed to asynq.Config.Queues.
	queues := map[string]int{
		"default":     6,
		"transaction": 8,
	}
	serverConf := asynq.Config{
		Concurrency: 10,
		Queues:      queues,
	}
	server := asynq.NewServer(cfg.Redis.ToAsynqConffig(), serverConf)

	svcCtx := svc.NewServiceContext(cfg)
	router := asynq.NewServeMux()
	handler.RegisterHandlers(router, svcCtx)

	err := server.Run(router)
	if err == nil {
		return
	}
	logx.Error(err)
	panic(err)
}

公共参数设置任务类型和队列

package types

// Task types.
const (
	// GTaskTypeTest is the type name of the test task.
	GTaskTypeTest = "sandy:gtask:test"

	// Type names of the transaction tasks.
	GTasksTransactionPullSymbolAll      = "sandy:gtask:transaction_symbol"
	GTasksTransactionPullRealGroupAll   = "sandy:gtask:transaction_real_group"
	GTasksTransactionPullNoRealGroupAll = "sandy:gtask:transaction_no_real_group"
	GTasksTransactionPullSymbolGroupAll = "sandy:gtask:transaction_symbol_group"
)

// Queues.
const (
	// TransactionQueue is the name of the queue used for transaction tasks.
	TransactionQueue = "transaction"
)

注册handler文件

// RegisterHandlers binds each task type to its handler on mux. Every handler
// is constructed with serverCtx. The real-group and no-real-group task types
// are both served by handlers built by transaction.GetTradeGroupAllHandler.
func RegisterHandlers(mux *asynq.ServeMux, serverCtx *svc.ServiceContext) {
	// Test task.
	testHandler := test_gtask.TestGtaskHandler(serverCtx)
	mux.HandleFunc(types.GTaskTypeTest, testHandler)

	// Symbols.
	symbolHandler := transaction.GetTradeSymbolAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullSymbolAll, symbolHandler)

	// Trading groups: one handler instance per task type.
	realGroupHandler := transaction.GetTradeGroupAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullRealGroupAll, realGroupHandler)

	noRealGroupHandler := transaction.GetTradeGroupAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullNoRealGroupAll, noRealGroupHandler)

	// Symbol groups.
	symbolGroupHandler := transaction.GetTradeSymbolGroupAllHandler(serverCtx)
	mux.HandleFunc(types.GTasksTransactionPullSymbolGroupAll, symbolGroupHandler)
}

So my question is: the cron scheduler is running and the consumer (worker) is running too, yet the consumer never receives any tasks from the scheduler — or perhaps the scheduler never enqueues any tasks at all.

Using the following command, no queued tasks are found

via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack)
➜ asynq queue inspect transaction
Queue Info
Name:   transaction
Size:   0
Groups: 0
Paused: false

Task Count by State
active  pending  aggregating  scheduled  retry  archived  completed
------  -------  -----------  ---------  -----  --------  ---------
0       0        0            0          0      0         0

Daily Stats 2025-01-14 UTC
processed  failed  error rate
---------  ------  ----------
198        22      11.11%

os: macos
asynq version 0.25.0

Originally created by @txbxxx on GitHub (Jan 15, 2025). Original GitHub issue: https://github.com/hibiken/asynq/issues/1007 Originally assigned to: @hibiken, @kamikazechaser on GitHub. Why do I register these scheduled tasks in asynq, and can see them using asynq cron ls, but they are not displayed in the task?. ```go package main import ( "flag" "time" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "violet/apps/gtask/internal/config" // "violet/apps/gtask/internal/task/test_gtask" "violet/apps/gtask/internal/task/transaction_task" "violet/apps/gtask/internal/types" ) var configFile = flag.String("f", "apps/stask/etc/stask.yaml", "the config file") func init() { time.Local = time.UTC } func main() { flag.Parse() var c config.Config conf.MustLoad(*configFile, &c, conf.UseEnv()) logx.MustSetup(logx.LogConf{ ServiceName: c.ServerName, Path: c.LogPath, Mode: c.LogMode, Encoding: c.LogEncoding, }) defer logx.Close() scheduler := asynq.NewScheduler(c.Redis.ToAsynqConffig(), &asynq.SchedulerOpts{}) symbolTasks := transaction_task.NewTransactionTaskFactory() _, err := scheduler.Register("@every 60m", symbolTasks.CreatePullSymbolAllTask("test"), asynq.TaskID("symbol"), asynq.Queue(types.TransactionQueue)) if err != nil { logx.Errorf("注册拉取品种任务失败,err:%v", err) panic(err) } // 拉取取交易组 if _, err = scheduler.Register("@every 30s", symbolTasks.CreatePullRealGroupAllTask("test", true), asynq.TaskID("real_group"), asynq.Queue(types.TransactionQueue)); err != nil { logx.Errorf("注册任务失败,err:%v", err) panic(err) } if _, err = scheduler.Register("@every 60m", symbolTasks.CreatePullNoRealGroupAllTask("test", false), asynq.TaskID("no_real_group"), asynq.Queue(types.TransactionQueue)); err != nil { logx.Errorf("注册任务失败,err:%v", err) panic(err) } // 拉取品种组 if _, err := scheduler.Register("@every 60m", symbolTasks.CreatePullSymbolGroupAllTask("test"), asynq.TaskID("real_group"), asynq.Queue(types.TransactionQueue)); err != nil { 
logx.Errorf("注册拉取品种组任务失败,err:%v", err) panic(err) } if err := scheduler.Run(); err != nil { logx.Error(err) panic(err) } } ``` ```shell EntryID Spec Type Payload Options Next Prev ------- ---- ---- ------- ------- ---- ---- 55c65ea0-5cac-4962-8af3-ad3482393ca3 @every 1m sandy:gtask:transaction_real_group {"platform":"test","is_real":true} [Queue("transaction")] In 39s 21s ago 8bed2e24-5a5a-41fe-9dee-0815eb50d2bc @every 60m sandy:gtask:transaction_symbol_group {"platform":"test","is_real":false} [Queue("transaction")] In 53m39s N/A 3f28af36-767d-4275-a0bd-7b7641a30da5 @every 60m sandy:gtask:transaction_no_real_group {"platform":"test","is_real":false} [Queue("transaction")] In 53m39s N/A e036fb0e-25f3-4f58-a3d4-b13db098b7b6 @every 60m sandy:gtask:transaction_symbol {"platform":"test","is_real":false} [Queue("transaction")] In 53m39s N/A ``` worker code: ```go package main import ( "flag" "time" "github.com/hibiken/asynq" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "violet/apps/gtask/internal/config" "violet/apps/gtask/internal/handler" "violet/apps/gtask/internal/svc" ) var configFile = flag.String("f", "apps/gtask/etc/gtask.yaml", "the config file") func init() { time.Local = time.UTC } func main() { flag.Parse() var c config.Config conf.MustLoad(*configFile, &c, conf.UseEnv()) logx.MustSetup(logx.LogConf{ ServiceName: c.ServerName, Path: c.LogPath, Mode: c.LogMode, Encoding: c.LogEncoding, }) defer logx.Close() srv := asynq.NewServer( c.Redis.ToAsynqConffig(), asynq.Config{ Concurrency: 10, Queues: map[string]int{ "default": 6, "transaction": 8, }, }, ) ctx := svc.NewServiceContext(c) mux := asynq.NewServeMux() handler.RegisterHandlers(mux, ctx) if err := srv.Run(mux); err != nil { logx.Error(err) panic(err) } } ``` 公共参数设置任务类型和队列 ```go package types // 任务类型 const ( GTaskTypeTest = "sandy:gtask:test" GTasksTransactionPullSymbolAll = "sandy:gtask:transaction_symbol" GTasksTransactionPullRealGroupAll = 
"sandy:gtask:transaction_real_group" GTasksTransactionPullNoRealGroupAll = "sandy:gtask:transaction_no_real_group" GTasksTransactionPullSymbolGroupAll = "sandy:gtask:transaction_symbol_group" ) // 队列 const ( TransactionQueue = "transaction" ) ``` 注册handler文件 ```go func RegisterHandlers(mux *asynq.ServeMux, serverCtx *svc.ServiceContext) { mux.HandleFunc(types.GTaskTypeTest, test_gtask.TestGtaskHandler(serverCtx)) mux.HandleFunc(types.GTasksTransactionPullSymbolAll, transaction.GetTradeSymbolAllHandler(serverCtx)) mux.HandleFunc(types.GTasksTransactionPullRealGroupAll, transaction.GetTradeGroupAllHandler(serverCtx)) mux.HandleFunc(types.GTasksTransactionPullNoRealGroupAll, transaction.GetTradeGroupAllHandler(serverCtx)) mux.HandleFunc(types.GTasksTransactionPullSymbolGroupAll, transaction.GetTradeSymbolGroupAllHandler(serverCtx)) } ``` This is why my cron is started, the consumer side is also started, but the consumer side cannot get the cron queued tasks, or cron has no queued tasks at all Using the following command, no queued tasks are found ```shell via 🐹 v1.23.3 on 🐳 v27.3.1 (orbstack) ➜ asynq queue inspect transaction Queue Info Name: transaction Size: 0 Groups: 0 Paused: false Task Count by State active pending aggregating scheduled retry archived completed ------ ------- ----------- --------- ----- -------- --------- 0 0 0 0 0 0 0 Daily Stats 2025-01-14 UTC processed failed error rate --------- ------ ---------- 198 22 11.11% ``` os: `macos` asynq version 0.25.0
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#2508
No description provided.