[GH-ISSUE #321] [FEATURE REQUEST] Instrumentation/Tracing for asynq redis client #138

Closed
opened 2026-03-02 05:19:00 +03:00 by kerem · 26 comments
Owner

Originally created by @seanyu4296 on GitHub (Aug 31, 2021).
Original GitHub issue: https://github.com/hibiken/asynq/issues/321

Originally assigned to: @hibiken on GitHub.

Is your feature request related to a problem? Please describe.

  • I want to add tracing to the underlying redis client instance using dd-trace-go.
  • Is it possible to get access to the underlying redis instance so I can wrap the client with a function like the one below?
```go
import (
	"github.com/go-redis/redis/v7"
	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-redis/redis.v7"
)

// config here is the application's own configuration package.
func TraceClient(rClient redis.UniversalClient) {
	ddRedis.WrapClient(
		rClient,
		ddRedis.WithServiceName(config.ServiceName()+"-redis"),
		ddRedis.WithAnalytics(config.Enabled()),
	)
}
```

Describe alternatives you've considered

  • I am also looking into whether the way to go about this is creating a RedisConnOpt implementation that returns a wrapped redis instance with Datadog tracing. What do you think?

https://pkg.go.dev/gopkg.in/DataDog/dd-trace-go.v1@v1.33.0/contrib/go-redis/redis.v7#WrapClient

kerem 2026-03-02 05:19:00 +03:00
Author
Owner

@hibiken commented on GitHub (Aug 31, 2021):

@seanyu4296 thank you for opening this issue!

As I commented in #320, I'd like to avoid exposing the redis client library (and its version) from the asynq package API.
I think the alternative you mentioned is a good compromise: you can implement a type that satisfies the RedisConnOpt interface and provide an object of that type to asynq.NewClient and asynq.NewServer.

Let me know if that works!


@seanyu4296 commented on GitHub (Aug 31, 2021):

Got it, I will test it out in the coming days and let you know. Are there any possible gotchas you can think of at the moment, @hibiken, since I'm not a pro in Go?


@seanyu4296 commented on GitHub (Sep 2, 2021):

was able to make it work btw @hibiken 😄 see screenshot below

![Screen Shot 2021-09-01 at 10 04 53 PM](https://user-images.githubusercontent.com/11349232/131786966-4b065739-75de-476b-aa10-8beda74ba73d.png)

do you think it would be helpful to add a wiki section on asynq/Tracing? I can help by adding one for Datadog.

also, I updated this doc to use Stop and Shutdown: https://github.com/hibiken/asynq/wiki/Monitoring-and-Alerting

just wondering, where do you use asynq? Do you use it in your current work? How do you use it?


@hibiken commented on GitHub (Sep 2, 2021):

@seanyu4296 This is fantastic!

It'd be helpful if you could add a page in the Wiki on how to set up tracing using DD.
(And thank you for updating the existing page)

I currently work at Google so I don't get to use asynq in my work, unfortunately. I do however use it whenever I work on a side-project that needs task-queue to handle asynchronous work :)


@hibiken commented on GitHub (Sep 2, 2021):

Also, a heads-up: I just merged PR #298 so you would need to update the redis-client library to v8 when the new release comes out!


@seanyu4296 commented on GitHub (Sep 2, 2021):

Ohh dang! Doesn't Google let you use your own tools, or is there already an internal task scheduling system?

Nice, I was actually hoping v8 would be merged before I deploy asynq at my day job.


@seanyu4296 commented on GitHub (Sep 7, 2021):

By the way, now that we've upgraded to redis v8, would it be possible to have a Context version of the asynq functions? That way, redis command traces can be passed down and correlated to the parent span. @hibiken


@hibiken commented on GitHub (Sep 7, 2021):

Which function are you thinking of? Is it just the Client.Enqueue or something else too?


@seanyu4296 commented on GitHub (Sep 9, 2021):

So far just the Enqueue to also trace the underlying redis calls


@hibiken commented on GitHub (Sep 9, 2021):

Ok sounds good.

My proposal is to add a new method EnqueueContext(ctx, task) instead of changing the API of the existing one. Let me know if that works.


@seanyu4296 commented on GitHub (Sep 15, 2021):

I can work on it in the coming week @hibiken


@hibiken commented on GitHub (Sep 15, 2021):

@seanyu4296 if you do work on it, please work off of the next branch instead of master :)


@seanyu4296 commented on GitHub (Sep 23, 2021):

ohhh, will this be for v1? I haven't started yet


@hibiken commented on GitHub (Sep 23, 2021):

No, we have a long way to go before doing a v1 release. I use the "next" branch for whatever release comes next (most likely v0.19 at the moment).


@mrsufgi commented on GitHub (Nov 29, 2021):

@hibiken there's something missing with tracing though.
I'm using OpenTelemetry (OTel)

```go
// Assumed imports for this snippet:
//   redis "github.com/go-redis/redis/v8"
//   redisotel "github.com/go-redis/redis/extra/redisotel"
func (opt CustomClientOpt) MakeRedisClient() interface{} {
	rdb := redis.NewClient(&redis.Options{
		Network:      opt.Network,
		Addr:         opt.Addr,
		Username:     opt.Username,
		Password:     opt.Password,
		DB:           opt.DB,
		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,
		PoolSize:     opt.PoolSize,
		TLSConfig:    opt.TLSConfig,
	})

	rdb.AddHook(redisotel.TracingHook{})
	return rdb
}
```

EnqueueContext works great and I'm able to see traces in my otel-collector; however, I'm not able to propagate this context to my handlers. Perhaps we need something lower level where we dequeue from redis?

Maybe I'm missing something? Maybe we need redis-server in the loop?

```go
m.HandleFunc("some-type", MyHandler)
...
func MyHandler(ctx context.Context, t *asynq.Task) error {
   ...
}
```

@crossworth commented on GitHub (Nov 29, 2021):

When asynq dequeues a task, it creates a new context for it (from context.Background):
https://github.com/hibiken/asynq/blob/9f2c321e9804901f06b58e82f9e0ce4125d80b74/processor.go#L193

https://github.com/hibiken/asynq/blob/9f2c321e9804901f06b58e82f9e0ce4125d80b74/internal/context/context.go#L31-L40

You should create a middleware for the handler that adds OpenTelemetry support to the context.

Similar to this (ignore the t.Context() part, that was for another issue):
https://github.com/crossworth/asynq-task-ctx-examples/blob/master/examples/middleware.go#L50-L57


@mrsufgi commented on GitHub (Nov 30, 2021):

@crossworth that was quite helpful! So now I have a tracing middleware that creates a span (entrypoint)

(based on your code ofc)

```go
var tracer = otel.Tracer("asynq/tasks")

func TaskTracingMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		ctx, span := tracer.Start(ctx, fmt.Sprintf("middleware-task-%s", t.Type()))
		defer span.End()

		return h.ProcessTask(ctx, t)
	})
}
```

Now I have NEW spans generated from the handler.
What's missing is taking the trace context we had while enqueuing and passing it to the middleware.

Although, like you said, "When asynq dequeues a task, it creates a new context for it", so that sounds impossible without adding some headers to the task that can be extracted in the handler middleware.
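One way to "add some headers to the task" is to wrap the original payload in an envelope that carries the trace headers. This is a hypothetical standard-library-only sketch; in a real setup the Headers map would be filled by an OTel propagator's Inject on the producer side and fed to Extract on the consumer side:

```go
// Envelope pattern: carry W3C trace-context headers alongside the task body.
package main

import (
	"encoding/json"
	"fmt"
)

type tracedPayload struct {
	Headers map[string]string `json:"headers"` // e.g. "traceparent"
	Body    json.RawMessage   `json:"body"`    // the original task payload
}

// wrapPayload attaches headers to a task payload before enqueueing.
func wrapPayload(headers map[string]string, body []byte) ([]byte, error) {
	return json.Marshal(tracedPayload{Headers: headers, Body: body})
}

// unwrapPayload recovers headers and the original payload in a handler middleware.
func unwrapPayload(data []byte) (map[string]string, []byte, error) {
	var p tracedPayload
	if err := json.Unmarshal(data, &p); err != nil {
		return nil, nil, err
	}
	return p.Headers, p.Body, nil
}

func main() {
	wrapped, _ := wrapPayload(
		map[string]string{"traceparent": "00-0123456789abcdef0123456789abcdef-0123456789abcdef-01"},
		[]byte(`{"user_id":42}`),
	)
	headers, body, _ := unwrapPayload(wrapped)
	fmt.Println(headers["traceparent"], string(body))
}
```

The handler middleware would call unwrapPayload first, extract the trace context from the headers into its ctx, and hand the inner body to the real handler.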


@hibiken commented on GitHub (Nov 30, 2021):

@mrsufgi thank you for these comments, and thank you @crossworth for jumping in to help!

I'm new to OpenTelemetry so I may be wrong, but I believe there should be two traces, like @crossworth mentioned.
The first trace includes the client.Enqueue call that enqueues a task, while handling an HTTP request for example.
The second trace is when the task gets delivered to the Handler to be processed.
I believe these two traces should be separate because of the asynchronous nature of the task queue pattern.

But I think we should be able to link these two traces, since they are related. Looking at the otel docs, Link (https://pkg.go.dev/go.opentelemetry.io/otel/trace#Link) seems promising and maybe something we can use?

Please leave a comment in this thread if anyone has suggestions/proposals :)

@crossworth commented on GitHub (Nov 30, 2021):

OpenTelemetry has the concept of a Producer and a Consumer: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#spankind

I think it is possible to do context propagation similar to this: https://www.splunk.com/en_us/blog/devops/distributed-tracing-for-kafka-clients-with-opentelemetry-and-splunk-apm.html

Maybe something like this should work: https://gist.github.com/crossworth/0dc7f1c094192e49c06fee9dc617f328 (not tested).

Some reference material:

  • https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#spankind
  • https://github.com/open-telemetry/opentelemetry-go/blob/5ea9171d5f86db33c48e737e850d98cae654e5ef/internal/internaltest/text_map_carrier.go#L26
  • https://doordash.engineering/2021/06/17/leveraging-opentelemetry-for-custom-context-propagation/

@seanyu4296 commented on GitHub (Dec 1, 2021):

Just chiming in quickly: I actually add the trace IDs to the task as metadata and extract them during task processing to have one trace. I'll post later on how I did it.

@mrsufgi commented on GitHub (Dec 6, 2021), replying to @crossworth's suggestion above:

Based on your answers I have my final snippet: https://gist.github.com/mrsufgi/99ad8f697c2d2894b9904067d28223e5 (sorry that it's not a full set of working files, but it's meddled with our codebase). If anyone is looking for a fuller solution, ping me :)

@jcgarciaram commented on GitHub (Dec 6, 2021):

@seanyu4296 I am working on adding tracing to Datadog as well. Would it be possible for you to share code examples and tips on how you got this (https://github.com/hibiken/asynq/issues/321#issuecomment-911204264) to work?


@hibiken commented on GitHub (Dec 7, 2021):

@seanyu4296 would you mind creating a wiki page describing your solution so that other users can use it as a reference? (https://github.com/hibiken/asynq/wiki/_new)


@goxiaoy commented on GitHub (Jun 21, 2022):

Blocked by #497

```go
package job

import (
	"context"
	"fmt"
	"github.com/hibiken/asynq"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/propagation"
	"net/http"
)

type SpanKind string

const (
	KindProducer SpanKind = "PRODUCER"
	KindConsumer SpanKind = "CONSUMER"
)

var (
	fixedAttrs = []attribute.KeyValue{
		attribute.String("job.system", "asynq"),
	}
	tracer     = otel.Tracer("asynq/tasks")
	propagator = propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{})
)

func TracingServer() asynq.MiddlewareFunc {
	kind := KindConsumer
	return func(h asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) (err error) {
			//TODO recover context from header?
			//ctx = propagator.Extract(ctx, propagation.HeaderCarrier(t.Header))
			ctx, span := tracer.Start(ctx, fmt.Sprintf("job-%s", t.Type()))
			id, _ := asynq.GetTaskID(ctx)
			queue, _ := asynq.GetQueueName(ctx)
			maxRetry, _ := asynq.GetMaxRetry(ctx)
			retryCount, _ := asynq.GetRetryCount(ctx)
			attrs := append(
				fixedAttrs,
				attribute.String("span.otel.kind", string(kind)),
				attribute.String("job.job_id", id),
				attribute.String("job.queue", queue),
				attribute.Int("job.max_retry", maxRetry),
				attribute.Int("job.retry_count", retryCount),
			)
			span.SetAttributes(attrs...)
			defer func() {
				if err != nil {
					span.RecordError(err)
					span.SetStatus(codes.Error, err.Error())
				} else {
					span.SetStatus(codes.Ok, "OK")
				}
				span.End()
			}()
			err = h.ProcessTask(ctx, t)
			return
		})
	}
}

func SetTracingOption(ctx context.Context) asynq.Option {
	//recover header
	var header = propagation.HeaderCarrier(http.Header{})
	ctx = propagator.Extract(ctx, header)
	//TODO ability to set header
	panic("unimplemented")
}

func EnqueueWithTracing(ctx context.Context, client *asynq.Client, task *asynq.Task, opts ...asynq.Option) (taskinfo *asynq.TaskInfo, err error) {
	ctx, span := tracer.Start(ctx, fmt.Sprintf("job-%s", task.Type()))
	defer func() {
		attrs := append(
			fixedAttrs,
			attribute.String("span.otel.kind", string(KindProducer)),
		)
		if taskinfo != nil {
			attrs = append(attrs, attribute.String("job.job_id", taskinfo.ID),
				attribute.String("job.queue", taskinfo.Queue),
				attribute.Int("job.max_retry", taskinfo.MaxRetry))
		}
		span.SetAttributes(attrs...)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		} else {
			span.SetStatus(codes.Ok, "OK")
		}
		span.End()
	}()

	opts = append(opts, SetTracingOption(ctx))
	taskinfo, err = client.EnqueueContext(ctx, task, opts...)
	return
}
```


@reidcooper commented on GitHub (Jan 6, 2024):

Thanks for the inspiration @seanyu4296. This is a pending PR of mine that I still need to verify, but it all compiles.

The code below allows me to wrap my Redis instance with Datadog, then pass that wrapped Redis instance to Asynq.

```go
// project/redis.go

package redis

import (
	"fmt"

	goRedis "github.com/redis/go-redis/v9"
	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9"
)

type ClientConnOpts struct {
	Addr    string
	AppName string
}

func NewClient(co *ClientConnOpts) *Client {
	redisOpts := goRedis.Options{Addr: co.Addr}
	rdb := goRedis.NewClient(&redisOpts)

	ddRedis.WrapClient(
		rdb,
		ddRedis.WithAnalytics(true),
		ddRedis.WithServiceName(formatTracerServiceName(co.AppName)),
	)

	client := Client{
		redis: rdb,
	}

	return &client
}

type Client struct {
	redis *goRedis.Client
}

// Used to adhere to the asynq redis interface
func (client Client) MakeRedisClient() interface{} {
	return client.redis
}

func formatTracerServiceName(appName string) string {
	return fmt.Sprintf("%s-redis", appName)
}
```

```go
// project/asynqClient

ac := asynq.NewClient(redis)
```

```go
// project/asynqServer

server := asynq.NewServer(
	pco.Redis,
	pco.configOptions(),
)
```

One thing to note: I recommend using a separate redis instance for the Asynq Processor and Server if you are running both within the same application. The reason is that when Asynq shuts down, it closes the Redis connection for you, which can cause anything else sharing that connection to encounter unintended consequences.


@reidcooper commented on GitHub (Jan 6, 2024):

I do agree that there should be two separate contexts, due to the asynchronous nature @hibiken described above. However, I could see the value in somehow linking the two contexts from enqueueing to processing. I would love to hear how everyone did it.

Adding Datadog tracing to Asynq turned out to be pretty straightforward. I added a pretty fun integration with Datadog, logging, and StatsD.

You have to leverage the mux's (asynq.ServeMux) ability to add middleware, so I broke it down into 3 main pieces:

  1. Start a Datadog trace.
  2. Next, integrate your logging middleware.
  3. Finally, add your StatsD middleware.

  1. Add a middleware function that sets up Datadog. This setup is pretty generic, straight out of Datadog's READMEs. The important piece is getting the context from Asynq, injecting it into the Datadog span, and continuing forward. I chose to make each task type its own resource, which lets me track individual jobs.
```go
func BuildDatadogMiddleware(next asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
		metadata := commonLogMetadata(ctx, task)

		optTags := datadog.BuildStartSpanOptionTags(metadata)
		optTags = append(optTags, tracer.ResourceName(task.Type()))

		span, ctx := tracer.StartSpanFromContext(
			ctx,
			jobProcessor,
			optTags...,
		)

		err := next.ProcessTask(ctx, task)
		// Finish the span right away, recording any handler error.
		span.Finish(tracer.WithError(err))

		if err != nil {
			return fmt.Errorf("datadog middleware: %w", err)
		}

		return nil
	})
}
```
  2. Add your logging middleware. This is nice for seeing generic logs about when a job started and finished, with info about the task, etc. What is helpful here is to also add your DD trace/span IDs to all your logging messages so you can link those.
```go
func BuildLoggingMiddleware(
	l logger.Loggable,
) func(next asynq.Handler) asynq.Handler {
	return func(next asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
			metadata := commonLogMetadata(ctx, task)

			addltContext := logger.AdditionalContext{
				TraceID: metadata[datadog.DDTraceID],
				SpanID:  metadata[datadog.DDSpanID],
			}

			ctx = logger.WithAdditionalContext(ctx, addltContext)

			l.Info(
				ctx,
				fmt.Sprintf("Starting task %s", task.Type()),
				metadata,
			)

			err := next.ProcessTask(ctx, task)

			if err != nil {
				err = fmt.Errorf(
					"%w. task %s: %w",
					errUnableToFinishTask,
					task.Type(),
					err,
				)

				metadata["task_error"] = err.Error()

				l.Error(
					ctx,
					err.Error(),
					metadata,
				)

				return err
			}

			l.Info(
				ctx,
				fmt.Sprintf("Finished task %s", task.Type()),
				metadata,
			)

			return nil
		})
	}
}
```
  3. Lastly, if you want to use StatsD to send custom metrics like inProgress, completed, and failed, you can do that as a middleware as well.
```go
func BuildJobMetricsMiddleware(
	statsD *datadog.StatsDClient,
	logger logger.Loggable,
) func(next asynq.Handler) asynq.Handler {
	return func(next asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
			metadata := commonLogMetadata(ctx, task)

			if statDErr := statsD.Increment([]string{job, inProgress}, metadata, 1); statDErr != nil {
				logger.Warn(
					ctx,
					fmt.Errorf("%w: %w",
						errUnableToSendStat,
						statDErr,
					).Error(),
					metadata,
				)
			}

			err := next.ProcessTask(ctx, task)

			if statDErr := statsD.Decrement([]string{job, inProgress}, metadata, 1); statDErr != nil {
				logger.Warn(
					ctx,
					fmt.Errorf("%w: %w",
						errUnableToSendStat,
						statDErr,
					).Error(),
					metadata,
				)
			}

			if err != nil {
				if statDErr := statsD.Increment([]string{job, failed}, metadata, 1); statDErr != nil {
					logger.Warn(
						ctx,
						fmt.Errorf("%w: %w",
							errUnableToSendStat,
							statDErr,
						).Error(),
						metadata,
					)
				}

				return fmt.Errorf("jobprocessor job metrics middleware: %w", err)
			}

			if statDErr := statsD.Increment([]string{job, completed}, metadata, 1); statDErr != nil {
				logger.Warn(
					ctx,
					fmt.Errorf("%w: %w",
						errUnableToSendStat,
						statDErr,
					).Error(),
					metadata,
				)
			}

			return nil
		})
	}
}
```