API

Const/Variables

const B = 1
const KB = 1024
const MB = 1024KB
const GB = 1024MB
const TB = 1024GB
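
Julia reads a numeric literal followed by an identifier as a multiplication, which is why sizes can be written as `4GB`. The following is a minimal standalone sketch that redefines the same constants locally (so it runs without the package) and checks the arithmetic:

```julia
# Local copies of the size constants listed above, so this snippet runs on its own.
const B = 1
const KB = 1024
const MB = 1024KB   # literal juxtaposition: 1024 * KB
const GB = 1024MB
const TB = 1024GB

# All of these spell the same number of bytes.
@assert 4GB == 4096MB == 4194304KB == 4294967296B
```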

const QUEUING = :queuing
const RUNNING = :running
const DONE = :done
const FAILED = :failed
const CANCELLED = :cancelled
const PAST = :past # superset of DONE, FAILED, CANCELLED

const cron_none = Cron(:none)

const SCHEDULER_TASK = Base.RefValue{Task}()
const SCHEDULER_REACTIVATION_TASK = Base.RefValue{Task}()

Job

JobSchedulers.Job - Type
Job(command::Base.AbstractCmd; stdout=nothing, stderr=nothing, append::Bool=false, kwargs...)
Job(f::Function; kwargs...)
Job(task::Task; kwargs...)

Arguments

  • command::Base.AbstractCmd: the command to run.
  • f::Function: the function to run without any arguments, like f().
  • task::Task: the task to run. Eg: @task(1+1).

Common Keyword Arguments (kwargs...)

  • name::String = "": job name.
  • user::String = "": user that job belongs to.
  • ncpu::Real = 1.0: number of CPUs the job is expected to use (can be fractional, eg: 1.5 means 150% of one CPU).
  • mem::Int64 = 0: amount of memory in bytes the job is expected to use (the constants TB, GB, MB, KB, B = 1 are supported, eg: 4GB).
  • schedule_time::Union{DateTime,Period} = DateTime(0): The expected time to run.
  • dependency: defer the job until the specified jobs reach the specified states (QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST). PAST is the superset of DONE, FAILED and CANCELLED, which means the job will not run in the future. Eg: DONE => job, [DONE => job1; PAST => job2].

Dependency

The default state is DONE, so DONE => job can be simplified to job. For compatibility with old versions, you can also use a job id (Int): [DONE => job.id]. JobSchedulers removes jobs that have reached their states from the dependency list.

  • wall_time::Period = Year(1): wall clock time limit. Jobs will be terminated after running for this period.

  • priority::Int = 20: lower means higher priority.

  • cron::Cron = Cron(:none): make the job recur at specific dates and times. See more at Cron.

  • until::Union{DateTime,Period} = DateTime(9999,1,1): the job keeps recurring until this date and time.

Experimental Keyword Arguments - Output Redirection:

  • stdout=nothing: redirect stdout to the file.
  • stderr=nothing: redirect stderr to the file.
  • append::Bool=false: whether to append to the stdout and stderr files.

Note

Redirection in Julia is not thread-safe, so unexpected redirection might happen if you are running programs in different Tasks simultaneously (multi-threading).

See also submit!, @submit, Cron
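
The keyword arguments above can be combined as in the following sketch. The function names `compute_sum` and `plus_one` and the job names are made up for illustration; the example assumes that the size constants and state constants are available after `using JobSchedulers`, as the examples elsewhere on this page suggest.

```julia
using JobSchedulers, Dates

compute_sum() = sum(1:100)          # hypothetical workload

# A named job built from a zero-argument function, with explicit resource hints.
first_job = Job(compute_sum;
    name = "demo: sum",
    ncpu = 1,
    mem = 10MB,
    wall_time = Minute(5),
    priority = 10,
)
submit!(first_job)

plus_one() = result(first_job) + 1  # reads the result of the first job

# This job is deferred until `first_job` reaches the DONE state.
second_job = Job(plus_one;
    name = "demo: plus one",
    dependency = DONE => first_job,
)
submit!(second_job)

wait(second_job)
@assert result(second_job) == 5051
```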

JobSchedulers.submit! - Function
submit!(job::Job)
submit!(args_of_Job...; kwargs_of_Job...)

Submit the job to queue.

submit!(Job(...)) can be simplified to submit!(...). They are equivalent.

See also Job, @submit
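
As a sketch of the equivalence described above, the three forms below should submit the same kind of job. The function `answer` and the job names are placeholders chosen for this example.

```julia
using JobSchedulers

answer() = 6 * 7    # hypothetical workload

# Form 1: build the Job explicitly, then submit it.
j1 = submit!(Job(answer; name = "demo: explicit Job"))

# Form 2: pass the arguments of Job straight to submit!.
j2 = submit!(answer; name = "demo: shorthand")

# Form 3: do-block syntax passes the block as the function argument.
j3 = submit!(name = "demo: do-block") do
    6 * 7
end

wait([j1, j2, j3])
@assert result(j1) == result(j2) == result(j3) == 42
```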

JobSchedulers.@submit - Macro
@submit [option=value]... expr

Submit a job from expr. If a Job appears explicitly in expr, DONE => job is automatically added to the dependency list.

  • expr: any type of Expression is supported.

  • option = value: kwargs of Job. If expr is parsed to be a Pipelines.Program, options also include its inputs, outputs and run kwargs.

See also Job, submit!

Example

j = @submit 1+1
wait(j)
@assert result(j) == 2

# you can use any keyword arguments that `Job` supports, such as `name`, `ncpu`:
j_2sec = @submit name = "run after 2 sec" begin sleep(2); 32 end

# because `j_2sec isa Job`, `DONE => j_2sec` is pushed to `j2.dependency`.
j2 = @submit mem=2KB begin
    1 + result(j_2sec)
end

wait(j2)
@assert result(j2) == 1 + 32

# you can also manually add dependencies not in the `expr`:
j3 = @submit dependency = [PAST => j] println("j3 finished. result of j2 = ", result(j2))

# Note: j3.dependency might be empty after submit, because JobSchedulers removes jobs that have reached their states from the dependency list.

Only explicit jobs can be automatically added to dependency

@submit cannot see the elements of a container, so it cannot walk through a container and add the Jobs in it as dependencies.

jobs = Job[]  # the job container
for i in 1:10
    push!(jobs, @submit begin sleep(30);i end) # 10 jobs will be added to `jobs`
end

x = 0
j_something_wrong = @submit for j in jobs
    # have to use global x
    global x += result(j)
end
# ┌ Warning: Getting result from a running job: returned value might be unexpected.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/jobs.jl:318

result(j_something_wrong)
# MethodError(+, (nothing, nothing), 0x0000000000007b16)

To avoid it, we can (1) use submit!, or (2) explicitly add dependency = jobs to @submit.

x = 0
j_ok = submit!(dependency = jobs) do
    for j in jobs
        # have to use global x
        global x += result(j)
    end
end
wait(j_ok)
@assert x == 55

x = 100
j_ok_too = @submit dependency = jobs for j in jobs
    # have to use global x
    global x += result(j)
end
wait(j_ok_too)
@assert x == 155

JobSchedulers.result - Function
result(job::Job)

Return the result of job. If the job is not done, a warning is also shown.

Base.fetch - Method
fetch(x::Job)

Wait for a Job to finish, then return its result value. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.

Compat

fetch(x::Job) is available from JobSchedulers v0.10.2.
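
A short usage sketch, assuming JobSchedulers v0.10.2 or later: fetch combines wait and result in one call.

```julia
using JobSchedulers

j = @submit begin
    sleep(0.1)
    21 * 2
end

# fetch blocks until the job finishes, so no separate wait(j) is needed.
@assert fetch(j) == 42
```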


Cron: Job Recur/Repeat

JobSchedulers.Cron - Type
Cron(second, minute, hour, day_of_month, month, day_of_week)
Cron(;
    second = 0,
    minute = '*',
    hour = '*',
    day_of_month = '*',
    month = '*',
    day_of_week = '*',
)

Cron stores the schedule of a recurring Job, inspired by the Linux crontab(5) table.

Jobs are executed by JobSchedulers when the second, minute, and hour fields match the current time, and when at least one of the two day fields (day of month & month, or day of week) matches the current time.

When an argument is an Int64:

Field         Allowed values
second        0-59
minute        0-59
hour          0-23
day_of_month  1-31
month         1-12
day_of_week   1-7 (1 is Monday)

When an argument is a String or Char:

An argument may be an asterisk (*), which always stands for "first-last".

Ranges of numbers are allowed. Ranges are two numbers separated with a hyphen. The specified range is inclusive. For example, 8-11 for an "hours" entry specifies execution at hours 8, 9, 10 and 11.

Lists are allowed. A list is a set of numbers (or ranges) separated by commas. Examples: "1,2,5,9", "0-4,8-12".

Step values can be used in conjunction with ranges. Following a range with /<number> specifies skips of the number's value through the range. For example, "0-23/2" can be used in the hour argument to specify Job execution every other hour (the alternative is "0,2,4,6,8,10,12,14,16,18,20,22"). Steps are also permitted after an asterisk, so if you want to say "every two hours", just use "*/2".

When an argument is a Vector:

Vector works like lists mentioned above. For example, [1,2,5,9] is equivalent to "1,2,5,9".

When an argument is a UInt64:

UInt64 is the internal type of Cron fields. All the previous types are converted to a UInt64 bit array. The start index of the bit array is 0. Bits outside of the allowed values (see the table above) are ignored.
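
To make the bit-array idea concrete, here is a standalone sketch of how a list, range, or step string could be turned into a UInt64 in which bit i stands for the value i. The helpers `field_bits` and `set_values` are hypothetical and written only for this illustration; they are not the package's `cron_value_parse`, and they take the allowed range as explicit arguments.

```julia
# Hypothetical helper: encode a crontab-like field as a UInt64 bit array.
# `lo_allowed:hi_allowed` is the range that '*' expands to (see the table above).
function field_bits(spec::AbstractString, lo_allowed::Int, hi_allowed::Int)
    bits = UInt64(0)
    for item in split(spec, ',')              # lists: "1,2,5,9"
        parts = split(item, '/')              # steps: "0-23/2"
        step = length(parts) == 2 ? parse(Int, parts[2]) : 1
        if parts[1] == "*"
            lo, hi = lo_allowed, hi_allowed
        else
            ends = split(parts[1], '-')       # ranges: "8-11"
            lo = parse(Int, ends[1])
            hi = length(ends) == 2 ? parse(Int, ends[2]) : lo
        end
        for v in lo:step:hi
            bits |= UInt64(1) << v            # bit index 0 is the value 0
        end
    end
    return bits
end

# Hypothetical helper: list the values whose bits are set.
set_values(bits::UInt64) = [v for v in 0:63 if (bits >> v) & 1 == 1]

@assert set_values(field_bits("8-11", 0, 23)) == [8, 9, 10, 11]
@assert set_values(field_bits("1,2,5,9", 1, 31)) == [1, 2, 5, 9]
@assert field_bits("*/2", 0, 23) == field_bits("0-23/2", 0, 23)
```

Storing each field as a bit array means that checking whether a time matches reduces to testing a single bit per field.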

Cron(special::Symbol)

Instead of the six arguments of Cron, one of the following special symbols may be used:

special    Meaning
:yearly    Run once a year, Cron(0,0,0,1,1,0)
:annually  (same as :yearly)
:monthly   Run once a month, Cron(0,0,0,1,'*','*')
:weekly    Run once a week, Cron(0,0,0,'*','*',1)
:daily     Run once a day, Cron(0,0,0,'*','*','*')
:midnight  (same as :daily)
:hourly    Run once an hour, Cron(0,0,'*','*','*','*')
:none      Never repeat, Cron(0,0,0,0,0,0)

Caution: Linux crontab's special :reboot is not supported here.

To run every minute, just use Cron().
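
The constructors above might be used as follows. This is a sketch: the `report` function and the job name are invented for the example, and the job is only constructed here, not submitted.

```julia
using JobSchedulers, Dates

every_minute = Cron()           # second = 0, every other field '*'
daily        = Cron(:daily)     # special symbol, same as :midnight

# Positional form: second 0, minute 30, hour 9, any day of month, any month,
# day_of_week given as a range string.
weekday_0930 = Cron(0, 30, 9, '*', '*', "1-5")

report() = println("report generated at ", now())   # hypothetical workload

# A recurring job definition that uses the `cron` and `until` keyword arguments.
recurring = Job(report; name = "demo: daily report", cron = daily, until = Day(7))
```

Passing `recurring` to submit! would hand it to the scheduler; it is left out here so that the snippet does not start a long-lived recurring job.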

JobSchedulers.cron_value_parse - Function
cron_value_parse(value::UInt64)
cron_value_parse(value::Int64)
cron_value_parse(value::String)
cron_value_parse(value::Char)
cron_value_parse(value::Vector)
cron_value_parse(*) = cron_value_parse('*')

Parse crontab-like value to UInt64. See details: Cron.

Dates.tonext - Method
Dates.tonext(dt::DateTime, c::Cron) -> Union{DateTime, Nothing}
Dates.tonext(t::Time, c::Cron; same::Bool = false) -> Time
Dates.tonext(d::Date, c::Cron; same::Bool = false) -> Union{DateTime, Nothing}

Jobs are executed by JobSchedulers when the second, minute, hour, and month of year fields match the current time, and when at least one of the two day fields (day of month, or day of week) matches the current time.
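
A small usage sketch of the DateTime method. The start time is arbitrary, and the result is printed rather than asserted because the exact value depends on the matching rules described above.

```julia
using JobSchedulers, Dates

hourly = Cron(:hourly)
start = DateTime(2024, 1, 1, 10, 15, 0)

# Returns a DateTime, or nothing when no matching time is found.
next_run = Dates.tonext(start, hourly)

if next_run !== nothing
    println("next run after ", start, " is ", next_run)
end
```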


Queue

JobSchedulers.queue - Function
queue(; all::Bool = false)    -> Vector{Job}
queue(state::Symbol )         -> Vector{Job}
queue(needle)                 -> Vector{Job}
queue(state::Symbol , needle) -> Vector{Job}
queue(needle, state::Symbol ) -> Vector{Job}
queue(id::Int)                -> Job
  • all::Bool: if true, get all jobs; if false, get only running and queuing jobs.

  • state::Symbol: get jobs with a specific state, including :all, QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST.

    PAST is the superset of DONE, FAILED, CANCELLED.

  • needle::Union{AbstractString,AbstractPattern,AbstractChar}: get jobs if they contain needle in their name or user.

  • id::Int: get the job with the specific id.
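
The different call forms can be tried out as in this sketch. The `nap` function, job name, and user name are placeholders.

```julia
using JobSchedulers

nap() = sleep(1)    # hypothetical workload
j = submit!(nap; name = "demo: nap", user = "alice")

active   = queue()              # queuing and running jobs only
everyone = queue(all = true)    # also includes past jobs
by_name  = queue("demo")        # jobs whose name or user contains "demo"
running  = queue(RUNNING)       # filter by state
same_job = queue(j.id)          # look up a single job by id

@assert same_job.id == j.id
```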

JobSchedulers.all_queue - Function
all_queue()
all_queue(id::Int64)
all_queue(state::Symbol)
all_queue(needle::Union{AbstractString,AbstractPattern,AbstractChar})
  • state::Symbol: get jobs with a specific state, including :all, QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST.

    PAST is the superset of DONE, FAILED, CANCELLED.

  • needle::Union{AbstractString,AbstractPattern,AbstractChar}: get jobs if they contain needle in their name or user.

  • id::Int: get the job with the specific id.

JobSchedulers.job_query - Function
job_query_by_id(id::Int64)

Search job by job.id in the queue.

Return job::Job if found, nothing if not found.


Wait For Jobs

JobSchedulers.wait_queue - Function
wait_queue(;show_progress::Bool = false, exit_num_jobs::Int = 0)

Wait for all jobs in queue() to finish.

  • show_progress::Bool: if true, show the job progress.

  • exit_num_jobs::Int: exit when queue() has fewer than exit_num_jobs jobs. It is useful for ignoring jobs that are always running or recurring.

See also: queue_progress.

Base.wait - Method
wait(j::Job)
wait(js::Vector{Job})

Wait for the job(s) to be finished.
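
A sketch of waiting on a vector of jobs. The helper `square` and the job names are made up for this example.

```julia
using JobSchedulers

square(i) = () -> i^2    # hypothetical helper returning a zero-argument function

jobs = Job[submit!(square(i); name = "demo: square $i") for i in 1:3]

wait(jobs)               # blocks until every job in the vector has finished
@assert result.(jobs) == [1, 4, 9]
```

When the jobs are not collected in a variable, wait_queue() can be used instead to wait for everything that queue() returns.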


Scheduler Settings

JobSchedulers.set_scheduler - Function
set_scheduler(;
    max_cpu::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_CPU,
    max_mem::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_MEM,
    max_job::Int = JobSchedulers.JOB_QUEUE.max_done,
    max_cancelled_job::Int = JobSchedulers.JOB_QUEUE.max_cancelled_job
)
  • max_job: the number of done jobs to keep. If the number of done jobs exceeds 1.5 * max_job, old jobs are deleted.
  • max_cancelled_job: the number of cancelled jobs to keep. If the number of cancelled jobs exceeds 1.5 * max_cancelled_job, old jobs are deleted.

See details: set_scheduler_max_cpu, set_scheduler_max_mem, set_scheduler_max_job
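
A sketch of setting everything in one call. The values are arbitrary, and it assumes that Float64 values for max_cpu and max_mem are treated as fractions, as in set_scheduler_max_cpu and set_scheduler_max_mem.

```julia
using JobSchedulers

set_scheduler(
    max_cpu = 0.5,            # Float64: fraction of the available CPUs (assumed)
    max_mem = 0.5,            # Float64: fraction of the total memory (assumed)
    max_job = 5000,           # done jobs kept before old ones are deleted
    max_cancelled_job = 500,  # same rule for cancelled jobs
)
```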

JobSchedulers.set_scheduler_max_cpu - Function
set_scheduler_max_cpu(ncpu::Int = default_ncpu())
set_scheduler_max_cpu(percent::Float64)

Set the maximum number of CPUs (threads) the scheduler can use. If Julia is started with multiple threads, the maximum is nthreads() - 1.

Example

set_scheduler_max_cpu()     # use all available CPUs
set_scheduler_max_cpu(4)    # use 4 CPUs
set_scheduler_max_cpu(0.5)  # use 50% of CPUs

JobSchedulers.set_scheduler_max_mem - Function
set_scheduler_max_mem(mem::Int = default_mem())
set_scheduler_max_mem(percent::Float64)

Set the maximum RAM the scheduler can use.

Example

set_scheduler_max_mem()             # use 80% of total memory

set_scheduler_max_mem(4GB)          # use 4GB memory
set_scheduler_max_mem(4096MB)
set_scheduler_max_mem(4194304KB)
set_scheduler_max_mem(4294967296B)

set_scheduler_max_mem(0.5)          # use 50% of total memory

JobSchedulers.set_scheduler_max_job - Function
set_scheduler_max_job(max_done::Int = 10000, max_cancelled::Int = max_done)

Set the number of finished jobs to keep. If the number of jobs exceeds 1.5 * NUMBER, old jobs are deleted.

JobSchedulers.destroy_unnamed_jobs_when_done - Function
JobSchedulers.destroy_unnamed_jobs_when_done(b::Bool)
New in v0.10

Before v0.10, all jobs were saved to the queue. From v0.10, unnamed jobs (job.name == "") are not saved if they ran successfully. To keep unnamed jobs, call JobSchedulers.destroy_unnamed_jobs_when_done(false).

JobSchedulers.set_group_seperator - Function
set_group_seperator(group_seperator::Regex) = global GROUP_SEPERATOR = group_seperator

Set the group separator. The group separator is used to group the names of Jobs when displaying the progress meter with wait_queue(show_progress=true).

The default is r": *".

JobSchedulers.GROUP_SEPERATOR - Constant
GROUP_SEPERATOR::Regex = r": *"

The group separator is used to group the names of Jobs when displaying the progress meter with wait_queue(show_progress=true).

To set it, use set_group_seperator(group_seperator::Regex).


Scheduler Control

Scheduler is automatically started, so it is not necessary to start/stop it.

Optimize CPU Usage

JobSchedulers.solve_optimized_ncpu - Function
solve_optimized_ncpu(default::Int; 
    ncpu_range::UnitRange{Int64} = 1:total_cpu, 
    njob::Int = 1, 
    total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU, 
    side_jobs_cpu::Int = 0)

Find the optimized number of CPUs for a job.

  • default: default ncpu of the job.
  • ncpu_range: the possible ncpu range of the job.
  • njob: number of the same job.
  • total_cpu: the total CPU that can be used by JobSchedulers.
  • side_jobs_cpu: CPUs set aside for small jobs that might run while the job is running, so the job does not use up all of the resources and block small tasks.
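
A usage sketch with arbitrary numbers. The function is called with the module prefix in case it is not exported, and the result is printed rather than asserted because the optimization rule itself is not described here.

```julia
using JobSchedulers

# 10 identical jobs that would use 4 CPUs each by default, sharing 16 CPUs,
# with 2 CPUs set aside for small side jobs.
ncpu = JobSchedulers.solve_optimized_ncpu(4;
    ncpu_range = 2:8,
    njob = 10,
    total_cpu = 16,
    side_jobs_cpu = 2,
)

println("suggested ncpu per job: ", ncpu)
```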

Backup

JobSchedulers.set_scheduler_backup - Function
set_scheduler_backup(
    filepath::AbstractString = "";
    migrate::Bool = false,
    delete_old::Bool = false,
    recover_settings::Bool = true,
    recover_queue::Bool = true
)

Set the backup file of job scheduler.

If filepath is "", stop backing up at exit.

If filepath is an existing file, recover the settings and/or the queue from filepath immediately, as controlled by recover_settings and recover_queue.

If filepath is a new file, the backup file is created at exit.

If migrate = true and the old JobSchedulers.SCHEDULER_BACKUP_FILE exists, the old backup file is recovered before recovering from filepath.

JobSchedulers.backup - Function
backup()

Manually back up job scheduler settings and queues. The function is automatically triggered at exit.
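
A sketch of a typical backup workflow. The file name is a placeholder inside a temporary directory.

```julia
using JobSchedulers

# Hypothetical backup location in a fresh temporary directory.
backup_file = joinpath(mktempdir(), "jobschedulers_demo.backup")

# The file does not exist yet, so there is nothing to recover.
set_scheduler_backup(backup_file)

# Back up settings and queues now instead of waiting for exit.
backup()

# An empty path stops backing up at exit.
set_scheduler_backup("")
```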


Internal

Internal - Scheduling

JobSchedulers.JobQueue - Type
JobQueue(; max_done::Int = 10000, max_cancelled::Int = 10000)
mutable struct JobQueue
    const queuing::SortedDict{Int,Vector{Job},Base.Order.ForwardOrdering}  # priority => Job List
    const queuing_0cpu::Vector{Job}              # ncpu = 0, can run immediately
    const future::Vector{Job}                    # all jobs with schedule_time > now()
    const running::Vector{Job}
    const done::Vector{Job}
    const failed::Vector{Job}
    const cancelled::Vector{Job}
    max_done::Int
    max_cancelled::Int
    const lock_queuing::ReentrantLock
    const lock_running::ReentrantLock
    const lock_past::ReentrantLock
end

JobSchedulers.scheduler - Method
scheduler()

The function that runs the job scheduler. It should be started through scheduler_start() rather than called directly.

JobSchedulers.istaskfailed2 - Function
istaskfailed2(t::Task)

Extend Base.istaskfailed to fit the Pipelines and JobSchedulers packages. These packages can return a StackTraceVector in t.result, which Base considers :done. The function checks for this situation and corrects the task status and other properties.

JobSchedulers.unsafe_run! - Function
unsafe_run!(job::Job, current::DateTime=now()) :: Bool

Jump the queue and run job immediately, regardless of what other jobs are running or waiting. Return true if the job was started successfully, false otherwise.

Caution: it will not trigger scheduler_need_action().

JobSchedulers.unsafe_cancel! - Function
unsafe_cancel!(job::Job, current::DateTime=now())

Caution: it is unsafe and should only be called within lock. Do not call it from other modules.

Caution: it will not trigger scheduler_need_action().

JobSchedulers.unsafe_update_state! - Function
unsafe_update_state!(job::Job)

Update the state of job from job.task when job.state === :running.

If a recurring job is PAST, submit a new job.

Caution: it is unsafe and should only be called within lock.

JobSchedulers.is_dependency_ok - Function
is_dependency_ok(job::Job)::Bool

Caution: run it within lock only.

Algorithm: loop over the dependencies, break at the first one that is not ok, and delete the ok dependencies that precede it.

If a dependency is provided as an Int, query the job with that id and replace the Int with the job.


Internal - Progress Meter

Note

To display a progress meter, please use wait_queue(show_progress = true).

JobSchedulers.JobGroup - Type
mutable struct JobGroup
    name::String
    total::Int
    queuing::Int
    running::Int
    done::Int
    failed::Int
    cancelled::Int
end

JobGroup is computed when displaying a progress meter.

JobSchedulers.get_group - Function
get_group(job::Job, group_seperator = GROUP_SEPERATOR)
get_group(name::AbstractString, group_seperator = GROUP_SEPERATOR)

Return nested_group_names::Vector{String}.

Eg: If job.name is "A: B: 1232", return ["A", "A: B", "A: B: 1232"]
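
The nesting in the example above can be reproduced with a few lines of standalone Julia. The function `nested_groups` below is a hypothetical re-implementation written for illustration, not the package's get_group; it cuts the name just before every match of the separator.

```julia
# Hypothetical sketch: build nested group names from a job name.
function nested_groups(name::AbstractString, sep::Regex = r": *")
    groups = String[]
    for m in eachmatch(sep, name)
        # Everything before this separator forms one group level.
        push!(groups, name[1:prevind(name, m.offset)])
    end
    push!(groups, String(name))   # the full name is the innermost level
    return groups
end

@assert nested_groups("A: B: 1232") == ["A", "A: B", "A: B: 1232"]
@assert nested_groups("single") == ["single"]
```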

JobSchedulers.progress_bar - Function
progress_bar(percent::Float64, width::Int = 20)

Return a String containing a progress bar whose length in characters is width.

  • percent: a value from 0.0 to 1.0; values outside this range are truncated.

  • width: should be > 3. The percentage is shown only if width > 10.

JobSchedulers.queue_progress - Function
queue_progress(;remove_tmp_files::Bool = true, kwargs...)
queue_progress(stdout_tmp::IO, stderr_tmp::IO;
group_seperator = GROUP_SEPERATOR, wait_second_for_new_jobs::Int = 1, loop::Bool = true, exit_num_jobs::Int = 0)
  • group_seperator: delimiter used to split (job::Job).name into group names and the specific job name.

  • wait_second_for_new_jobs::Int: if auto_exit and all jobs are PAST, do not quit queue_progress immediately but wait for this many seconds. If new jobs are submitted in the meantime, do not quit queue_progress.

  • loop::Bool: if false, only show the current progress and exit.

  • exit_num_jobs::Int: exit when queue() has fewer than exit_num_jobs jobs. It is useful for ignoring jobs that are always running or recurring.

JobSchedulers.view_update - Function
view_update(h, w; row = 1, groups_shown::Vector{JobGroup} = JobGroup[], is_in_terminal::Bool = true, is_interactive = true, group_seperator_at_begining = Regex("^" * GROUP_SEPERATOR.pattern))

Update the whole screen view.

JobSchedulers.update_group_state! - Function
update_group_state!(job::Job)

Update the job's group state, which is used by the progress meter. This should only be called if JobSchedulers.PROGRESS_METER == true.


Internal - Const/Variable

const SCHEDULER_ACTION = Base.RefValue{Channel{Int}}()  # defined in __init__()
const SCHEDULER_ACTION_LOCK = ReentrantLock()
const SCHEDULER_PROGRESS_ACTION = Base.RefValue{Channel{Int}}()  # defined in __init__()

SCHEDULER_MAX_CPU::Int = nthreads() > 1 ? nthreads()-1 : Sys.CPU_THREADS
SCHEDULER_MAX_MEM::Int = round(Int, Sys.total_memory() * 0.9)
const JOB_QUEUE = JobQueue(; max_done = JOB_QUEUE_MAX_LENGTH,  max_cancelled = max_done = JOB_QUEUE_MAX_LENGTH)

SCHEDULER_BACKUP_FILE::String = ""

SCHEDULER_WHILE_LOOP::Bool = true

SLEEP_HANDELED_TIME::Int = 10

DESTROY_UNNAMED_JOBS_WHEN_DONE::Bool = true

const ALL_JOB_GROUP = JobGroup("ALL JOBS")
const JOB_GROUPS = OrderedDict{String, JobGroup}()
const OTHER_JOB_GROUP = JobGroup("OTHERS")