API
Const/Variables
const B = 1
const KB = 1024
const MB = 1024KB
const GB = 1024MB
const TB = 1024GB
const QUEUING = :queuing
const RUNNING = :running
const DONE = :done
const FAILED = :failed
const CANCELLED = :cancelled
const PAST = :past # super set of DONE, FAILED, CANCELLED
const cron_none = Cron(:none)
const SCHEDULER_TASK = Base.RefValue{Task}()
const SCHEDULER_REACTIVATION_TASK = Base.RefValue{Task}()
Job
JobSchedulers.Job
— TypeJob(command::Base.AbstractCmd; stdout=nothing, stderr=nothing, append::Bool=false, kwargs...)
Job(f::Function; kwargs...)
Job(task::Task; kwargs...)
Arguments
command::Base.AbstractCmd
: the command to run.
f::Function
: the function to run without any arguments, like f()
.
task::Task
: the task to run. E.g. @task(1+1)
.
Common Keyword Arguments (kwargs...)
name::String = ""
: job name.
user::String = ""
: user that the job belongs to.
ncpu::Real = 1.0
: number of CPUs this job is about to use (can be Float64
, e.g. 1.5
will use 150% CPU).
mem::Integer = 0
: amount of memory this job is about to use (supports TB, GB, MB, KB, B=1).
schedule_time::Union{DateTime,Period} = DateTime(0)
: The expected time to run.
dependency
: defer job until specified jobs reach specified state (QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST). PAST is the superset of DONE, FAILED, CANCELLED, which means the job will not run in the future. E.g. DONE => job
, [DONE => job1; PAST => job2]
.
The default state is DONE, so DONE => job
can be simplified to job
. To be compatible with old versions, you can also use job id (Int): [DONE => job.id]
.
wall_time::Period = Year(1)
: wall clock time limit. Jobs will be terminated after running for this period.
priority::Int = 20
: lower means higher priority.
cron::Cron = Cron(:none)
: job recurring at specific date and time. See more at Cron
.
until::Union{DateTime,Period} = DateTime(9999,1,1)
: stop job recurring until
date and time.
Experimental Keyword Arguments - Output Redirection:
stdout=nothing
: redirect stdout to the file.
stderr=nothing
: redirect stderr to the file.
append::Bool=false
: append the stdout or stderr or not.
Redirecting in Julia is not thread safe, so unexpected redirection might happen if you are running programs in different Tasks
simultaneously (multi-threading).
JobSchedulers.submit!
— Functionsubmit!(job::Job)
submit!(args_of_Job...; kwargs_of_Job...)
Submit the job to queue.
submit!(Job(...))
can be simplified to submit!(...)
. They are equivalent.
JobSchedulers.@submit
— Macro@submit [option=value]... expr
Submit a job from expr
. If a Job
is explicitly shown in expr
, DONE => job
will be automatically added to the dependency list.
expr
: any type of Expr
ession is supported.
option = value
: kwargs of Job
. If expr
is parsed to be a Pipelines.Program
, option
s also include its inputs, outputs and run kwargs.
Example
j = @submit 1+1
wait(j)
@assert result(j) == 2
# you can use any keyword arguments that `Job` supports, such as `name`, `ncpu`:
j_2sec = @submit name = "run after 2 sec" begin sleep(2); 32 end
# because `j_2sec isa Job`, `DONE => j_2sec` is pushed to `j2.dependency`.
j2 = @submit mem=2KB begin
1 + result(j_2sec)
end
wait(j2)
@assert result(j2) == 1 + 32
# you can also manually add dependencies not in the `expr`:
j3 = @submit dependency = [PAST => j] println("j3 finished. result of j2 = ", result(j2))
# Note: j3.dependency might be empty after submit, because JobScheduler will remove jobs that reached their states in the dependency list.
@submit
cannot know the elements in a container, so it is unable to walk through and add Job dependencies in a container.
jobs = Job[] # the job container
for i in 1:10
push!(jobs, @submit begin sleep(30);i end) # 10 jobs will be added to `jobs`
end
x = 0
j_something_wrong = @submit for j in jobs
# have to use global x
global x += result(j)
end
# ┌ Warning: Getting result from a running job: returned value might be unexpected.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/jobs.jl:318
result(j_something_wrong)
# MethodError(+, (nothing, nothing), 0x0000000000007b16)
To avoid it, we can (1) use submit!
, or (2) explicitly add dependency = jobs
to @submit
.
x = 0
j_ok = submit!(dependency = jobs) do
for j in jobs
# have to use global x
global x += result(j)
end
end
wait(j_ok)
@assert x == 55
x = 100
j_ok_too = @submit dependency = jobs for j in jobs
# have to use global x
global x += result(j)
end
wait(j_ok_too)
@assert x == 155
JobSchedulers.cancel!
— Functioncancel!(job::Job)
Cancel job
, stop queuing or running.
JobSchedulers.result
— Functionresult(job::Job)
Return the result of job
. If the job is not done, a warning message will also show.
Base.fetch
— Methodfetch(x::Job)
Wait for a Job
to finish, then return its result value. If the task fails with an exception, a TaskFailedException
(which wraps the failed task) is thrown.
fetch(x::Job)
is available from JobSchedulers v0.10.2.
JobSchedulers.isqueuing
— Functionisqueuing(j::Job) :: Bool
JobSchedulers.isrunning
— Functionisrunning(j::Job) :: Bool
JobSchedulers.isdone
— Functionisdone(j::Job) :: Bool
JobSchedulers.iscancelled
— Functioniscancelled(j::Job) :: Bool
JobSchedulers.isfailed
— Functionisfailed(j::Job) :: Bool
JobSchedulers.ispast
— Functionispast(j::Job) :: Bool = j.state === DONE || j.state === CANCELLED || j.state === FAILED
Cron: Job Recur/Repeat
JobSchedulers.Cron
— TypeCron(second, minute, hour, day_of_month, month, day_of_week)
Cron(;
second = 0,
minute = '*',
hour = '*',
day_of_month = '*',
month = '*',
day_of_week = '*',
)
Cron
stores the schedule of a repetitive Job
, implemented according to Linux-based crontab
(5) table.
Jobs are executed when the second, minute, hour and month fields match the current time. If neither day_of_month
nor day_of_week
starts with *
, cron takes the union (∪) of their values day_of_month ∪ day_of_week
. Otherwise cron takes the intersection (∩) of their values day_of_month ∩ day_of_week
.
When an argument is an Int
:
Field | Allowed values |
---|---|
second | 0-59 |
minute | 0-59 |
hour | 0-23 |
day_of_month | 1-31 |
month | 1-12 |
day_of_week | 1-7 (1 is Monday) |
- Typical Linux distributions do not have the
second
field as JobSchedulers does.
- Sunday is only coded
7
in JobSchedulers, while it is 0
or 7
in Linux, so behaviors like day_of_week = "*/2"
are different in the two systems.
- From JobSchedulers v0.11,
Cron
has been rewritten based on the standard crontab, including its bug described here.
When an argument is a String
or Char
:
An argument may be an asterisk (*
), which always stands for "first-last".
Ranges of numbers are allowed. Ranges are two numbers separated with a hyphen. The specified range is inclusive. For example, 8-11 for an "hours" entry specifies execution at hours 8, 9, 10 and 11.
Lists are allowed. A list is a set of numbers (or ranges) separated by commas. Examples: "1,2,5,9"
, "0-4,8-12"
.
Step values can be used in conjunction with ranges. Following a range with /<number>
specifies skips of the number's value through the range. For example, "0-23/2"
can be used in the hour
argument to specify Job execution every other hour (the alternative is "0,2,4,6,8,10,12,14,16,18,20,22"
). Steps are also permitted after an asterisk, so if you want to say "every two hours", just use "*/2"
.
When an argument is a Vector
:
Vector
works like lists mentioned above. For example, [1,2,5,9]
is equivalent to "1,2,5,9"
.
When an argument is a UInt64
:
UInt64
is the internal type of Cron
fields. All the previous types will be converted to a UInt64
bit array. The start index of the bit array is 0. Bits outside of the allowed values (see the table above) are ignored.
Cron(special::Symbol)
Instead of the six arguments of Cron
, one of the following special symbols may appear instead:
special | Meaning |
---|---|
:yearly | Run once a year, Cron(0,0,0,1,1,0) |
:annually | (same as :yearly ) |
:monthly | Run once a month, Cron(0,0,0,1,'*','*') |
:weekly | Run once a week, Cron(0,0,0,'*','*',1) |
:daily | Run once a day, Cron(0,0,0,'*','*','*') |
:midnight | (same as :daily ) |
:hourly | Run once an hour, Cron(0,0,'*','*','*','*') |
:none | Never repeat, Cron(0,0,0,0,0,0) |
Caution: Linux crontab's special :reboot
is not supported here.
To run every minute, just use Cron()
.
JobSchedulers.cron_value_parse
— Functioncron_value_parse(value::UInt64)
cron_value_parse(value::Signed)
cron_value_parse(value::AbstractString)
cron_value_parse(value::Char)
cron_value_parse(value::Vector)
cron_value_parse(*) = cron_value_parse('*')
Parse crontab-like value to UInt64
. See details: Cron
.
Dates.tonext
— MethodDates.tonext(dt::DateTime, c::Cron; same::Bool = false) -> Union{DateTime, Nothing}
Dates.tonext(t::Time, c::Cron; same::Bool = false) -> Time
Dates.tonext(d::Date, c::Cron; same::Bool = false, limit::Date = d + Day(3000)) -> Union{DateTime, Nothing}
Adjust date or time to the next one corresponding to c::Cron
. Setting same=true
allows the current date or time to be considered as the next one, allowing for no adjustment to occur.
Queue
JobSchedulers.queue
— Functionqueue(; all::Bool = false) -> Vector{Job}
queue(state::Symbol ) -> Vector{Job}
queue(needle) -> Vector{Job}
queue(state::Symbol , needle) -> Vector{Job}
queue(needle, state::Symbol ) -> Vector{Job}
queue(id::Integer) -> Job
all::Bool
: if true, get all jobs. If false, get only running and queuing jobs.
state::Symbol
: get jobs with a specific state, including :all
, QUEUING
, RUNNING
, DONE
, FAILED
, CANCELLED
, PAST
. PAST
is the superset of DONE
, FAILED
, CANCELLED
.
needle::Union{AbstractString,AbstractPattern,AbstractChar}
: get jobs if they contain needle
in their name or user.
id::Integer
: get the job with the specific id
.
JobSchedulers.all_queue
— Functionall_queue()
all_queue(id::Integer)
all_queue(state::Symbol)
all_queue(needle::Union{AbstractString,AbstractPattern,AbstractChar})
state::Symbol
: get jobs with a specific state, including :all
, QUEUING
, RUNNING
, DONE
, FAILED
, CANCELLED
, PAST
. PAST
is the superset of DONE
, FAILED
, CANCELLED
.
needle::Union{AbstractString,AbstractPattern,AbstractChar}
: get jobs if they contain needle
in their name or user.
id::Integer
: get the job with the specific id
.
JobSchedulers.job_query
— Functionjob_query_by_id(id::Integer)
Search job by job.id
in the queue.
Return job::Job
if found, nothing
if not found.
Wait For Jobs
JobSchedulers.wait_queue
— Functionwait_queue(;show_progress::Bool = false, exit_num_jobs::Int = 0)
Wait for all jobs in queue()
to finish.
If show_progress = true
, job progress will be shown.
exit_num_jobs::Int
: exit when queue()
has less than Int
number of jobs. It is useful to ignore some jobs that are always running or recurring.
See also: queue_progress
.
Base.wait
— Methodwait(j::Job)
wait(js::Vector{Job})
Wait for the job(s) to be finished.
Scheduler Settings
JobSchedulers.set_scheduler
— Functionset_scheduler(;
max_cpu::Real = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Real = JobSchedulers.SCHEDULER_MAX_MEM,
max_job::Int = JobSchedulers.JOB_QUEUE.max_done,
max_cancelled_job::Int = JobSchedulers.JOB_QUEUE.max_cancelled_job
)
max_job
: the number of jobs done. If the number of jobs exceeds 1.5*NUMBER, old jobs will be deleted.
max_cancelled_job
: the number of cancelled jobs. If the number of jobs exceeds 1.5*NUMBER, old jobs will be deleted.
See details: set_scheduler_max_cpu
, set_scheduler_max_mem
, set_scheduler_max_job
JobSchedulers.set_scheduler_max_cpu
— Functionset_scheduler_max_cpu(ncpu::Int = default_ncpu())
set_scheduler_max_cpu(percent::Float64)
Set the maximum CPU (thread) the scheduler can use. If starting Julia with multiple threads in the default thread pool, the maximum CPU is the number of tids in the default thread pool not equal to 1.
Example
set_scheduler_max_cpu() # use all available CPUs
set_scheduler_max_cpu(4) # use 4 CPUs
set_scheduler_max_cpu(0.5) # use 50% of CPUs
JobSchedulers.set_scheduler_max_mem
— Functionset_scheduler_max_mem(mem::Integer = default_mem())
set_scheduler_max_mem(percent::AbstractFloat)
Set the maximum RAM the scheduler can use.
Example
set_scheduler_max_mem() # use 80% of total memory
set_scheduler_max_mem(4GB) # use 4GB memory
set_scheduler_max_mem(4096MB)
set_scheduler_max_mem(4194304KB)
set_scheduler_max_mem(4294967296B)
set_scheduler_max_mem(0.5) # use 50% of total memory
JobSchedulers.set_scheduler_max_job
— Functionset_scheduler_max_job(max_done::Int = 10000, max_cancelled::Int = max_done)
Set the number of finished jobs. If the number of jobs exceeds 1.5*NUMBER, old jobs will be deleted.
JobSchedulers.destroy_unnamed_jobs_when_done
— FunctionJobSchedulers.destroy_unnamed_jobs_when_done(b::Bool)
Before v0.10, all jobs were saved to the queue. However, from v0.10, unnamed jobs (job.name == ""
) will not be saved if they ran successfully. If you want to save unnamed jobs, you can call JobSchedulers.destroy_unnamed_jobs_when_done(false)
.
JobSchedulers.set_group_seperator
— Functionset_group_seperator(group_seperator::Regex) = global GROUP_SEPERATOR = group_seperator
Set the group separator. The group separator is used to group the names of Jobs. It is used when displaying the progress meter with wait_queue(show_progress=true).
Default is r": *"
.
JobSchedulers.GROUP_SEPERATOR
— ConstantGROUP_SEPERATOR::Regex = r": *"
The group separator is used to group the names of Jobs. It is used when displaying the progress meter with wait_queue(show_progress=true).
To set it, use set_group_seperator(group_seperator::Regex)
.
Scheduler Control
Scheduler is automatically started, so it is not necessary to start/stop it.
JobSchedulers.scheduler_status
— Functionscheduler_status() :: Symbol
Print the settings and status of job scheduler. Return :not_running
or :running
.
JobSchedulers.scheduler_start
— Functionscheduler_start()
Start the job scheduler.
JobSchedulers.scheduler_stop
— Functionscheduler_stop()
Stop the job scheduler.
Optimize CPU Usage
JobSchedulers.solve_optimized_ncpu
— Functionsolve_optimized_ncpu(default::Int;
ncpu_range::UnitRange{Int} = 1:total_cpu,
njob::Int = 1,
total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU,
side_jobs_cpu::Int = 0)
Find the optimized number of CPU for a job.
default
: default ncpu of the job.
ncpu_range
: the possible ncpu range of the job.
njob
: number of the same job.
total_cpu
: the total CPU that can be used by JobSchedulers.
side_jobs_cpu
: some small jobs that might be run when the job is running, so the job won't use up all of the resources and stop small tasks.
Backup
JobSchedulers.set_scheduler_backup
— Functionset_scheduler_backup(
filepath::AbstractString = "";
migrate::Bool = false,
delete_old::Bool = false,
recover_settings::Bool = true,
recover_queue::Bool = true
)
Set the backup file of job scheduler.
If filepath
was set to ""
, stop backup at exit.
If filepath
was set to an existing file, recover_settings
or recover_queue
from filepath
immediately.
If filepath
was set to a new file, the backup file will be created at exit.
If migrate=true
and the old JobSchedulers.SCHEDULER_BACKUP_FILE
exists, the old backup file will be recovered before recovering from filepath
.
JobSchedulers.backup
— Functionbackup()
Manually backup job scheduler settings and queues. The function is automatically triggered at exit.
Internal
Internal - Scheduling
JobSchedulers.JobQueue
— TypeJobQueue(; max_done::Int = 10000, max_cancelled::Int = 10000)
mutable struct JobQueue
const queuing::SortedDict{Int,MutableLinkedList{Job},Base.Order.ForwardOrdering} # priority => Job List
const queuing_0cpu::MutableLinkedList{Job} # ncpu = 0, can run immediately
const future::MutableLinkedList{Job} # all jobs with schedule_time > now()
const running::MutableLinkedList{Job} const running::Vector{Job}
const done::Vector{Job}
const failed::Vector{Job}
const cancelled::Vector{Job}
max_done::Int
max_cancelled::Int
const lock_queuing::ReentrantLock
const lock_running::ReentrantLock
const lock_past::ReentrantLock
end
JobSchedulers.scheduler
— Methodscheduler()
The function of running Job's scheduler. It needs to be called by scheduler_start()
, rather than calling directly.
JobSchedulers.istaskfailed2
— Functionistaskfailed2(t::Task)
Extend Base.istaskfailed
to fit Pipelines and JobSchedulers packages, which will return a StackTraceVector
in t.result
, while Base considered it as :done
. The function checks the situation and modifies the real task status and other properties.
JobSchedulers.unsafe_run!
— Functionunsafe_run!(job::Job, current::DateTime=now()) :: Bool
Jump the queue and run job
immediately, no matter what other jobs are running or waiting. If the job is successfully started, return true
, else false
.
Caution: it will not trigger scheduler_need_action()
.
JobSchedulers.unsafe_cancel!
— Functionunsafe_cancel!(job::Job, current::DateTime=now())
Caution: it is unsafe and should only be called within lock. Do not call from other module.
Caution: it will not trigger scheduler_need_action()
.
JobSchedulers.unsafe_update_state!
— Functionunsafe_update_state!(job::Job)
Update the state of job
from job.task
when job.state === :running
.
If a repetitive job is PAST, submit a new job.
Caution: it is unsafe and should only be called within lock.
JobSchedulers.is_dependency_ok
— Functionis_dependency_ok(job::Job)::Bool
Caution: run it within lock only.
Algorithm: Break while loop when found dep not ok, and change job._dep_check_id
to the current id.
If dep is provided as Integer, query Integer for job and then replace Integer with the job.
JobSchedulers.set_scheduler_while_loop
— Functionset_scheduler_while_loop(b::Bool)
if set to false, the scheduler will stop.
JobSchedulers.get_priority
— Functionget_priority(job::Job) = job.priority
JobSchedulers.get_thread_id
— Functionget_thread_id(job::Job) = job._thread_id
JobSchedulers.date_based_on
— Functiondate_based_on(c::Cron) -> Symbol
Whether date of c
is based on :day_of_week
, :day_of_month
, :union
, :intersect
, :everyday
, :none
, or :undefined
.
JobSchedulers.next_recur_job
— Functionnext_recur_job(j::Job) -> Union{Job, Nothing}
Based on j.cron
and j.until
, return a new recurring Job
or nothing
.
Internal - Progress Meter
To display a progress meter, please use wait_queue(show_progress = true)
.
JobSchedulers.JobGroup
— Typemutable struct JobGroup
name::String
total::Int
queuing::Int
running::Int
done::Int
failed::Int
cancelled::Int
end
JobGroup
is computed when displaying a progress meter.
JobSchedulers.get_group
— Functionget_group(job::Job, group_seperator = GROUP_SEPERATOR)
get_group(name::AbstractString, group_seperator = GROUP_SEPERATOR)
Return nested_group_names::Vector{String}
.
Eg: If job.name
is "A: B: 1232"
, return ["A", "A: B", "A: B: 1232"]
JobSchedulers.progress_bar
— Functionprogress_bar(percent::Float64, width::Integer = 20)
Return ::String for progress bar whose char length is width
.
percent
: range from 0.0 - 1.0, or to be truncated.
width
: should be > 3. If <= 10, percentage will not show. If > 10, percentage will show.
JobSchedulers.queue_progress
— Functionqueue_progress(;remove_tmp_files::Bool = true, kwargs...)
queue_progress(stdout_tmp::IO, stderr_tmp::IO;
group_seperator = GROUP_SEPERATOR, wait_second_for_new_jobs::Int = 1, loop::Bool = true, exit_num_jobs::Int = 0)
group_seperator
: delimiter used to split (job::Job).name
into group and specific job names.
wait_second_for_new_jobs::Int
: if auto_exit
, and all jobs are PAST, do not quit queue_progress
immediately but wait for a period. If new jobs are submitted, do not quit queue_progress
.
loop::Bool
: if false, only show the current progress and exit.
exit_num_jobs::Int
: exit when queue()
has less than Int
number of jobs. It is useful to ignore some jobs that are always running or recurring.
JobSchedulers.view_update
— Functionview_update(h, w; row = 1, groups_shown::Vector{JobGroup} = JobGroup[], is_in_terminal::Bool = true, is_interactive = true, group_seperator_at_begining = Regex("^" * GROUP_SEPERATOR.pattern))
Update the whole screen view.
JobSchedulers.PROGRESS_METER
— ConstantBool. Showing progress meter? Related to progress computation and display. true when wait_queue(show_progress=true)
JobSchedulers.update_group_state!
— Functionupdate_group_state!(job::Job)
This should only be called if JobSchedulers.PROGRESS_METER == true
. Update the job's group state, which will be used in Progress Meter.
JobSchedulers.init_group_state!
— Methodinit_group_state!()
Prepare group state for existing jobs
Internal - Const/Variable
JobSchedulers.THREAD_POOL
— Constantconst THREAD_POOL = Base.RefValue{Channel{Int}}()
Defined in __init__()
.
If version > 1.9, THREAD_POOL
contains only tids in Threads.threadpooltids(:default)
.
Also, the thread 1 is reserved for JobScheduler.
JobSchedulers.SINGLE_THREAD_MODE
— Constantconst SINGLE_THREAD_MODE = Base.RefValue{Bool}()
Defined in __init__()
. Whether Threads.threadpooltids(:default)
are empty or == [1]
.
JobSchedulers.TIDS
— Constantconst TIDS = Vector{Int}()
Defined in __init__()
. All tids in the default thread pool, excluding tid 1.
const SCHEDULER_ACTION = Base.RefValue{Channel{Int}}() # defined in __init__()
const SCHEDULER_ACTION_LOCK = ReentrantLock()
const SCHEDULER_PROGRESS_ACTION = Base.RefValue{Channel{Int}}() # defined in __init__()
SCHEDULER_MAX_CPU::Int = -1 # set in __init__
SCHEDULER_MAX_MEM::Int64 = Int64(-1) # set in __init__
SCHEDULER_UPDATE_SECOND::Float64 = 0.05 # set in __init__
const JOB_QUEUE = JobQueue(; max_done = JOB_QUEUE_MAX_LENGTH, max_cancelled = max_done = JOB_QUEUE_MAX_LENGTH)
SCHEDULER_BACKUP_FILE::String = ""
SCHEDULER_WHILE_LOOP::Bool = true
SLEEP_HANDELED_TIME::Int = 10
DESTROY_UNNAMED_JOBS_WHEN_DONE::Bool = true
const ALL_JOB_GROUP = JobGroup("ALL JOBS")
const JOB_GROUPS = OrderedDict{String, JobGroup}()
const OTHER_JOB_GROUP = JobGroup("OTHERS")