[GH-ISSUE #411] [BUG] always retry and report handler not found #181

Closed
opened 2026-03-02 05:19:25 +03:00 by kerem · 6 comments
Owner

Originally created by @ericdong2012 on GitHub (Mar 5, 2022).
Original GitHub issue: https://github.com/hibiken/asynq/issues/411

Originally assigned to: @hibiken on GitHub.

When I send multiple messages of the same type, the messages go straight into retry and the error "handler not found" is reported. What could be the reason?

![Snipaste_2022-03-05_23-47-49](https://user-images.githubusercontent.com/36787305/156890682-063650dd-378a-4681-8abf-ab2fab47d271.png)
kerem 2026-03-02 05:19:25 +03:00
  • closed this issue
  • added the bug label

@hibiken commented on GitHub (Mar 6, 2022):

@ericdong2012 Thank you for opening an issue.

Looking at the error messages, it seems that you are running the Server to consume tasks from a queue, but no handler is registered for tasks of type `"command:server:192.168.19.128"`.

If you could provide a code snippet for your `Server` and `ServeMux`, I can provide more details 👍


@ericdong2012 commented on GitHub (Mar 6, 2022):

> @ericdong2012 Thank you for opening an issue.
>
> Looking at the error messages, it seems that you are running the Server to consume tasks from a queue, but no handler is registered for tasks of type `"command:server:192.168.19.128"`.
>
> If you could provide a code snippet for your `Server` and `ServeMux`, I can provide more details 👍

Thanks for your reply. Here is some code:

```go
type AsynqCmdTask struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewAsynqCmdTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqCmdTask {
	return &AsynqCmdTask{
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

func (l *AsynqCmdTask) Start() {
	fmt.Println("AsynqCmd start ")

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass},
		asynq.Config{
			Concurrency: 10,
		},
	)

	mux := asynq.NewServeMux()

	localIp, _ := GetLocalIP()

	mux.HandleFunc(asynqmq.COMMANDSERVER_TOPIC+":"+localIp, l.execCmd)

	if err := srv.Run(mux); err != nil {
		logx.Errorf("could not run server: %v", err)
	}
}
```

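```go
// execCmd runs the command carried by the task and reports the outcome via Push2Mq.
func (l *AsynqCmdTask) execCmd(ctx context.Context, t *asynq.Task) error {
	localIp, _ := GetLocalIP()
	var message asynqmq.CommandServerMessage
	if err := json.Unmarshal(t.Payload(), &message); err != nil {
		logx.WithContext(l.ctx).Errorf("commandagent Unmarshal err : %v , val : %s", err, t.Payload())
		// A malformed payload cannot succeed on retry, so stop here
		// instead of executing an empty command.
		return fmt.Errorf("unmarshal command payload: %v: %w", err, asynq.SkipRetry)
	}

	result := make(chan string)
	errinfo := make(chan string)
	go func() {
		res, err := ExecShell(message.Command+" "+strings.Join(message.Args, ""), message.User)
		if err != nil {
			errinfo <- err.Error()
		} else {
			result <- res
		}
	}()

	// Exactly one of the two channels receives a value; forward it through Push2Mq (not shown).
	select {
	case finres := <-result:
		return l.Push2Mq(finres, localIp, "success", message)
	case finerr := <-errinfo:
		return l.Push2Mq(finerr, localIp, "fail", message)
	}
}
```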

@ericdong2012 commented on GitHub (Mar 6, 2022):

> @ericdong2012 Thank you for opening an issue.
>
> Looking at the error messages, it seems that you are running the Server to consume tasks from a queue, but no handler is registered for tasks of type `"command:server:192.168.19.128"`.
>
> If you could provide a code snippet for your `Server` and `ServeMux`, I can provide more details 👍

At present I use only the default queue and distinguish task types by IP address. Will this cause any problems, and should I instead use a separate queue for each IP address?


@hibiken commented on GitHub (Mar 6, 2022):

@ericdong2012 thanks for providing more details!

Does the value of `localIp` used in this line

```go
mux.HandleFunc(asynqmq.COMMANDSERVER_TOPIC+":"+localIp, l.execCmd)
```

match the value used for the string for task types?

---

If your tasks have a dynamic IP address in their task type, you can call `HandleFunc` with just the prefix, since it does a prefix match to route a task to a registered handler func.

```go
mux.HandleFunc(asynqmq.COMMANDSERVER_TOPIC+":", l.execCmd)
```
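To illustrate the prefix-match rule described above, here is a minimal standard-library sketch. It is a simplified model, not asynq's actual `ServeMux` code: the `mux` type and its `handle` and `match` methods are hypothetical names, and the prefix `"command:server:"` is assumed from the task type shown in the error message.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mux is a simplified stand-in for a prefix-matching task router.
// It is NOT asynq's implementation, only a model of the rule described above.
type mux struct {
	patterns []string
}

// handle registers a pattern and keeps patterns sorted longest first,
// so the most specific prefix wins.
func (m *mux) handle(pattern string) {
	m.patterns = append(m.patterns, pattern)
	sort.Slice(m.patterns, func(i, j int) bool {
		return len(m.patterns[i]) > len(m.patterns[j])
	})
}

// match returns the registered pattern that routes taskType, or "" and false
// when no pattern is a prefix of the task type (the "handler not found" case).
func (m *mux) match(taskType string) (string, bool) {
	for _, p := range m.patterns {
		if strings.HasPrefix(taskType, p) {
			return p, true
		}
	}
	return "", false
}

func main() {
	// Registered with one concrete IP: a task type carrying another IP is not routed.
	exact := &mux{}
	exact.handle("command:server:192.168.19.128")
	_, ok := exact.match("command:server:192.168.19.129")
	fmt.Println("exact registration, other IP routed:", ok)

	// Registered with the bare prefix: any IP suffix is routed.
	prefix := &mux{}
	prefix.handle("command:server:")
	p, ok := prefix.match("command:server:192.168.19.129")
	fmt.Println("prefix registration, other IP routed:", ok, "via", p)
}
```

Under this model, a handler registered with the local IP only receives task types that start with exactly that string, so a different IP on the enqueuing side would end up with no matching handler.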
@ericdong2012 commented on GitHub (Mar 7, 2022):

> @ericdong2012 thanks for providing more details!
>
> Does the value of `localIp` used in this line
>
> ```go
> mux.HandleFunc(asynqmq.COMMANDSERVER_TOPIC+":"+localIp, l.execCmd)
> ```
>
> match the value used for the string for task types?
>
> If your tasks have a dynamic IP address in their task type, you can call `HandleFunc` with just the prefix, since it does a prefix match to route a task to a registered handler func.
>
> ```go
> mux.HandleFunc(asynqmq.COMMANDSERVER_TOPIC+":", l.execCmd)
> ```

Thanks for your reply.
After testing, I found that it is as you initially said: inside Push2Mq on the server I create a new client (consume). My design scenario is as follows. The server processes messages from the upstream client, parses them, runs some other logic, and finally creates a new client (consume) and sends the results to a task type handled by other servers. How should this scenario be handled? I look forward to your reply.
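One possible reading of this scenario, not confirmed in the thread, is that the result tasks land in the same queue the command server consumes from, so that server picks up a task type it has no handler for. The toy model below sketches that reasoning with the standard library only. The `task` and `worker` types, the `"results"` queue, and the `"command:result"` type are all hypothetical, and type matching is exact here for brevity. In asynq itself, the queue a task goes to is chosen with the `asynq.Queue` enqueue option, and the queues a server consumes are listed in the `Queues` field of `asynq.Config`.

```go
package main

import "fmt"

// task is a toy model of an enqueued task: the queue it sits in and its type.
type task struct {
	queue string
	typ   string
}

// worker is a toy model of a server process: the queues it consumes from
// and the task types it has handlers for. Hypothetical, not an asynq type.
type worker struct {
	queues   map[string]bool
	handlers map[string]bool
}

// outcome reports what happens to t with respect to w.
func (w worker) outcome(t task) string {
	if !w.queues[t.queue] {
		return "not consumed" // the task stays in its queue for another worker
	}
	if !w.handlers[t.typ] {
		return "handler not found" // consumed, fails, and goes to retry
	}
	return "handled"
}

func main() {
	// The command server consumes only the default queue and handles only commands.
	cmdWorker := worker{
		queues:   map[string]bool{"default": true},
		handlers: map[string]bool{"command:server": true},
	}

	shared := task{queue: "default", typ: "command:result"}
	separate := task{queue: "results", typ: "command:result"}

	fmt.Println("result in shared queue:  ", cmdWorker.outcome(shared))
	fmt.Println("result in separate queue:", cmdWorker.outcome(separate))
}
```

If this reading applies, sending results to a dedicated queue that only the receiving servers consume would keep the command server from ever dequeuing them.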

@hibiken commented on GitHub (Mar 8, 2022):

@ericdong2012 are you still seeing the "handler not found" error?

If not, would you mind starting a discussion [here](https://github.com/hibiken/asynq/discussions/new) and closing this issue?
Either way, I think I need a more detailed explanation (e.g. a diagram or high-level overview of your use case) to understand the issue :)
