[GH-ISSUE #168] [FEATURE REQUEST] CLI/server for running exec.Command #2077

Closed
opened 2026-03-15 19:03:28 +03:00 by kerem · 6 comments

Originally created by @xkortex on GitHub (Jun 11, 2020).
Original GitHub issue: https://github.com/hibiken/asynq/issues/168

Originally assigned to: @xkortex on GitHub.

**Is your feature request related to a problem? Please describe.**
Running distributed bash scripts / other executables often involves a ton of overhead: job managers and complex pieces of software like Slurm or Kubernetes.

**Describe the solution you'd like**
A binary for a client and one for a server implementing the Asynq protocol, basically wrapping `exec` calls.

There'd be an `asynqexec` binary with the verbs `serve` and `submit`. `serve` would allow setting the concurrency level, as well as configuring a whitelist of allowable executable names.

E.g. a really simple example of the serve logic I just threw together:

```go
import (
	"context"
	"log"
	"os/exec"
	"syscall"

	"github.com/hibiken/asynq"
)

func HandleExecCommand(ctx context.Context, t *asynq.Task) error {
	name, err := t.Payload.GetString("name")
	if err != nil {
		return err
	}
	args, err := t.Payload.GetStringSlice("args")
	if err != nil {
		return err
	}
	log.Printf("Running command: `%s %v`", name, args)
	cmd := exec.Command(name, args...)
	if err := cmd.Start(); err != nil {
		return err
	}
	if err := cmd.Wait(); err != nil {
		if exiterr, ok := err.(*exec.ExitError); ok {
			// The program exited with a non-zero exit code.
			if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
				log.Printf("Exit Status: %d\n", status.ExitStatus())
			}
		} else {
			log.Printf("cmd.Wait error: %v\n", err)
		}
	}
	return nil
}
```
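The `--allow` whitelist mentioned above would be checked before the `exec.Command` call ever runs. A minimal sketch of how that check might look (the `isAllowed` helper and the `allowed` set are made up here, not part of the snippet above):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// isAllowed reports whether the requested executable name is in the
// whitelist. Explicit paths are rejected outright, so only bare command
// names (resolved via $PATH by exec.Command) are eligible; this avoids
// tricks like submitting "/tmp/evil/ffmpeg".
func isAllowed(name string, allowed map[string]bool) bool {
	if name != filepath.Base(name) {
		return false
	}
	return allowed[name]
}

func main() {
	allowed := map[string]bool{"ffmpeg": true}
	fmt.Println(isAllowed("ffmpeg", allowed))          // true: bare whitelisted name
	fmt.Println(isAllowed("/usr/bin/ffmpeg", allowed)) // false: explicit path
	fmt.Println(isAllowed("rm", allowed))              // false: not whitelisted
}
```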

`asynqexec submit`
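The `submit` side would essentially just package argv into a task payload and enqueue it. A hedged sketch of that payload construction, mirroring the `GetString`/`GetStringSlice` keys the handler above reads back (`TypeExecCommand` and `newExecPayload` are hypothetical names, not from the snippet above):

```go
package main

import "fmt"

// TypeExecCommand is a hypothetical task type name that the submit and
// serve sides would share.
const TypeExecCommand = "exec:command"

// newExecPayload packages an argv slice into the payload shape that the
// handler reads back with GetString("name") and GetStringSlice("args").
func newExecPayload(argv []string) map[string]interface{} {
	return map[string]interface{}{
		"name": argv[0],
		"args": argv[1:],
	}
}

func main() {
	p := newExecPayload([]string{"ffmpeg", "-i", "in.mp4", "out.mp4"})
	fmt.Println(p["name"]) // ffmpeg
	fmt.Println(p["args"]) // [-i in.mp4 out.mp4]
	// The client would then enqueue it, roughly:
	//   client.Enqueue(asynq.NewTask(TypeExecCommand, p))
}
```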

I think this could double as both a practical example of using the library, and a useful tool in its own right.

If you think this could be a valuable contribution directly to this project, I'd be willing to PR it when I have a bit of time (a lot of this logic is based on a similar project I have). Otherwise, if you think it belongs better as a separate library, I could do it that way instead. Basically: would you like this added, or should I just do this in my own repo?

Example use case:

I have 4459 videos I want to transcode with ffmpeg and a loosely-organized cluster at my disposal. I'd love to be able to do something like

```bash
# spin up my worker process on worker nodes
seq 0 9 | rush -j 10 -- ssh node{} 'asynqexec serve --jobs 1 --allow ffmpeg &'

# load the queue
find /mnt/unprocessed/ -wholename "/*/vid*/*.mp4" | rush -j 10 -- \
'asynqexec submit ffmpeg -hide_banner \
  -loglevel error \
  -hwaccel cuvid \
  -c:v h264_cuvid \
  -i "{}" \
  -movflags faststart \
  -c:v hevc_nvenc \
  -b:v "${BITRATE}" \
  -n \
  "/mnt/processed/$(basename ${1})"'
```

Easy peasy distributed batch jobs!


@hibiken commented on GitHub (Jun 11, 2020):

@xkortex Thank you for opening this feature request!
This sounds awesome, it'd be a great addition to the project 🎉

I think we can put this binary under /tools (or you could create a separate directory like /contrib or /x if you'd like). It'll make a very cool demo as well.

Would you mind describing the above example in more detail? (I'm not familiar with the rush command so I'm not 100% clear on what each command is doing).

The first line:

```
seq 0 9 | rush -j 10 -- ssh node{} 'asynqexec serve --jobs 1 --allow ffmpeg &'
```

I'm guessing it'll ssh into 10 different machines and on each machine run Asynq server with concurrency set to 1 and only allow ffmpeg command to be run, is that right?

The second line (I'm a bit more confused about this one):

```
find /mnt/unprocessed/ -wholename "/*/vid*/*.mp4" | rush -j 10 -- \
'asynqexec submit ffmpeg -hide_banner \
  -loglevel error \
  -hwaccel cuvid \
  -c:v h264_cuvid \
  -i "{}" \
  -movflags faststart \
  -c:v hevc_nvenc \
  -b:v "${BITRATE}" \
  -n \
  "/mnt/processed/$(basename ${1})"'
```

Will this run `asynqexec submit` on each machine that's running an Asynq server?
Isn't it sufficient to just enqueue tasks in Redis so that those 10 servers pulling tasks from Redis will execute the whitelisted commands?


@xkortex commented on GitHub (Jun 12, 2020):

Sorry, I probably picked a bit too complex of an example :p
[rush](https://github.com/shenwei356/rush) is a neat little app that just parallelizes jobs.

> I'm guessing it'll ssh into 10 different machines and on each machine run Asynq server with concurrency set to 1 and only allow ffmpeg command to be run, is that right?

Precisely :)

The last command just submits a job for each filename found. Basically

```bash
for fn in $(find /mydir -name "*files*.mp4"); do
  asynqexec submit ffmpeg -i "${fn}"
done
```

will load up the queue and any server which is able to run that type of job will pick one from the queue and start running it.

> Isn't it sufficient to just enqueue tasks in Redis so that those 10 servers pulling tasks from Redis will execute the whitelisted commands?

Exactly.

Unless I'm missing something and there is an even easier way to submit the jobs to Redis directly.


@xkortex commented on GitHub (Jun 12, 2020):

Skeleton: https://github.com/xkortex/asynq/tree/dev/exeq

terminal 1

```
exeq serve -P
```

terminal 2

```
exeq sub -- pwd
```

output terminal 1:

```
2020/06/11 22:17:36 Staring exeq server on localhost:6379 with 8 workers
2020/06/11 22:17:36 RUNNING IN PRIVILEGED mode
asynq: pid=88892 2020/06/12 02:17:36.088926 INFO: Starting processing
asynq: pid=88892 2020/06/12 02:17:36.095946 INFO: Send signal TSTP to stop processing new tasks
asynq: pid=88892 2020/06/12 02:17:36.095963 INFO: Send signal TERM or INT to terminate the process
2020/06/11 22:17:41 Running command: `pwd []`
/Users/mike
```

`exeq sub -- ls -lah ~`

```
2020/06/11 22:18:48 Running command: `ls [-lah /Users/mike]`
total 1088
drwx------+  98 mike  staff   3.1K 11 Jun 22:18 .
drwxr-xr-x    6 root  admin   192B 18 Sep  2019 ..
-rw-r--r--@   1 mike  staff    18K 11 Jun 19:15 .DS_Store
drwx------   44 mike  staff   1.4K 28 May 15:08 .Trash
-rw-------    1 mike  staff     0B 20 Apr 11:47 .Xauthority
drwxr-xr-x    5 mike  staff   160B 26 May 15:09 .ansible
...
```

`exeq sub -- ffmpeg -i /path/to/vid_h264.mp4 -c:a copy -c:v copy -t 15 /tmp/out_h264.mp4`

Success!


@hibiken commented on GitHub (Jun 12, 2020):

Thanks for the explanation. The prototype looks good to me. I also like the command name 😉

Feel free to open a PR whenever you have a merge-able change.
I'm excited to see this command added to the project, and thanks for your contributions!

If you have any questions, you can ping me in this thread or in the Gitter channel 👍


@hibiken commented on GitHub (Sep 12, 2020):

@xkortex I'm closing this for now. Let me know if you are still interested. Thanks!


@xkortex commented on GitHub (Sep 21, 2020):

Yeah, that's fine. I got an implementation up and running, but just when I was about to get it polished up for merging, I got slammed with a bunch of things at work. I've been using it to great effect on my system, but it's still a bit rickety, so I'll just PR once I'm happy with it :)
