API
Const/Variables
const B = 1
const KB = 1024
const MB = 1024KB
const GB = 1024MB
const TB = 1024GB
const QUEUING = :queuing
const RUNNING = :running
const DONE = :done
const FAILED = :failed
const CANCELLED = :cancelled
const PAST = :past # super set of DONE, FAILED, CANCELLED
const cron_none = Cron(:none)
const SCHEDULER_TASK = Base.RefValue{Task}()
const SCHEDULER_REACTIVATION_TASK = Base.RefValue{Task}()
Job
JobSchedulers.Job
— Type
Job(command::Base.AbstractCmd; stdout=nothing, stderr=nothing, append::Bool=false, kwargs...)
Job(f::Function; kwargs...)
Job(task::Task; kwargs...)
Arguments
- `command::Base.AbstractCmd`: the command to run.
- `f::Function`: the function to run without any arguments, like `f()`.
- `task::Task`: the task to run. E.g. `@task(1+1)`.
Common Keyword Arguments (kwargs...)
- `name::String = ""`: job name.
- `user::String = ""`: user that the job belongs to.
- `ncpu::Real = 1.0`: number of CPUs this job is about to use (can be `Float64`, e.g. `1.5` will use 150% CPU).
- `mem::Int64 = 0`: amount of memory this job is about to use (supports TB, GB, MB, KB, B=1).
- `schedule_time::Union{DateTime,Period} = DateTime(0)`: the expected time to run.
- `dependency`: defer the job until the specified jobs reach the specified states (`QUEUING`, `RUNNING`, `DONE`, `FAILED`, `CANCELLED`, `PAST`). `PAST` is the superset of `DONE`, `FAILED`, `CANCELLED`, which means the job will not run in the future. E.g. `DONE => job`, `[DONE => job1; PAST => job2]`.

The default state is `DONE`, so `DONE => job` can be simplified to `job`. To be compatible with old versions, you can also use the job id (`Int`): `[DONE => job.id]`. JobSchedulers will remove jobs that have reached their states from the dependency list.
- `wall_time::Period = Year(1)`: wall clock time limit. Jobs will be terminated after running for this period.
- `priority::Int = 20`: lower means higher priority.
- `cron::Cron = Cron(:none)`: job recurring at specific date and time. See more at `Cron`.
- `until::Union{DateTime,Period} = DateTime(9999,1,1)`: stop job recurring `until` date and time.
Experimental Keyword Arguments - Output Redirection:
- `stdout=nothing`: redirect stdout to the file.
- `stderr=nothing`: redirect stderr to the file.
- `append::Bool=false`: whether to append stdout/stderr to the file(s) or not.

