[GH-ISSUE #787] [FEATURE REQUEST] Add Metadata/Headers Field for Distributed Tracing in asynq.Task #2412

Open
opened 2026-03-15 20:23:26 +03:00 by kerem · 2 comments
Owner

Originally created by @alabarjasteh on GitHub (Nov 25, 2023).
Original GitHub issue: https://github.com/hibiken/asynq/issues/787

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.
I am attempting to integrate distributed tracing into the task scheduler, but currently, there is no proper way to transfer tracing information between clients and workers. I am looking for something analogous to headers in HTTP or RabbitMQ or metadata in gRPC, which could be used to inject tracing context on the client (producer) side and be extracted on the server (consumer) side. Without such headers, I have to add a "tracingCarrier" to every task struct and unmarshal the Asynq payload into that struct.

Here is an example from the Quickstart modified as explained above:

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

const TypeEmailDelivery = "email:deliver"

type EmailDeliveryPayload struct {
	Metadata

	UserID     int
	TemplateID string
}

var _ propagation.TextMapCarrier = EmailDeliveryPayload{}

func NewEmailDeliveryTask(ctx context.Context, userID int, tmplID string) (*asynq.Task, error) {
	md := Metadata{}
	md = inject(ctx, md)

	payload, err := json.Marshal(EmailDeliveryPayload{Metadata: md, UserID: userID, TemplateID: tmplID})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeEmailDelivery, payload), nil
}


func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
	var p EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}

	ctx = extract(ctx, p.Metadata)
	_, span := taskTracer.Start(ctx, "task handler")
	defer span.End()

	log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
	// Email delivery code ...

	return nil
}

Describe the solution you'd like
I suggest adding a Metadata or Header field of type map[string]interface{} in the asynq.Task structure. This field would serve as a suitable place to store data related to tracing.

Originally created by @alabarjasteh on GitHub (Nov 25, 2023). Original GitHub issue: https://github.com/hibiken/asynq/issues/787 Originally assigned to: @hibiken on GitHub. **Is your feature request related to a problem? Please describe.** I am attempting to integrate distributed tracing into the task scheduler, but currently, there is no proper way to transfer tracing information between clients and workers. I am looking for something analogous to headers in HTTP or RabbitMQ or metadata in gRPC, which could be used to inject tracing context on the client (producer) side and be extracted on the server (consumer) side. Without such headers, I have to add a "tracingCarrier" to every task struct and unmarshal the Asynq payload into that struct. Here is an example from the Quickstart modified as explained above: ```go import ( "context" "encoding/json" "fmt" "log" "time" "github.com/hibiken/asynq" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) const TypeEmailDelivery = "email:deliver" type EmailDeliveryPayload struct { Metadata UserID int TemplateID string } var _ propagation.TextMapCarrier = EmailDeliveryPayload{} func NewEmailDeliveryTask(ctx context.Context, userID int, tmplID string) (*asynq.Task, error) { md := Metadata{} md = inject(ctx, md) payload, err := json.Marshal(EmailDeliveryPayload{Metadata: md, UserID: userID, TemplateID: tmplID}) if err != nil { return nil, err } return asynq.NewTask(TypeEmailDelivery, payload), nil } func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var p EmailDeliveryPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } ctx = extract(ctx, p.Metadata) _, span := taskTracer.Start(ctx, "task handler") defer span.End() log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID) // Email delivery code ... return nil } ``` **Describe the solution you'd like** I suggest adding a `Metadata` or `Header` field of type `map[string]interface{}` in the asynq.Task structure. This field would serve as a suitable place to store data related to tracing.
Author
Owner

@shuqingzai commented on GitHub (Dec 22, 2023):

#547 #774

<!-- gh-comment-id:1867120208 --> @shuqingzai commented on GitHub (Dec 22, 2023): #547 #774
Author
Owner

@al-bglhk commented on GitHub (Feb 27, 2024):

👍 I would like to have a similar feature as well.

<!-- gh-comment-id:1965657757 --> @al-bglhk commented on GitHub (Feb 27, 2024): 👍 I would like to have a similar feature as well.
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#2412
No description provided.