[GH-ISSUE #862] How to mark a task as completed externally #1438

Open
opened 2026-03-07 22:09:33 +03:00 by kerem · 1 comment
Owner

Originally created by @ScribeSavant on GitHub (Apr 14, 2024).
Original GitHub issue: https://github.com/hibiken/asynq/issues/862

Hello, How can I mark a process as completed externally, for example I have an http server and I create a job with it and I want to stop the job with it, I tried the 'Inspector.CancelProcessing()' option but the job falls into the retry section and then starts again.

When I set MaxRetry to 0, the work goes to the archive section on restarts, and it does not start again.

Here is my full code

Controller

// create job
func (s *Server) createEvm() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		payload, err := io.ReadAll(r.Body)

		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}

		now := time.Now()
		fiveHours := now.Add(5 * time.Hour)
		t1 := workers.NewEvmTask(payload, asynq.MaxRetry(0), asynq.Deadline(fiveHours))
		info, err := s.workerServer.Enqueue(t1)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		w.Header().Set("content-type", "application/json")
		json.NewEncoder(w).Encode(info)
	}
}

// stop job
func (s *Server) stopTask() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var payload *CancelTaskPayload

		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}

		if err := s.workerServer.Inspector.CancelProcessing(payload.JobID); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		w.Header().Set("content-type", "application/json")
		json.NewEncoder(w).Encode(map[string]bool{"stopped": true})
	}
}

Evm Worker

package workers

import (
	"context"
	"fmt"
	"time"

	"github.com/hibiken/asynq"
)

type EvmTaskPayload struct {
	UserID int `json:"userId"`
}

type EvmJob struct {
}

func NewEvmTask(payload []byte, opts ...asynq.Option) *asynq.Task {
	return asynq.NewTask("evm:task", payload, opts...)
}

func (e *EvmJob) Processor(ctx context.Context, t *asynq.Task) error {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Stopped")
			return nil
		default:
			fmt.Println("Still working")
			time.Sleep(3 * time.Second)
		}
	}
}

Worker server

package workers

import (
	"context"

	"github.com/hibiken/asynq"
)

type WorkerServer struct {
	*asynq.Server
	*asynq.ServeMux
	*asynq.Client
	*asynq.Inspector
}

func CreateWorkerServer() *WorkerServer {
	s := &WorkerServer{
		Server: asynq.NewServer(
			asynq.RedisClientOpt{Addr: "localhost:6379"},
			asynq.Config{Concurrency: 1000},
		),
		ServeMux: asynq.NewServeMux(),
		Client: asynq.NewClient(
			asynq.RedisClientOpt{Addr: "localhost:6379"},
		),
		Inspector: asynq.NewInspector(
			asynq.RedisClientOpt{Addr: "localhost:6379"},
		),
	}
	
	s.routes()
	return s
}

func (w *WorkerServer) routes() {
	w.ServeMux.HandleFunc("evm:task", func(ctx context.Context, t *asynq.Task) error {
		job := &EvmJob{}
		return job.Processor(ctx, t)
	})
}

func (w *WorkerServer) StopJob(jobId string) {
	w.Inspector.CancelProcessing(jobId)
	w.Inspector.DeleteAllArchivedTasks("default")
}

In this example, when the process is stopped, it falls into the retry section, but since the maximum Retry is 0, it is automatically archived, but the problem is that if the server or the entire code is restarted, the jobs fall into the archive in the same way.

Originally created by @ScribeSavant on GitHub (Apr 14, 2024). Original GitHub issue: https://github.com/hibiken/asynq/issues/862 Hello, How can I mark a process as completed externally, for example I have an http server and I create a job with it and I want to stop the job with it, I tried the 'Inspector.CancelProcessing()' option but the job falls into the retry section and then starts again. When I set MaxRetry to 0, the work goes to the archive section on restarts, and it does not start again. Here is my full code ### Controller ```go // create job func (s *Server) createEvm() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { payload, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } now := time.Now() fiveHours := now.Add(5 * time.Hour) t1 := workers.NewEvmTask(payload, asynq.MaxRetry(0), asynq.Deadline(fiveHours)) info, err := s.workerServer.Enqueue(t1) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } w.Header().Set("content-type", "application/json") json.NewEncoder(w).Encode(info) } } // stop job func (s *Server) stopTask() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var payload *CancelTaskPayload if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } if err := s.workerServer.Inspector.CancelProcessing(payload.JobID); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } w.Header().Set("content-type", "application/json") json.NewEncoder(w).Encode(map[string]bool{"stopped": true}) } } ``` ### Evm Worker ```go package workers import ( "context" "fmt" "time" "github.com/hibiken/asynq" ) type EvmTaskPayload struct { UserID int `json:"userId"` } type EvmJob struct { } func NewEvmTask(payload []byte, opts ...asynq.Option) *asynq.Task { return asynq.NewTask("evm:task", payload, opts...) } func (e *EvmJob) Processor(ctx context.Context, t *asynq.Task) error { for { select { case <-ctx.Done(): fmt.Println("Stopped") return nil default: fmt.Println("Still working") time.Sleep(3 * time.Second) } } } ``` ### Worker server ```go package workers import ( "context" "github.com/hibiken/asynq" ) type WorkerServer struct { *asynq.Server *asynq.ServeMux *asynq.Client *asynq.Inspector } func CreateWorkerServer() *WorkerServer { s := &WorkerServer{ Server: asynq.NewServer( asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Concurrency: 1000}, ), ServeMux: asynq.NewServeMux(), Client: asynq.NewClient( asynq.RedisClientOpt{Addr: "localhost:6379"}, ), Inspector: asynq.NewInspector( asynq.RedisClientOpt{Addr: "localhost:6379"}, ), } s.routes() return s } func (w *WorkerServer) routes() { w.ServeMux.HandleFunc("evm:task", func(ctx context.Context, t *asynq.Task) error { job := &EvmJob{} return job.Processor(ctx, t) }) } func (w *WorkerServer) StopJob(jobId string) { w.Inspector.CancelProcessing(jobId) w.Inspector.DeleteAllArchivedTasks("default") } ``` In this example, when the process is stopped, it falls into the retry section, but since the maximum Retry is 0, it is automatically archived, but the problem is that if the server or the entire code is restarted, the jobs fall into the archive in the same way.
Author
Owner

@thucnq commented on GitHub (May 13, 2024):

Maybe you can try method DeleteTask

<!-- gh-comment-id:2106863424 --> @thucnq commented on GitHub (May 13, 2024): Maybe you can try method DeleteTask
Sign in to join this conversation.
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
starred/asynq#1438
No description provided.