Redirection in Julia is not thread-safe, so unexpected redirection might happen if you are running programs in different `Task`s simultaneously (multi-threading).
JobSchedulers.submit!
— Function
submit!(job::Job)
submit!(args_of_Job...; kwargs_of_Job...)
Submit the job to the queue.
`submit!(Job(...))` can be simplified to `submit!(...)`. They are equivalent.
JobSchedulers.@submit
— Macro
@submit [option=value]... expr
Submit a job from `expr`. If a `Job` is explicitly shown in `expr`, `DONE => job` will be automatically added to the dependency list.
- `expr`: any type of `Expr`ession is supported.
- `option = value`: kwargs of `Job`. If `expr` is parsed to be a `Pipelines.Program`, `option`s also include its inputs, outputs and run kwargs.
Example
j = @submit 1+1
wait(j)
@assert result(j) == 2
# you can use any keyword arguments that `Job` supports, such as `name`, `ncpu`:
j_2sec = @submit name = "run after 2 sec" begin sleep(2); 32 end
# because `j_2sec isa Job`, `DONE => j_2sec` is pushed to `j2.dependency`.
j2 = @submit mem=2KB begin
1 + result(j_2sec)
end
wait(j2)
@assert result(j2) == 1 + 32
# you can also manually add dependencies not in the `expr`:
j3 = @submit dependency = [PAST => j] println("j3 finished. result of j2 = ", result(j2))
# Note: j3.dependency might be empty after submit, because JobScheduler will remove jobs that reached their states in the dependency list.
`@submit` cannot know the elements in a container, so it is unable to walk through a container and add the `Job`s inside it as dependencies.
jobs = Job[] # the job container
for i in 1:10
push!(jobs, @submit begin sleep(30);i end) # 10 jobs will be added to `jobs`
end
x = 0
j_something_wrong = @submit for j in jobs
# have to use global x
global x += result(j)
end
# ┌ Warning: Getting result from a running job: returned value might be unexpected.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/jobs.jl:318
result(j_something_wrong)
# MethodError(+, (nothing, nothing), 0x0000000000007b16)
To avoid it, we can (1) use `submit!`, or (2) explicitly add `dependency = jobs` to `@submit`.
x = 0
j_ok = submit!(dependency = jobs) do
for j in jobs
# have to use global x
global x += result(j)
end
end
wait(j_ok)
@assert x == 55
x = 100
j_ok_too = @submit dependency = jobs for j in jobs
# have to use global x
global x += result(j)
end
wait(j_ok_too)
@assert x == 155
JobSchedulers.cancel!
— Function
cancel!(job::Job)
Cancel `job`, stopping it from queuing or running.
JobSchedulers.result
— Function
result(job::Job)
Return the result of `job`. If the job is not done, a warning message will also be shown.
Base.fetch
— Method
fetch(x::Job)
Wait for a `Job` to finish, then return its result value. If the task fails with an exception, a `TaskFailedException` (which wraps the failed task) is thrown.
`fetch(x::Job)` is available from JobSchedulers v0.10.2.
JobSchedulers.isqueuing
— Function
isqueuing(j::Job) :: Bool
JobSchedulers.isrunning
— Function
isrunning(j::Job) :: Bool
JobSchedulers.isdone
— Function
isdone(j::Job) :: Bool
JobSchedulers.iscancelled
— Function
iscancelled(j::Job) :: Bool
JobSchedulers.isfailed
— Function
isfailed(j::Job) :: Bool
JobSchedulers.ispast
— Function
ispast(j::Job) :: Bool = j.state === DONE || j.state === CANCELLED || j.state === FAILED
Cron: Job Recur/Repeat
JobSchedulers.Cron
— Type
Cron(second, minute, hour, day_of_month, month, day_of_week)
Cron(;
second = 0,
minute = '*',
hour = '*',
day_of_month = '*',
month = '*',
day_of_week = '*',
)
`Cron` stores the schedule of a repetitive `Job`, inspired by the Linux-based `crontab(5)` table.
Jobs are executed by JobScheduler when the second, minute, and hour fields match the current time, and when at least one of the two day fields (day of month & month, or day of week) match the current time.
When an argument is an `Int64`:
Field | Allowed values |
---|---|
second | 0-59 |
minute | 0-59 |
hour | 0-23 |
day_of_month | 1-31 |
month | 1-12 |
day_of_week | 1-7 (1 is Monday) |
When an argument is a `String` or `Char`:
An argument may be an asterisk (`*`), which always stands for "first-last" (the whole allowed range of the field).
Ranges of numbers are allowed. Ranges are two numbers separated with a hyphen. The specified range is inclusive. For example, 8-11 for an *hours* entry specifies execution at hours 8, 9, 10 and 11.
Lists are allowed. A list is a set of numbers (or ranges) separated by commas. Examples: `"1,2,5,9"`, `"0-4,8-12"`.
Step values can be used in conjunction with ranges. Following a range with `/<number>` specifies skips of the number's value through the range. For example, `"0-23/2"` can be used in the `hour` argument to specify Job execution every other hour (the alternative is `"0,2,4,6,8,10,12,14,16,18,20,22"`). Steps are also permitted after an asterisk, so if you want to say "every two hours", just use `"*/2"`.
When an argument is a `Vector`:
A `Vector` works like the lists mentioned above. For example, `[1,2,5,9]` is equivalent to `"1,2,5,9"`.
When an argument is a `UInt64`:
`UInt64` is the internal type of `Cron` fields. All the previous types will be converted to a `UInt64` bit array. The start index of the bit array is 0. Bits outside of the allowed values (see the table above) are ignored.
Cron(special::Symbol)
Instead of the six arguments of `Cron`, one of the following special symbols may be used:
special | Meaning |
---|---|
:yearly | Run once a year, Cron(0,0,0,1,1,0) |
:annually | (same as :yearly ) |
:monthly | Run once a month, Cron(0,0,0,1,'*','*') |
:weekly | Run once a week, Cron(0,0,0,'*','*',1) |
:daily | Run once a day, Cron(0,0,0,'*','*','*') |
:midnight | (same as :daily ) |
:hourly | Run once an hour, Cron(0,0,'*','*','*','*') |
:none | Never repeat, Cron(0,0,0,0,0,0) |
Caution: Linux crontab's special `:reboot` is not supported here.
To run every minute, just use `Cron()`.
JobSchedulers.cron_value_parse
— Function
cron_value_parse(value::UInt64)
cron_value_parse(value::Int64)
cron_value_parse(value::String)
cron_value_parse(value::Char)
cron_value_parse(value::Vector)
cron_value_parse(*) = cron_value_parse('*')
Parse a crontab-like value to `UInt64`. See details: `Cron`.
Dates.tonext
— Method
Dates.tonext(dt::DateTime, c::Cron) -> Union{DateTime, Nothing}
Dates.tonext(t::Time, c::Cron; same::Bool = false) -> Time
Dates.tonext(d::Date, c::Cron; same::Bool = false) -> Union{DateTime, Nothing}
Jobs are executed by JobScheduler when the second, minute, hour, and month of year fields match the current time, and when at least one of the two day fields (day of month, or day of week) match the current time.
Queue
JobSchedulers.queue
— Function
queue(; all::Bool = false) -> Vector{Job}
queue(state::Symbol ) -> Vector{Job}
queue(needle) -> Vector{Job}
queue(state::Symbol , needle) -> Vector{Job}
queue(needle, state::Symbol ) -> Vector{Job}
queue(id::Int) -> Job
- `all::Bool`: if true, get all jobs; if false, get only running and queuing jobs.
- `state::Symbol`: get jobs with a specific state, including `:all`, `QUEUING`, `RUNNING`, `DONE`, `FAILED`, `CANCELLED`, `PAST`. `PAST` is the superset of `DONE`, `FAILED`, `CANCELLED`.
- `needle::Union{AbstractString,AbstractPattern,AbstractChar}`: get jobs if they contain `needle` in their name or user.
- `id::Int`: get the job with the specific `id`.
JobSchedulers.all_queue
— Function
all_queue()
all_queue(id::Int64)
all_queue(state::Symbol)
all_queue(needle::Union{AbstractString,AbstractPattern,AbstractChar})
- `state::Symbol`: get jobs with a specific state, including `:all`, `QUEUING`, `RUNNING`, `DONE`, `FAILED`, `CANCELLED`, `PAST`. `PAST` is the superset of `DONE`, `FAILED`, `CANCELLED`.
- `needle::Union{AbstractString,AbstractPattern,AbstractChar}`: get jobs if they contain `needle` in their name or user.
- `id::Int`: get the job with the specific `id`.
JobSchedulers.job_query
— Function
job_query_by_id(id::Int64)
Search for a job by `job.id` in the queue.
Return `job::Job` if found, `nothing` if not found.
Wait For Jobs
JobSchedulers.wait_queue
— Function
wait_queue(;show_progress::Bool = false, exit_num_jobs::Int = 0)
Wait for all jobs in `queue()` to finish.
- `show_progress = true`: job progress will be shown.
- `exit_num_jobs::Int`: exit when `queue()` has fewer than `exit_num_jobs` jobs. It is useful to ignore some jobs that are always running or recurring.
See also: `queue_progress`.
Base.wait
— Method
wait(j::Job)
wait(js::Vector{Job})
Wait for the job(s) to finish.
Scheduler Settings
JobSchedulers.set_scheduler
— Function
set_scheduler(;
max_cpu::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_MEM,
max_job::Int = JobSchedulers.JOB_QUEUE.max_done,
max_cancelled_job::Int = JobSchedulers.JOB_QUEUE.max_cancelled
)
- `max_job`: the number of done jobs to keep. If the number of done jobs exceeds 1.5 × `max_job`, old jobs will be deleted.
- `max_cancelled_job`: the number of cancelled jobs to keep. If the number of cancelled jobs exceeds 1.5 × `max_cancelled_job`, old jobs will be deleted.
See details: `set_scheduler_max_cpu`, `set_scheduler_max_mem`, `set_scheduler_max_job`.
JobSchedulers.set_scheduler_max_cpu
— Function
set_scheduler_max_cpu(ncpu::Int = default_ncpu())
set_scheduler_max_cpu(percent::Float64)
Set the maximum number of CPUs (threads) the scheduler can use. If Julia is started with multiple threads, the maximum CPU is `nthreads() - 1`.
Example
set_scheduler_max_cpu() # use all available CPUs
set_scheduler_max_cpu(4) # use 4 CPUs
set_scheduler_max_cpu(0.5) # use 50% of CPUs
JobSchedulers.set_scheduler_max_mem
— Function
set_scheduler_max_mem(mem::Int = default_mem())
set_scheduler_max_mem(percent::Float64)
Set the maximum RAM the scheduler can use.
Example
set_scheduler_max_mem() # use 80% of total memory
set_scheduler_max_mem(4GB) # use 4GB memory
set_scheduler_max_mem(4096MB)
set_scheduler_max_mem(4194304KB)
set_scheduler_max_mem(4294967296B)
set_scheduler_max_mem(0.5) # use 50% of total memory
JobSchedulers.set_scheduler_max_job
— Function
set_scheduler_max_job(max_done::Int = 10000, max_cancelled::Int = max_done)
Set the number of finished jobs to keep. If the number of jobs exceeds 1.5 × this number, old jobs will be deleted.
JobSchedulers.destroy_unnamed_jobs_when_done
— Function
JobSchedulers.destroy_unnamed_jobs_when_done(b::Bool)
Before v0.10, all jobs were saved to the queue. From v0.10 on, however, unnamed jobs (`job.name == ""`) will not be saved if they ran successfully. If you want to save unnamed jobs, you can call `JobSchedulers.destroy_unnamed_jobs_when_done(false)`.
JobSchedulers.set_group_seperator
— Function
set_group_seperator(group_seperator::Regex) = global GROUP_SEPERATOR = group_seperator
Set the group separator. The group separator is used to group the names of Jobs when displaying the progress meter using `wait_queue(show_progress=true)`.
Default is `r": *"`.
JobSchedulers.GROUP_SEPERATOR
— Constant
GROUP_SEPERATOR::Regex = r": *"
The group separator is used to group the names of Jobs when displaying the progress meter using `wait_queue(show_progress=true)`.
To set it, use `set_group_seperator(group_seperator::Regex)`.
Scheduler Control
The scheduler is started automatically, so it is not necessary to start or stop it manually.
JobSchedulers.scheduler_status
— Function
scheduler_status() :: Symbol
Print the settings and status of the job scheduler. Return `:not_running` or `:running`.
JobSchedulers.scheduler_start
— Function
scheduler_start()
Start the job scheduler.
JobSchedulers.scheduler_stop
— Function
scheduler_stop()
Stop the job scheduler.
Optimize CPU Usage
JobSchedulers.solve_optimized_ncpu
— Function
solve_optimized_ncpu(default::Int;
ncpu_range::UnitRange{Int64} = 1:total_cpu,
njob::Int = 1,
total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU,
side_jobs_cpu::Int = 0)
Find the optimized number of CPU for a job.
- `default`: default ncpu of the job.
- `ncpu_range`: the possible ncpu range of the job.
- `njob`: number of identical jobs.
- `total_cpu`: the total CPU that can be used by JobSchedulers.
- `side_jobs_cpu`: CPUs left for some small jobs that might run while this job is running, so the job won't use up all of the resources and block small tasks.
Backup
JobSchedulers.set_scheduler_backup
— Function
set_scheduler_backup(
filepath::AbstractString = "";
migrate::Bool = false,
delete_old::Bool = false,
recover_settings::Bool = true,
recover_queue::Bool = true
)
Set the backup file of the job scheduler.
If `filepath` is set to `""`, stop backing up at exit.
If `filepath` is set to an existing file, settings and/or the queue are recovered from `filepath` immediately (controlled by `recover_settings` and `recover_queue`).
If `filepath` is set to a new file, the backup file will be created at exit.
If `migrate=true` and the old `JobSchedulers.SCHEDULER_BACKUP_FILE` exists, the old backup file will be recovered before recovering from `filepath`.
JobSchedulers.backup
— Function
backup()
Manually back up job scheduler settings and queues. The function is automatically triggered at exit.
Internal
Internal - Scheduling
JobSchedulers.JobQueue
— Type
JobQueue(; max_done::Int = 10000, max_cancelled::Int = 10000)
mutable struct JobQueue
const queuing::SortedDict{Int,Vector{Job},Base.Order.ForwardOrdering} # priority => Job List
const queuing_0cpu::Vector{Job} # ncpu = 0, can run immediately
const future::Vector{Job} # all jobs with schedule_time > now()
const running::Vector{Job}
const done::Vector{Job}
const failed::Vector{Job}
const cancelled::Vector{Job}
max_done::Int
max_cancelled::Int
const lock_queuing::ReentrantLock
const lock_running::ReentrantLock
const lock_past::ReentrantLock
end
JobSchedulers.scheduler
— Method
scheduler()
The function that runs the Job scheduler. It should be started by `scheduler_start()` rather than called directly.
JobSchedulers.istaskfailed2
— Function
istaskfailed2(t::Task)
Extend `Base.istaskfailed` to fit the Pipelines and JobSchedulers packages, which will return a `StackTraceVector` in `t.result`, while Base considers such a task `:done`. The function checks the situation and modifies the real task status and other properties.
JobSchedulers.unsafe_run!
— Function
unsafe_run!(job::Job, current::DateTime=now()) :: Bool
Jump the queue and run `job` immediately, no matter what other jobs are running or waiting. Return `true` if the job was successfully started, otherwise `false`.
Caution: it will not trigger `scheduler_need_action()`.
JobSchedulers.unsafe_cancel!
— Function
unsafe_cancel!(job::Job, current::DateTime=now())
Caution: it is unsafe and should only be called within a lock. Do not call it from other modules.
Caution: it will not trigger `scheduler_need_action()`.
JobSchedulers.unsafe_update_state!
— Function
unsafe_update_state!(job::Job)
Update the state of `job` from `job.task` when `job.state === :running`.
If a repetitive job is `PAST`, submit a new job.
Caution: it is unsafe and should only be called within a lock.
JobSchedulers.is_dependency_ok
— Function
is_dependency_ok(job::Job)::Bool
Caution: run it within a lock only.
Algorithm: break the for loop when a dependency that is not OK is found, and delete the previous OK dependencies.
If a dependency is provided as an `Int`, look up the job with that id and then replace the `Int` with the job.
JobSchedulers.set_scheduler_while_loop
— Function
set_scheduler_while_loop(b::Bool)
If set to false, the scheduler will stop.
JobSchedulers.get_priority
— Function
get_priority(job::Job) = job.priority
JobSchedulers.get_thread_id
— Function
get_thread_id(job::Job) = job._thread_id
JobSchedulers.date_based_on
— Function
date_based_on(c::Cron) -> Symbol
Whether the date of `c` is based on `:dayofweek`, `:monthday`, `:everyday`, `:both`, or `:none`.
JobSchedulers.next_recur_job
— Function
next_recur_job(j::Job) -> Union{Job, Nothing}
Based on `j.cron` and `j.until`, return a new recurring `Job` or `nothing`.
Internal - Progress Meter
To display a progress meter, please use `wait_queue(show_progress = true)`.
JobSchedulers.JobGroup
— Type
mutable struct JobGroup
name::String
total::Int
queuing::Int
running::Int
done::Int
failed::Int
cancelled::Int
end
`JobGroup` is computed when displaying a progress meter.
JobSchedulers.get_group
— Function
get_group(job::Job, group_seperator = GROUP_SEPERATOR)
get_group(name::AbstractString, group_seperator = GROUP_SEPERATOR)
Return `nested_group_names::Vector{String}`.
E.g. if `job.name` is `"A: B: 1232"`, return `["A", "A: B", "A: B: 1232"]`.
JobSchedulers.progress_bar
— Function
progress_bar(percent::Float64, width::Int = 20)
Return a `String` progress bar whose char length is `width`.
- `percent`: ranges from 0.0 to 1.0; values outside this range are truncated.
- `width`: should be > 3. If <= 10, the percentage will not be shown. If > 10, the percentage will be shown.
JobSchedulers.queue_progress
— Function
queue_progress(;remove_tmp_files::Bool = true, kwargs...)
queue_progress(stdout_tmp::IO, stderr_tmp::IO;
group_seperator = GROUP_SEPERATOR, wait_second_for_new_jobs::Int = 1, loop::Bool = true, exit_num_jobs::Int = 0)
- `group_seperator`: delimiter used to split `(job::Job).name` into group and specific job names.
- `wait_second_for_new_jobs::Int`: if `auto_exit`, and all jobs are `PAST`, do not quit `queue_progress` immediately but wait for a period. If new jobs are submitted, do not quit `queue_progress`.
- `loop::Bool`: if false, only show the current progress and exit.
- `exit_num_jobs::Int`: exit when `queue()` has fewer than `exit_num_jobs` jobs. It is useful to ignore some jobs that are always running or recurring.
JobSchedulers.view_update
— Function
view_update(h, w; row = 1, groups_shown::Vector{JobGroup} = JobGroup[], is_in_terminal::Bool = true, is_interactive = true, group_seperator_at_begining = Regex("^" * GROUP_SEPERATOR.pattern))
Update the whole screen view.
JobSchedulers.PROGRESS_METER
— Constant
Bool. Whether the progress meter is shown. Related to progress computation and display. `true` when `wait_queue(show_progress=true)` is used.
JobSchedulers.update_group_state!
— Function
update_group_state!(job::Job)
This should only be called if `JobSchedulers.PROGRESS_METER == true`. Update the job's group state, which will be used in the progress meter.
JobSchedulers.init_group_state!
— Method
init_group_state!()
Prepare the group state for existing jobs.
Internal - Const/Variable
const SCHEDULER_ACTION = Base.RefValue{Channel{Int}}() # defined in __init__()
const SCHEDULER_ACTION_LOCK = ReentrantLock()
const SCHEDULER_PROGRESS_ACTION = Base.RefValue{Channel{Int}}() # defined in __init__()
SCHEDULER_MAX_CPU::Int = nthreads() > 1 ? nthreads()-1 : Sys.CPU_THREADS
SCHEDULER_MAX_MEM::Int = round(Int, Sys.total_memory() * 0.9)
const JOB_QUEUE = JobQueue(; max_done = JOB_QUEUE_MAX_LENGTH, max_cancelled = max_done = JOB_QUEUE_MAX_LENGTH)
SCHEDULER_BACKUP_FILE::String = ""
SCHEDULER_WHILE_LOOP::Bool = true
SLEEP_HANDELED_TIME::Int = 10
DESTROY_UNNAMED_JOBS_WHEN_DONE::Bool = true
const ALL_JOB_GROUP = JobGroup("ALL JOBS")
const JOB_GROUPS = OrderedDict{String, JobGroup}()
const OTHER_JOB_GROUP = JobGroup("OTHERS")