Manual
JobSchedulers.jl can be used to glue commands in a pipeline/workflow, and can also be used to schedule small Julia tasks.
If you need to run multiple heavy Julia tasks, it is recommended to start Julia with multiple threads.
using JobSchedulers
Create a Job
A Job is a wrapper around an AbstractCmd, Function, or Task:
command_job = Job(
`echo command job done` # AbstractCmd to run
)
function_job = Job() do # the function should have no arguments
println("function job done")
end
task_job = Job(
@task(println("task job done")) # Task to run
)
job_with_args = Job(
@task(begin println("job_with_args done"); "result" end); # Task to run
name = "job with args", # job name.
user = "me", # Job owner.
ncpu = 1, # Number of CPU required.
mem = 1KB, # Amount of memory required (unit: TB, GB, MB, KB, B).
schedule_time = Second(3), # Run after 3 seconds; can be ::DateTime or ::Period.
wall_time = Hour(1), # The maximum time to run the job. (Cancel job after reaching wall time.)
priority = 20, # Lower number = higher priority.
dependency = [ # Defer job until some jobs reach some states.
command_job,
DONE => task_job
]
)
# Job:
# id → 7290168730386436
# name → "job with args"
# user → "me"
# ncpu → 1.0
# mem → 1.0 KB
# schedule_time → 11:46:12
# submit_time → na
# start_time → na
# stop_time → na
# wall_time → 1 hour
# cron → Cron(:none)
# until → forever
# state → :queuing
# priority → 20
# dependency → 2 jobs
# task → Task
# stdout → nothing
# stderr → nothing
The dependency argument of Job controls when a job starts. It is a vector whose elements are STATE => job or STATE => job.id. STATE is one of DONE, FAILED, CANCELLED, QUEUING, RUNNING, PAST. The first 5 states are real job states. PAST is the superset of DONE, FAILED, and CANCELLED, which means the job will not run in the future.
DONE => job can be simplified to job from v0.8.
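The relationship between PAST and the real states can be sketched in plain Julia. This is only an illustration of the rule described above: it uses symbols as stand-ins for the state constants, and dependency_met is a hypothetical helper, not a function of JobSchedulers.jl.

```julia
# Illustrative stand-ins: symbols instead of the library's state constants.
const PAST_STATES = (:done, :failed, :cancelled)

# `dependency_met` is a hypothetical helper, not a JobSchedulers.jl function.
# It answers: does a job currently in state `actual` satisfy a dependency
# that requires state `required`?
function dependency_met(required::Symbol, actual::Symbol)
    # PAST is satisfied by any state in which the job will not run again.
    required === :past && return actual in PAST_STATES
    # Every other requirement must match the job's state exactly.
    return required === actual
end

dependency_met(:done, :done)      # true
dependency_met(:done, :failed)    # false
dependency_met(:past, :failed)    # true
dependency_met(:past, :running)   # false
```

Under this reading, PAST => job is the choice when a follow-up job should start regardless of whether the earlier job succeeded.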
Submit a Job
Submit a job to queue:
submit!(command_job)
submit!(task_job)
submit!(job_with_args)
Details: submit!
Create and submit a Job
submit!(Job(...)) can be simplified to submit!(...) from v0.8.
job = submit!(@task(println("job")), priority = 0)
The macro @submit [args...] expression is available from v0.10.2. It automatically adds explicitly referenced Job dependencies by walking through the symbols in the expression.
job = @submit ncpu=1 1+1
job_auto_dependency = @submit 1 + result(job)
# equivalent to submit!(() -> 1 + result(job); dependency=job)
@submit supports any type of Expression, including a code block:
x = 5
job_block = @submit begin
y = x + 1
y^2
end
@assert fetch(job_block) == (5+1)^2
Get a Job's Result
Get the returned result immediately. If the job has not finished, a warning is shown and nothing is returned:
result(job)
# "result"
From JobSchedulers v0.10.2, you can also use fetch to wait for a job to finish and return its result.
fetch(job)
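To contrast the two accessors, here is a minimal sketch that chains two jobs and reads the final value. It assumes JobSchedulers v0.10.2 or later and uses only calls shown in this manual; the job names "producer" and "consumer" are arbitrary labels chosen for the sketch.

```julia
using JobSchedulers

# The first job computes a value.
producer = submit!(() -> 6 * 7; name = "producer")

# The second job starts only after the first is DONE, so calling
# result(producer) inside it is safe.
consumer = submit!(() -> result(producer) + 1;
    name = "consumer",
    dependency = [DONE => producer]
)

# fetch blocks until the job has finished, then returns its value.
@assert fetch(consumer) == 43

# The job is now finished, so result returns the value without a warning.
result(consumer)
```

Calling result(consumer) right after submit! instead of fetch would likely return nothing with a warning, because the job may not have finished yet.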
Cancel a Job
Interrupt or cancel! a job:
cancel!(job)
Recurring/repetitive Job
From JobSchedulers v0.8, users can submit recurring jobs using Linux-based Crontab-like methods.
Two new fields (arguments) of Job are introduced: Job(cron::Cron, until::Union{DateTime,Period}).
cron::Cron: a Cron object. It extends Linux's crontab and allows repeating every XX seconds. You can use the familiar *, -, and , syntax just like crontab. For other features, see Cron.
until::Union{DateTime,Period}: the date and time at which the job stops recurring.
Construction:
Cron(second, minute, hour, day_of_month, month, day_of_week)
Examples:
Cron()
# Cron(every minute at 0 second)
Cron(0,0,0,1,1,0)
Cron(:yearly)
Cron(:annually)
# Cron(at 0:0:0 on day-of-month 1 in Jan)
Cron(0,0,0,1,*,*)
Cron(:monthly)
# Cron(at 0:0:0 on day-of-month 1)
Cron(0,0,0,*,*,1)
Cron(:weekly)
# Cron(at 0:0:0 on Mon)
Cron(0,0,0,*,'*',"*") # * is equivalent to '*', and "*" in Cron.
Cron(:daily)
Cron(:midnight)
# Cron(at 0:0:0)
Cron(0,0,*,*,*,*)
Cron(:hourly)
# Cron(at 0 minute, 0 second)
Cron(0,0,0,0,0,0) # never repeat
Cron(:none) # never repeat
# Cron(:none)
Cron(0,0,0,*,*,"*/2")
# Cron(at 0:0:0 on Tue,Thu,Sat)
Cron(0,0,0,*,*,"1-7/2")
Cron(0,0,0,0,0,"1-7/2")
# Cron(at 0:0:0 on Mon,Wed,Fri,Sun)
Cron(0,0,0,1,"1-12/3",*)
# Cron(at 0:0:0 on day-of-month 1 in Jan,Apr,Jul,Oct)
Cron(30,4,"1,3-5",1,*,*)
# Cron(at 4 minute, 30 second past 1,3,4,5 hours on day-of-month 1)
# repeatedly print the time every 5 seconds, until the current time plus 20 seconds
recurring_job = submit!(cron = Cron("*/5", *, *, *, *, *), until = Second(20)) do
println(now())
end
# 2024-03-27T13:14:00.060
# 2024-03-27T13:14:05.010
# 2024-03-27T13:14:10.023
# 2024-03-27T13:14:15.044
Details: Cron
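The field syntax used in the Cron examples can be read as follows. This is a minimal sketch inferred only from the outputs shown in those examples (for instance "*/2" on day-of-week giving Tue,Thu,Sat, and "1-7/2" giving Mon,Wed,Fri,Sun). expand_field is a hypothetical helper, not the parser that JobSchedulers.jl uses, and it assumes day-of-week runs from 1 (Mon) to 7 (Sun).

```julia
# Illustrative only: expand one crontab-style field into the values it matches.
# `expand_field` is a hypothetical helper, not part of JobSchedulers.jl.
function expand_field(field::AbstractString, valid::UnitRange{Int})
    vals = Int[]
    for part in split(field, ',')
        if part == "*"
            # Every valid value.
            append!(vals, valid)
        elseif startswith(part, "*/")
            # "*/n": valid values divisible by n, as the examples suggest.
            n = parse(Int, part[3:end])
            append!(vals, filter(v -> v % n == 0, valid))
        elseif occursin('/', part)
            # "a-b/n": start at a and step by n up to b.
            range_part, step_part = split(part, '/')
            lo, hi = parse.(Int, split(range_part, '-'))
            append!(vals, lo:parse(Int, step_part):hi)
        elseif occursin('-', part)
            # "a-b": every value from a to b.
            lo, hi = parse.(Int, split(part, '-'))
            append!(vals, lo:hi)
        else
            # A single number.
            push!(vals, parse(Int, part))
        end
    end
    return sort!(unique!(vals))
end

expand_field("*/2", 1:7)        # [2, 4, 6]       (Tue, Thu, Sat)
expand_field("1-7/2", 1:7)      # [1, 3, 5, 7]    (Mon, Wed, Fri, Sun)
expand_field("1-12/3", 1:12)    # [1, 4, 7, 10]   (Jan, Apr, Jul, Oct)
expand_field("1,3-5", 0:23)     # [1, 3, 4, 5]
```

The sketch takes string fields only; the Cron constructor itself also accepts integers and a bare *, as the examples show.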
Queue
Show all jobs:
queue(:all) # or:
queue(all=true) # or:
all_queue()
# 1-element Vector{Job}:
# ┌─────┬───────┬──────────────────┬─────────────────┬──────┬──────┬─────────
# │ Row │ state │ id │ name │ user │ ncpu │ mem ⋯
# ├─────┼───────┼──────────────────┼─────────────────┼──────┼──────┼─────────
# │ 1 │ :done │ 6407186212753787 │ "job with args" │ "me" │ 1.0 │ 1.0 KB ⋯
# └─────┴───────┴──────────────────┴─────────────────┴──────┴──────┴─────────
# 9 columns omitted
Before v0.10, all jobs were saved to the queue. From v0.10, unnamed jobs (job.name == "") are not saved if they ran successfully. To keep unnamed jobs, call JobSchedulers.destroy_unnamed_jobs_when_done(false).
Show queue (running and queuing jobs only):
queue()
# 0-element Vector{Job}:
# ┌─────┬───────┬────┬──────┬──────┬──────┬─────┬──────────┬────────────┬────
# │ Row │ state │ id │ name │ user │ ncpu │ mem │ priority │ dependency │ s ⋯
# └─────┴───────┴────┴──────┴──────┴──────┴─────┴──────────┴────────────┴────
# 7 columns omitted
Show queue using a job state (QUEUING, RUNNING, DONE, FAILED, CANCELLED, or PAST):
queue(DONE)
# 1-element Vector{Job}:
# ┌─────┬───────┬──────────────────┬─────────────────┬──────┬──────┬─────────
# │ Row │ state │ id │ name │ user │ ncpu │ mem ⋯
# ├─────┼───────┼──────────────────┼─────────────────┼──────┼──────┼─────────
# │ 1 │ :done │ 6407186212753787 │ "job with args" │ "me" │ 1.0 │ 1.0 KB ⋯
# └─────┴───────┴──────────────────┴─────────────────┴──────┴──────┴─────────
# 9 columns omitted
Show queue using a String or Regex to match job name or user:
queue("me")
queue("with args")
queue(r"job.*")
# 1-element Vector{Job}:
# ┌─────┬───────┬──────────────────┬─────────────────┬──────┬──────┬─────────
# │ Row │ state │ id │ name │ user │ ncpu │ mem ⋯
# ├─────┼───────┼──────────────────┼─────────────────┼──────┼──────┼─────────
# │ 1 │ :done │ 6407186212753787 │ "job with args" │ "me" │ 1.0 │ 1.0 KB ⋯
# └─────┴───────┴──────────────────┴─────────────────┴──────┴──────┴─────────
# 9 columns omitted
Job query
Get a Job object by providing the job ID, or by indexing the queue:
job_query(6407186212753787) # or:
queue(6407186212753787)
queue(:all)[1]
# Job:
# id → 6407186212753787
# name → "job with args"
# user → "me"
# ncpu → 1.0
# mem → 1.0 KB
# schedule_time → 13:11:45
# submit_time → 13:12:46
# start_time → 13:12:46
# stop_time → 13:12:46
# wall_time → 1 hour
# cron → Cron(:none)
# until → forever
# state → :done
# priority → 20
# dependency → []
# task → Task
# stdout → nothing
# stderr → nothing
Wait for jobs and progress meter
wait for one or more specific jobs:
wait(j::Job)
wait(js::Vector{Job})
Wait for jobs to finish using wait_queue.
wait_queue()
# no output
# If `show_progress = true`, a fancy progress meter will display.
wait_queue(show_progress = true)
# stop waiting when <= 2 jobs are queuing or running.
wait_queue(show_progress = true, exit_num_jobs = 2)
Scheduler control
The scheduler is started automatically after v0.7.11.
scheduler_stop()
# [ Info: Scheduler task stops.
# ┌ Warning: Scheduler reactivation task is not running.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/control.jl:92
scheduler_start()
# ┌ Warning: Scheduler task was interrupted or done. Restart.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/control.jl:61
# ┌ Warning: Scheduler reactivation task was interrupted or done. Restart.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/control.jl:61
scheduler_status()
# ┌ Info: Scheduler is running.
# │ SCHEDULER_MAX_CPU = 23
# │ SCHEDULER_MAX_MEM = "169.7 GB"
# │ JOB_QUEUE.max_done = 10000
# │ JOB_QUEUE.max_cancelled = 10000
# │ SCHEDULER_TASK[] = Task (runnable) @0x00007d4160031dc0
# └ SCHEDULER_REACTIVATION_TASK[] = Task (runnable) @0x00007d4160031f50
# :running
Find optimized ncpu that a Job can use
Only available from JobSchedulers v0.7.8.
solve_optimized_ncpu(default::Int;
ncpu_range::UnitRange{Int64} = 1:total_cpu,
njob::Int = 1,
total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU,
side_jobs_cpu::Int = 0)
Find the optimized number of CPUs for a job.
default: default ncpu of the job.
ncpu_range: the possible ncpu range of the job.
njob: number of the same job.
total_cpu: the total CPU that can be used by JobSchedulers.
side_jobs_cpu: CPUs set aside for small jobs that might run while the job is running, so the job does not use up all of the resources and stall those small tasks.
Compatibility with Pipelines.jl
Pipelines.jl: A lightweight Julia package for computational pipelines and workflows.
You can also create a Job by using Program types from Pipelines.jl:
Job(p::Program; program_kwargs..., run_kwargs..., job_kwargs...)
Job(p::Program, inputs; run_kwargs..., job_kwargs...)
Job(p::Program, inputs, outputs; run_kwargs..., job_kwargs...)
program_kwargs... are the input and output arguments defined in p::Program.
run_kwargs... are the keyword arguments of run(::Program; ...).
job_kwargs... are the keyword arguments of Job(::Union{Base.AbstractCmd,Task}; ...).
Details can be found by typing
julia> using Pipelines, JobSchedulers
julia> ?run
julia> ?Job
Example
using Pipelines, JobSchedulers
p = CmdProgram(
inputs = ["IN1", "IN2"],
outputs = "OUT",
cmd = pipeline(`echo inputs are: IN1 and IN2` & `echo outputs are: OUT`)
)
# CmdProgram:
# name → Command Program
# id_file →
# info_before → auto
# info_after → auto
# cmd_dependencies → <empty>
# arg_inputs → IN1 :: Any (required)
# IN2 :: Any (required)
# validate_inputs → do_nothing
# prerequisites → do_nothing
# cmd → `echo inputs are: IN1 and IN2` & `echo outputs are: OUT`
# infer_outputs → do_nothing
# arg_outputs → OUT :: Any (required)
# validate_outputs → do_nothing
# wrap_up → do_nothing
# arg_forward → <empty>
### native Pipelines.jl method to run the program
run(p, IN1 = `in1`, IN2 = 2, OUT = "out", touch_run_id_file = false)
# touch_run_id_file = false means do not create a file which indicates
# the job is done and avoids re-run.
# inputs are: in1 and 2
# outputs are: out
# (true, Dict("OUT" => "out"))
### run the program by submitting to JobSchedulers.jl
program_job = Job(p, IN1 = `in1`, IN2 = 2, OUT = "out", touch_run_id_file = false)
# Job:
# id → 6407224068474142
# name → "Command Program"
# user → ""
# ncpu → 1.0
# mem → 0 B
# schedule_time → na
# submit_time → na
# start_time → na
# stop_time → na
# wall_time → 1 year
# cron → Cron(:none)
# until → forever
# state → :queuing
# priority → 20
# dependency → []
# task → Task
# stdout → nothing
# stderr → nothing
submit!(program_job)
# inputs are: in1 and 2
# outputs are: out
# get the returned result
result(program_job)
# (true, Dict{String, Any}("OUT" => "out"))
@submit also works with Programs:
program_job2 = @submit IN1=`in1` IN2=2 OUT="out" touch_run_id_file=false p
Scheduler settings
Check the current status of scheduler:
scheduler_status()
# ┌ Info: Scheduler is running.
# │ SCHEDULER_MAX_CPU = 32
# │ SCHEDULER_MAX_MEM = "169.6 GB"
# │ JOB_QUEUE.max_done = 10000
# │ JOB_QUEUE.max_cancelled = 10000
# │ SCHEDULER_TASK[] = Task (runnable) @0x00007fe205052e60
# └ SCHEDULER_REACTIVATION_TASK[] = Task (runnable) @0x00007d4160031f50
# :running
Set the maximum CPU that the scheduler can use. If Julia is started with multiple threads, the maximum CPU is nthreads() - 1.
set_scheduler_max_cpu() # use all available CPUs
# 32
set_scheduler_max_cpu(4) # use 4 CPUs
# 4
set_scheduler_max_cpu(0.5) # use 50% of CPUs
# 16
Set the maximum RAM the scheduler can use:
set_scheduler_max_mem() # use 80% of total memory
# 107792089088
set_scheduler_max_mem(4GB) # use 4GB memory
set_scheduler_max_mem(4096MB)
set_scheduler_max_mem(4194304KB)
set_scheduler_max_mem(4294967296B)
# 4294967296
set_scheduler_max_mem(0.5) # use 50% of total memory
# 101166391296
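The four equivalent 4 GB calls above imply binary units (1 KB = 1024 B). That arithmetic can be checked in plain Julia. The lowercase constants below are stand-ins defined for this sketch only, under that assumption; they are not the unit constants that JobSchedulers.jl itself provides.

```julia
# Stand-in byte multipliers for this sketch only (assumed binary units).
const b  = 1
const kb = 1024b
const mb = 1024kb
const gb = 1024mb

# Julia reads `4gb` as `4 * gb`, which is the same juxtaposition syntax
# that lets `4GB` be written as an argument.
@assert 4gb == 4096mb == 4194304kb == 4294967296b == 4294967296
```

The last number matches the value returned by the set_scheduler_max_mem calls shown above.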
Set the maximum number of finished jobs:
set_scheduler_max_job(max_done::Int = 10000, max_cancelled::Int = max_done)
# If the number of finished jobs > 10000, the oldest ones will be removed.
# It does not affect queuing, running, or failed jobs.
set_scheduler_max_job(10000)
# 10000
Apply the settings above in one function call:
set_scheduler(;
max_cpu = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem = JobSchedulers.SCHEDULER_MAX_MEM,
max_job = JobSchedulers.JOB_QUEUE.max_done,
max_cancelled_job = JobSchedulers.JOB_QUEUE.max_cancelled,
update_second = JobSchedulers.SCHEDULER_UPDATE_SECOND
)
# ┌ Info: Scheduler is running.
# │ SCHEDULER_MAX_CPU = 32
# │ SCHEDULER_MAX_MEM = "169.6 GB"
# │ JOB_QUEUE.max_done = 10000
# │ JOB_QUEUE.max_cancelled = 10000
# │ SCHEDULER_TASK[] = Task (runnable) @0x00007fe205052e60
# └ SCHEDULER_REACTIVATION_TASK[] = Task (runnable) @0x00007d4160031f50
# :running
Backup
Set backup file:
set_scheduler_backup("/path/to/backup/file")
JobSchedulers writes to the backup file at exit. If the file exists, scheduler settings and the job queue are recovered from it automatically. Recovered jobs are for querying only; they cannot be run.
Stop backup and delete the old backup file (delete_old):
set_scheduler_backup(delete_old=true)
Backup immediately:
backup()