API
Const/Variables
const B = 1
const KB = 1024
const MB = 1024KB
const GB = 1024MB
const TB = 1024GB
const QUEUING = :queuing
const RUNNING = :running
const DONE = :done
const FAILED = :failed
const CANCELLED = :cancelled
const PAST = :past # superset of DONE, FAILED, CANCELLED
const cron_none = Cron(:none)
const SCHEDULER_TASK = Base.RefValue{Task}()
const SCHEDULER_REACTIVATION_TASK = Base.RefValue{Task}()
Job
JobSchedulers.Job
— Type
Job(command::Base.AbstractCmd; stdout=nothing, stderr=nothing, append::Bool=false, kwargs...)
Job(f::Function; kwargs...)
Job(task::Task; kwargs...)
Arguments
- command::Base.AbstractCmd: the command to run.
- f::Function: the function to run without any arguments, like f().
- task::Task: the task to run. Eg: @task(1+1).
Common Keyword Arguments (kwargs...)
- name::String = "": job name.
- user::String = "": user that the job belongs to.
- ncpu::Real = 1.0: number of CPUs this job is about to use (can be Float64, eg: 1.5 will use 150% CPU).
- mem::Integer = 0: amount of memory this job is about to use (supports TB, GB, MB, KB, B=1).
- schedule_time::Union{DateTime,Period} = DateTime(0): the expected time to run.
- dependency: defer the job until the specified jobs reach the specified states (QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST). PAST is the superset of DONE, FAILED, CANCELLED, which means the job will not run in the future. Eg: DONE => job, [DONE => job1; PAST => job2].
  The default state is DONE, so DONE => job can be simplified to job. To be compatible with old versions, you can also use a job id (Int): [DONE => job.id].
- wall_time::Period = Year(1): wall clock time limit. Jobs will be terminated after running for this period.
- priority::Int = 20: lower means higher priority.
- cron::Cron = Cron(:none): job recurring at a specific date and time. See more at Cron.
- until::Union{DateTime,Period} = DateTime(9999,1,1): the job stops recurring at this date and time.
Experimental Keyword Arguments - Output Redirection:
- stdout=nothing: redirect stdout to the file.
- stderr=nothing: redirect stderr to the file.
- append::Bool=false: whether to append to the stdout or stderr file.
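As a small illustration of the keyword arguments above, the following sketch builds a Job from a function and submits it. It assumes JobSchedulers is installed and that the scheduler has started automatically; the job name and resource values are arbitrary choices for the example.

```julia
using JobSchedulers, Dates

# Build a job from a zero-argument function using a do-block.
job = Job(
    name = "sum demo",        # arbitrary name for this example
    ncpu = 1,                 # reserve one CPU
    mem = 1MB,                # memory estimate, using the exported unit constants
    wall_time = Minute(5),    # terminate if it runs longer than this
    priority = 10,            # lower means higher priority (default is 20)
) do
    sum(1:10)
end

submit!(job)   # put the job in the queue
wait(job)      # block until it is finished
@show result(job)
```

The job only starts running after submit!; constructing it does not schedule anything.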
JobSchedulers.submit!
— Function
submit!(job::Job)
submit!(args_of_Job...; kwargs_of_Job...)
Submit the job to queue.
submit!(Job(...)) can be simplified to submit!(...). They are equivalent.
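The shorthand form combines naturally with the dependency keyword. This is a minimal sketch, assuming JobSchedulers is installed; the job names and values are made up for the example.

```julia
using JobSchedulers

# submit!(f; kwargs...) is the documented shorthand for submit!(Job(f; kwargs...)).
first_job = submit!(() -> 20; name = "first")

# The second job is deferred until `first_job` reaches the DONE state.
second_job = submit!(
    () -> result(first_job) + 1;
    name = "second",
    dependency = DONE => first_job,
)

total = fetch(second_job)   # waits for the job, then returns its result
```

Because the second job cannot start before the first is DONE, calling result(first_job) inside it does not hit the "running job" warning.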
JobSchedulers.@submit
— Macro
@submit [option=value]... expr
Submit a job from expr. If a Job is explicitly shown in expr, DONE => job will be automatically added to the dependency list.
- expr: any type of Expression is supported.
- option = value: kwargs of Job. If expr is parsed to be a Pipelines.Program, options also include its inputs, outputs and run kwargs.
Example
j = @submit 1+1
wait(j)
@assert result(j) == 2
# you can use any keyword arguments that `Job` supports, such as `name`, `ncpu`:
j_2sec = @submit name = "run after 2 sec" begin sleep(2); 32 end
# because `j_2sec isa Job`, `DONE => j_2sec` is pushed to `j2.dependency`.
j2 = @submit mem=2KB begin
1 + result(j_2sec)
end
wait(j2)
@assert result(j2) == 1 + 32
# you can also manually add dependencies not in the `expr`:
j3 = @submit dependency = [PAST => j] println("j3 finished. result of j2 = ", result(j2))
# Note: j3.dependency might be empty after submit, because JobScheduler will remove jobs that reached their states in the dependency list.
@submit cannot know the elements in a container, so it is unable to walk through and add Job dependencies in a container.
jobs = Job[] # the job container
for i in 1:10
push!(jobs, @submit begin sleep(30);i end) # 10 jobs will be added to `jobs`
end
x = 0
j_something_wrong = @submit for j in jobs
# have to use global x
global x += result(j)
end
# ┌ Warning: Getting result from a running job: returned value might be unexpected.
# └ @ JobSchedulers ~/projects/JobSchedulers.jl/src/jobs.jl:318
result(j_something_wrong)
# MethodError(+, (nothing, nothing), 0x0000000000007b16)
To avoid it, we can (1) use submit!, or (2) explicitly add dependency = jobs to @submit.
x = 0
j_ok = submit!(dependency = jobs) do
for j in jobs
# have to use global x
global x += result(j)
end
end
wait(j_ok)
@assert x == 55
x = 100
j_ok_too = @submit dependency = jobs for j in jobs
# have to use global x
global x += result(j)
end
wait(j_ok_too)
@assert x == 155
JobSchedulers.cancel!
— Function
cancel!(job::Job)
Cancel job, stop queuing or running.
JobSchedulers.result
— Function
result(job::Job)
Return the result of job. If the job is not done, a warning message will also show.
Base.fetch
— Method
fetch(x::Job)
Wait for a Job to finish, then return its result value. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
JobSchedulers.isqueuing
— Function
isqueuing(j::Job) :: Bool
JobSchedulers.isrunning
— Function
isrunning(j::Job) :: Bool
JobSchedulers.isdone
— Function
isdone(j::Job) :: Bool
JobSchedulers.iscancelled
— Function
iscancelled(j::Job) :: Bool
JobSchedulers.isfailed
— Function
isfailed(j::Job) :: Bool
JobSchedulers.ispast
— Function
ispast(j::Job) :: Bool = j.state === DONE || j.state === CANCELLED || j.state === FAILED
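The state predicates can be checked together once a job has finished. This sketch assumes JobSchedulers is installed; the predicates are written with the module prefix so the example does not depend on whether they are exported.

```julia
using JobSchedulers

j = submit!(() -> 42; name = "state demo")
wait(j)   # returns once the job is no longer queuing or running

# Collect every predicate for the finished job.
states = (
    JobSchedulers.isqueuing(j),
    JobSchedulers.isrunning(j),
    JobSchedulers.isdone(j),
    JobSchedulers.isfailed(j),
    JobSchedulers.iscancelled(j),
    JobSchedulers.ispast(j),    # true for DONE, FAILED or CANCELLED
)
```

For a job that completed normally, only isdone and ispast are expected to be true.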
Submit Job within Job
JobSchedulers.@yield_current
— Macro
@yield_current expr
Used to prevent wasting threads and even blocking JobScheduler when submitting jobs within jobs.
If @yield_current is called within a Job's scope, this Job is considered as the current_job. If not called within a job's scope, expr will be evaluated as it is.
Within the expr block, the current job's ncpu is temporarily set to 0, and the thread of the current job can be occupied by any child jobs (including grand-child jobs).
Once leaving the expr block, the current job's ncpu is resumed, and its thread can no longer be occupied.
Therefore, the child jobs need to be submitted and waited within the expr block.
Example:
using JobSchedulers
parent_job = Job(ncpu=1) do
# imagine preparing A and B takes lots of time
A = 0
B = 0
# compute A and B in parallel (imagine they take lots of time)
@yield_current begin
child_job_A = @submit A += 100
child_job_B = @submit B += 1000
wait(child_job_A)
wait(child_job_B)
end
# do computation based on results of child jobs
return A + B
end
submit!(parent_job)
res = fetch(parent_job)
# 1100
If not using @yield_current, the thread taken by the current job is wasted when waiting.
In an extreme condition, the JobScheduler can be blocked if all threads are taken by parent jobs, and if their child jobs do not have any threads to run.
To experience the blockage, you can start Julia with julia -t 1,1, and run the example code without @yield_current. You may kill the julia session by pressing Ctrl+C 10 times.
JobSchedulers.yield_current
— Function
yield_current(f::Function) = @yield_current f()
The function version of @yield_current.
Prevent wasting threads and even blocking JobScheduler when submitting jobs within jobs.
See details at @yield_current.
JobSchedulers.current_job
— Function
current_job() :: Union{Job, Nothing}
Return the Job that is running in the current scope, nothing if the current scope is not within a job.
Cron: Job Recur/Repeat
JobSchedulers.Cron
— Type
Cron(second, minute, hour, day_of_month, month, day_of_week)
Cron(;
second = 0,
minute = '*',
hour = '*',
day_of_month = '*',
month = '*',
day_of_week = '*',
)
Cron stores the schedule of a recurring Job, implemented according to the Linux crontab(5) table.
Jobs are executed when the second, minute, hour and month fields match the current time. If neither day_of_month nor day_of_week starts with *, cron takes the union (∪) of their values day_of_month ∪ day_of_week. Otherwise cron takes the intersection (∩) of their values day_of_month ∩ day_of_week.
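The day-matching rule can be sketched in plain Julia with the Dates standard library. The helper date_matches below is hypothetical and is not part of JobSchedulers; it takes the raw field strings plus the sets of values they expand to, and only illustrates the union versus intersection decision.

```julia
using Dates

# Hypothetical helper (not part of JobSchedulers): apply the day rule described above.
# `dom_field` / `dow_field` are the raw field strings; `dom` / `dow` are the
# collections of values those fields expand to.
function date_matches(d::Date, dom_field::AbstractString, dom, dow_field::AbstractString, dow)
    in_dom = day(d) in dom
    in_dow = dayofweek(d) in dow   # Dates.dayofweek: 1 = Monday ... 7 = Sunday
    if !startswith(dom_field, "*") && !startswith(dow_field, "*")
        return in_dom || in_dow    # neither field starts with '*': union
    end
    return in_dom && in_dow        # otherwise: intersection
end

monday  = Date(2024, 1, 8)    # a Monday, not the 15th
tuesday = Date(2024, 1, 16)   # a Tuesday, not the 15th

a = date_matches(monday,  "15", [15], "1", [1])   # union: the Monday matches
b = date_matches(tuesday, "15", [15], "1", [1])   # union: neither field matches
c = date_matches(tuesday, "*", 1:31, "1", [1])    # intersection: not a Monday
d = date_matches(monday,  "*", 1:31, "1", [1])    # intersection: both match
```

With both fields restricted, a date matching either one qualifies; as soon as one field starts with an asterisk, both must match.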
When an argument is an Int:
Field | Allowed values |
---|---|
second | 0-59 |
minute | 0-59 |
hour | 0-23 |
day_of_month | 1-31 |
month | 1-12 |
day_of_week | 1-7 (1 is Monday) |
- Typical Linux distributions do not have a second field, while JobSchedulers does.
- Sunday is only coded 7 in JobSchedulers, while it is 0 or 7 in Linux, so behaviors like day_of_week = "*/2" are different in the two systems.
- From JobSchedulers v0.11, Cron has been rewritten based on the standard crontab, including its bug described here.
When an argument is a String or Char:
An argument may be an asterisk (*), which always stands for first-last.
Ranges of numbers are allowed. Ranges are two numbers separated with a hyphen. The specified range is inclusive. For example, 8-11 for an hours entry specifies execution at hours 8, 9, 10 and 11.
Lists are allowed. A list is a set of numbers (or ranges) separated by commas. Examples: "1,2,5,9", "0-4,8-12".
Step values can be used in conjunction with ranges. Following a range with /<number> specifies skips of the number's value through the range. For example, "0-23/2" can be used in the hour argument to specify Job execution every other hour (the alternative is "0,2,4,6,8,10,12,14,16,18,20,22"). Steps are also permitted after an asterisk, so if you want to say every two hours, just use "*/2".
When an argument is a Vector:
Vector works like lists mentioned above. For example, [1,2,5,9] is equivalent to "1,2,5,9".
When an argument is a UInt64:
UInt64 is the internal type of Cron fields. All the previous types will be converted to a UInt64 bit array. The start index of the bit array is 0. Bits outside of the allowed values (see the table above) are ignored.
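To make the bit-array representation concrete, here is a minimal sketch of how a crontab-style field could be expanded into a UInt64 whose bit i is set when value i is allowed. The function field_to_bits is a hypothetical re-implementation for illustration only; the package's own parser is cron_value_parse, and its handling of edge cases may differ.

```julia
# Hypothetical sketch, not the package implementation.
# `lo` and `hi` are the first and last allowed values, used when the field is "*".
function field_to_bits(field::AbstractString, lo::Int, hi::Int)
    bits = UInt64(0)
    for item in split(field, ',')              # lists: "1,2,5,9"
        parts = split(item, '/')               # optional step: "0-23/2"
        stride = length(parts) == 2 ? parse(Int, parts[2]) : 1
        if parts[1] == "*"
            first_v, last_v = lo, hi           # asterisk stands for first-last
        else
            bounds = split(parts[1], '-')      # ranges: "8-11"
            first_v = parse(Int, bounds[1])
            last_v = length(bounds) == 2 ? parse(Int, bounds[2]) : first_v
        end
        for v in first_v:stride:last_v
            bits |= UInt64(1) << v             # bit index starts at 0
        end
    end
    return bits
end

# Recover the allowed values from the bit array.
bits_to_values(bits::UInt64) = [i for i in 0:63 if (bits >> i) & 1 == 1]

list_bits  = field_to_bits("1,2,5,9", 0, 59)
every_two  = field_to_bits("*/2", 0, 23)
range_step = field_to_bits("0-23/2", 0, 23)
```

Here bits_to_values(list_bits) gives back the listed values, and "*/2" and "0-23/2" produce the same bit pattern for an hour field.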
Cron(special::Symbol)
Instead of the six arguments of Cron, one of the following special symbols may be used:
special | Meaning |
---|---|
:yearly | Run once a year, Cron(0,0,0,1,1,0) |
:annually | (same as :yearly ) |
:monthly | Run once a month, Cron(0,0,0,1,'*','*') |
:weekly | Run once a week, Cron(0,0,0,'*','*',1) |
:daily | Run once a day, Cron(0,0,0,'*','*','*') |
:midnight | (same as :daily ) |
:hourly | Run once an hour, Cron(0,0,'*','*','*','*') |
:none | Never repeat, Cron(0,0,0,0,0,0) |
Caution: Linux crontab's special :reboot is not supported here.
To run every minute, just use Cron().
JobSchedulers.cron_value_parse
— Function
cron_value_parse(value::UInt64)
cron_value_parse(value::Signed)
cron_value_parse(value::AbstractString)
cron_value_parse(value::Char)
cron_value_parse(value::Vector)
cron_value_parse(*) = cron_value_parse('*')
Parse crontab-like value to UInt64. See details: Cron.
Dates.tonext
— Method
Dates.tonext(dt::DateTime, c::Cron; same::Bool = false) -> Union{DateTime, Nothing}
Dates.tonext(t::Time, c::Cron; same::Bool = false) -> Time
Dates.tonext(d::Date, c::Cron; same::Bool = false, limit::Date = d + Day(3000)) -> Union{DateTime, Nothing}
Adjust date or time to the next one corresponding to c::Cron. Setting same=true allows the current date or time to be considered as the next one, allowing for no adjustment to occur.
Queue
JobSchedulers.queue
— Function
queue(; all::Bool = false) -> Vector{Job}
queue(state::Symbol) -> Vector{Job}
queue(needle) -> Vector{Job}
queue(state::Symbol, needle) -> Vector{Job}
queue(needle, state::Symbol) -> Vector{Job}
queue(id::Integer) -> Job
- all::Bool: if true, get all jobs. If false, get only running and queuing jobs.
- state::Symbol: get jobs with a specific state, including :all, QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST. PAST is the superset of DONE, FAILED, CANCELLED.
- needle::Union{AbstractString,AbstractPattern,AbstractChar}: get jobs if they contain needle in their name or user.
- id::Integer: get the job with the specific id.
JobSchedulers.all_queue
— Function
all_queue()
all_queue(id::Integer)
all_queue(state::Symbol)
all_queue(needle::Union{AbstractString,AbstractPattern,AbstractChar})
- state::Symbol: get jobs with a specific state, including :all, QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST. PAST is the superset of DONE, FAILED, CANCELLED.
- needle::Union{AbstractString,AbstractPattern,AbstractChar}: get jobs if they contain needle in their name or user.
- id::Integer: get the job with the specific id.
JobSchedulers.job_query
— Function
job_query_by_id(id::Integer)
Search job by job.id in the queue.
Return job::Job if found, nothing if not found.
JobSchedulers.job_query_by_id
— Function
job_query_by_id(id::Integer)
Search job by job.id in the queue.
Return job::Job if found, nothing if not found.
Wait For Jobs
JobSchedulers.wait_queue
— Function
wait_queue(;show_progress::Bool = false, exit_num_jobs::Int = 0)
Wait for all jobs in queue() to finish.
- show_progress = true: job progress will show.
- exit_num_jobs::Int: exit when queue() has fewer than this number of jobs. It is useful to ignore some jobs that are always running or recurring.
See also: queue_progress.
Base.wait
— Method
wait(j::Job)
wait(js::Vector{Job})
Wait for the job(s) to be finished.
Scheduler Settings
JobSchedulers.set_scheduler
— Function
set_scheduler(;
max_cpu::Real = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Real = JobSchedulers.SCHEDULER_MAX_MEM,
max_job::Int = JobSchedulers.JOB_QUEUE.max_done,
max_cancelled_job::Int = JobSchedulers.JOB_QUEUE.max_cancelled_job
)
- max_job: the number of done jobs. If the number of jobs exceeds 1.5*NUMBER, old jobs will be deleted.
- max_cancelled_job: the number of cancelled jobs. If the number of jobs exceeds 1.5*NUMBER, old jobs will be deleted.
See details: set_scheduler_max_cpu, set_scheduler_max_mem, set_scheduler_max_job
JobSchedulers.set_scheduler_max_cpu
— Function
set_scheduler_max_cpu(ncpu::Int = default_ncpu())
set_scheduler_max_cpu(percent::Float64)
Set the maximum number of CPUs (threads) the scheduler can use. If Julia is started with multiple threads in the default thread pool, the maximum is the number of thread ids (tids) in the default thread pool, excluding tid 1.
Example
set_scheduler_max_cpu() # use all available CPUs
set_scheduler_max_cpu(4) # use 4 CPUs
set_scheduler_max_cpu(0.5) # use 50% of CPUs
JobSchedulers.set_scheduler_max_mem
— Function
set_scheduler_max_mem(mem::Integer = default_mem())
set_scheduler_max_mem(percent::AbstractFloat)
Set the maximum RAM the scheduler can use.
Example
set_scheduler_max_mem() # use 80% of total memory
set_scheduler_max_mem(4GB) # use 4GB memory
set_scheduler_max_mem(4096MB)
set_scheduler_max_mem(4194304KB)
set_scheduler_max_mem(4294967296B)
set_scheduler_max_mem(0.5) # use 50% of total memory
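The four integer calls above all pass the same number of bytes. The sketch below checks that arithmetic in plain Julia, using local stand-ins for the unit constants (with the values listed at the top of this page) so it runs without loading JobSchedulers.

```julia
# Local stand-ins for B, KB, MB, GB, bound inside `let` so they do not
# clash with the exported constants if JobSchedulers is loaded.
sizes = let B = 1, KB = 1024B, MB = 1024KB, GB = 1024MB
    # Numeric literal juxtaposition: 4GB means 4 * GB.
    (4GB, 4096MB, 4194304KB, 4294967296B)
end
```

Every element of sizes is 4294967296, so the four calls are interchangeable.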
JobSchedulers.set_scheduler_max_job
— Function
set_scheduler_max_job(max_done::Int = 10000, max_cancelled::Int = max_done)
Set the number of finished jobs to keep. If the number of jobs exceeds 1.5*NUMBER, old jobs will be deleted.
JobSchedulers.destroy_unnamed_jobs_when_done
— Function
JobSchedulers.destroy_unnamed_jobs_when_done(b::Bool)
JobSchedulers.set_group_seperator
— Function
set_group_seperator(group_seperator::Regex) = global GROUP_SEPERATOR = group_seperator
Set the group separator. The group separator is used to group the names of Jobs. It is used when displaying the progress meter with wait_queue(show_progress=true).
Default is r": *".
JobSchedulers.GROUP_SEPERATOR
— Constant
GROUP_SEPERATOR::Regex = r": *"
The group separator is used to group the names of Jobs. It is used when displaying the progress meter with wait_queue(show_progress=true).
To set it, use set_group_seperator(group_seperator::Regex).
Scheduler Control
Scheduler is automatically started, so it is not necessary to start/stop it.
JobSchedulers.scheduler_status
— Function
scheduler_status() :: Symbol
Print the settings and status of job scheduler. Return :not_running or :running.
JobSchedulers.scheduler_start
— Function
scheduler_start()
Start the job scheduler.
JobSchedulers.scheduler_stop
— Function
scheduler_stop()
Stop the job scheduler.
Optimize CPU Usage
JobSchedulers.solve_optimized_ncpu
— Function
solve_optimized_ncpu(default::Int;
ncpu_range::UnitRange{Int} = 1:total_cpu,
njob::Int = 1,
total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU,
side_jobs_cpu::Int = 0)
Find the optimized number of CPU for a job.
- default: default ncpu of the job.
- ncpu_range: the possible ncpu range of the job.
- njob: number of the same job.
- total_cpu: the total CPU that can be used by JobSchedulers.
- side_jobs_cpu: some small jobs that might be run when the job is running, so the job won't use up all of the resources and stop small tasks.
Backup
JobSchedulers.set_scheduler_backup
— Function
set_scheduler_backup(
filepath::AbstractString = "";
migrate::Bool = false,
delete_old::Bool = false,
recover_settings::Bool = true,
recover_queue::Bool = true
)
Set the backup file of job scheduler.
If filepath is set to "", stop backup at exit.
If filepath is set to an existing file, recover_settings or recover_queue from filepath immediately.
If filepath is set to a new file, the backup file will be created at exit.
If migrate=true and the old JobSchedulers.SCHEDULER_BACKUP_FILE exists, the old backup file will be recovered before recovering from filepath.
JobSchedulers.backup
— Function
backup()
Manually backup job scheduler settings and queues. The function is automatically triggered at exit.
Internal
Internal - Const/Variable
JobSchedulers.THREAD_POOL
— Constant
const THREAD_POOL = Base.RefValue{Channel{Int}}()
Defined in __init__().
If version > 1.9, THREAD_POOL contains only tids in Threads.threadpooltids(:default).
Also, thread 1 is reserved for JobScheduler.
JobSchedulers.SINGLE_THREAD_MODE
— Constant
const SINGLE_THREAD_MODE = Base.RefValue{Bool}()
Defined in __init__(). Whether Threads.threadpooltids(:default) are empty or == [1].
JobSchedulers.TIDS
— Constant
const TIDS = Vector{Int}()
Defined in __init__(). All tids in the default thread pool, excluding tid 1.
JobSchedulers.CURRENT_JOB
— Constant
const CURRENT_JOB = ScopedValue{Union{Job, Nothing}}(nothing)
The Job that is running in the current scope, nothing if the current scope is not within a job.
See also current_job.
JobSchedulers.OCCUPIED_MARK
— Constant
const OCCUPIED_MARK = 1<<30
A large number added to job._thread_id to mark that the thread is yielded.
const SKIP = UInt8(0)
const OK = UInt8(1)
const FAIL = UInt8(2)
const SCHEDULER_ACTION = Base.RefValue{Channel{Int}}() # defined in __init__()
const SCHEDULER_PROGRESS_ACTION = Base.RefValue{Channel{Int}}() # defined in __init__()
SCHEDULER_MAX_CPU::Int = -1 # set in __init__
SCHEDULER_MAX_MEM::Int64 = Int64(-1) # set in __init__
SCHEDULER_UPDATE_SECOND::Float64 = 0.05 # set in __init__
const JOB_QUEUE = JobQueue(; max_done = JOB_QUEUE_MAX_LENGTH, max_cancelled = JOB_QUEUE_MAX_LENGTH)
SCHEDULER_BACKUP_FILE::String = ""
SCHEDULER_WHILE_LOOP::Bool = true
SLEEP_HANDELED_TIME::Int = 10
DESTROY_UNNAMED_JOBS_WHEN_DONE::Bool = true
const ALL_JOB_GROUP = JobGroup("ALL JOBS")
const JOB_GROUPS = OrderedDict{String, JobGroup}()
const OTHER_JOB_GROUP = JobGroup("OTHERS")
Internal - Thread Utils
JobSchedulers.schedule_thread
— Function
schedule_thread(j::Job) :: UInt8
Internal: assign TID to and run the job.
JobSchedulers.free_thread
— Function
free_thread(j::Job)
Make thread available again after job is finished.
JobSchedulers.is_tid_ready_to_occupy
— Function
is_tid_ready_to_occupy(j::Job) :: Bool
Return Bool. true means the job can be occupied (j.ncpu == 0 && j._thread_id > 0) and is not occupied (j._thread_id & OCCUPIED_MARK == 0).
JobSchedulers.is_tid_occupied
— Function
is_tid_occupied(j::Job) :: Bool
Return Bool. false means the job is not occupied (j._thread_id & OCCUPIED_MARK == 0) or cannot be occupied (j._thread_id <= 0).
JobSchedulers.unsafe_occupy_tid!
— Function
unsafe_occupy_tid!(j::Job) :: Int
Return the j._thread_id before changing, and mark the tid as occupied by adding OCCUPIED_MARK to j._thread_id.
Unsafe when:
- If j._thread_id <= 0 or j is occupied, j._thread_id will not be changed, and j._thread_id is returned.
JobSchedulers.unsafe_unoccupy_tid!
— Function
unsafe_unoccupy_tid!(j::Job) :: Bool
Remove OCCUPIED_MARK from j._thread_id to mark that the tid is ready to yield. Please ensure the tid is occupied (is_tid_occupied(j)) before calling this function.
Unsafe when:
- If j._thread_id < 0, the tid will be changed to a very small negative number, which does not make sense.
- If j._thread_id == 0, the tid will remain 0.
- If j is not occupied, the tid will remain unchanged.
JobSchedulers.unsafe_original_tid
— Function
@inline unsafe_original_tid(j::Job) = j._thread_id & ~OCCUPIED_MARK
Only useful when j._thread_id > 0. Used to get the original tid of a job, without OCCUPIED_MARK.
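The bit arithmetic behind these thread utilities can be tried on plain integers. This sketch uses a local variable MARK with the documented value of OCCUPIED_MARK, and it assumes that removing the mark is the same bitwise operation shown for unsafe_original_tid; it does not touch any real Job.

```julia
MARK = 1 << 30                        # same value as the documented OCCUPIED_MARK

tid = 3
occupied = tid + MARK                 # mark a positive tid as occupied by adding the flag
is_occupied = occupied & MARK != 0    # the flag bit is now set
original = occupied & ~MARK           # clear the flag bit to recover the tid

# The "unsafe" cases listed above, under the same assumption:
zero_case = 0 & ~MARK                 # a tid of 0 stays 0
negative_case = -1 & ~MARK            # a negative tid becomes a very small negative number
unmarked_case = tid & ~MARK           # an unoccupied tid is unchanged
```

Because a valid tid is far smaller than 2^30, adding the mark and setting bit 30 are equivalent for positive tids.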
Internal - LinkedJobList
JobSchedulers.LinkedJobList
— Type
LinkedJobList()
LinkedJobList(j::Job)
Every job can only be in ONE LinkedJobList. All iterative copies of a LinkedJobList will output Vector{Job}.
Base.deleteat!
— Method
deleteat!(l::LinkedJobList, node::Job)
Delete node::Job from l::LinkedJobList.
Caution: the user needs to confirm that node::Job is in l::LinkedJobList. This function will not check it.
Internal - Scheduling
JobSchedulers.JobQueue
— Type
JobQueue(; max_done::Int = 10000, max_cancelled::Int = 10000)
mutable struct JobQueue
const queuing::SortedDict{Int,LinkedJobList,Base.Order.ForwardOrdering} # priority => Job List
const queuing_0cpu::LinkedJobList # ncpu = 0, can run immediately
const future::LinkedJobList # all jobs with schedule_time > now()
const running::LinkedJobList const running::Vector{Job}
const done::Vector{Job}
const failed::Vector{Job}
const cancelled::Vector{Job}
max_done::Int
max_cancelled::Int
const lock_queuing::ReentrantLock
const lock_running::ReentrantLock
const lock_past::ReentrantLock
end
JobSchedulers.scheduler
— Method
scheduler()
The function that runs the Job scheduler. It needs to be called by scheduler_start(), rather than calling directly.
JobSchedulers.istaskfailed2
— Function
istaskfailed2(t::Task)
Extend Base.istaskfailed to fit Pipelines and JobSchedulers packages, which will return a StackTraceVector in t.result, while Base considered it as :done. The function checks the situation and modifies the real task status and other properties.
JobSchedulers.unsafe_run!
— Function
unsafe_run!(job::Job, current::DateTime=now()) :: UInt8
Jump the queue and run job immediately, no matter what other jobs are running or waiting.
Return:
- OK: successfully scheduled to run.
- SKIP: no thread is available, skip this time.
- FAIL: failed to schedule.
Caution: it will not trigger scheduler_need_action().
JobSchedulers.unsafe_cancel!
— Function
unsafe_cancel!(job::Job, current::DateTime=now())
Caution: it is unsafe and should only be called within lock. Do not call from other module.
Caution: it will not trigger scheduler_need_action().
JobSchedulers.unsafe_update_state!
— Function
unsafe_update_state!(job::Job)
Update the state of job from job.task when job.state === :running.
Caution: it is unsafe and should only be called within lock.
JobSchedulers.is_dependency_ok
— Function
is_dependency_ok(job::Job)::Bool
Caution: run it within lock only.
Algorithm: Break while loop when found dep not ok, and change job._dep_check_id to the current id.
If dep is provided as Integer, query Integer for job and then replace Integer with the job.
JobSchedulers.set_scheduler_while_loop
— Function
set_scheduler_while_loop(b::Bool)
If set to false, the scheduler will stop.
JobSchedulers.get_priority
— Function
get_priority(job::Job) = job.priority
JobSchedulers.get_thread_id
— Function
get_thread_id(job::Job) = job._thread_id
JobSchedulers.date_based_on
— Function
date_based_on(c::Cron) -> Symbol
Whether date of c is based on :day_of_week, :day_of_month, :union, :intersect, :everyday, :none, or :undefined.
JobSchedulers.next_recur_job
— Function
next_recur_job(j::Job) -> Union{Job, Nothing}
Based on j.cron and j.until, return a new recurring Job or nothing.
Internal - Progress Meter
To display a progress meter, please use wait_queue(show_progress = true).
JobSchedulers.JobGroup
— Type
mutable struct JobGroup
name::String
total::Int
queuing::Int
running::Int
done::Int
failed::Int
cancelled::Int
end
JobGroup is computed when displaying a progress meter.
JobSchedulers.get_group
— Function
get_group(job::Job, group_seperator = GROUP_SEPERATOR)
get_group(name::AbstractString, group_seperator = GROUP_SEPERATOR)
Return nested_group_names::Vector{String}.
Eg: If job.name is "A: B: 1232", return ["A", "A: B", "A: B: 1232"]
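The nesting in that example can be reproduced with a few lines of plain Julia. The function nested_groups below is a hypothetical stand-in written for illustration, not the package's get_group; it takes every prefix that ends just before a separator match, then the full name.

```julia
# Hypothetical sketch of the grouping shown above (not the package implementation).
function nested_groups(name::AbstractString, sep::Regex = r": *")
    groups = String[]
    for m in eachmatch(sep, name)
        # m.offset is the index where the separator starts;
        # the group name is everything before it.
        push!(groups, name[1:prevind(name, m.offset)])
    end
    push!(groups, String(name))   # the full name is the innermost group
    return groups
end

groups = nested_groups("A: B: 1232")
single = nested_groups("no separator here")
```

A name without any separator yields a single group equal to the name itself.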
JobSchedulers.progress_bar
— Function
progress_bar(percent::Float64, width::Integer = 20)
Return ::String for progress bar whose char length is width.
- percent: ranges from 0.0 to 1.0; values outside this range are truncated.
- width: should be > 3. If <= 10, percentage will not show. If > 10, percentage will show.
JobSchedulers.queue_progress
— Function
queue_progress(;remove_tmp_files::Bool = true, kwargs...)
queue_progress(stdout_tmp::IO, stderr_tmp::IO;
group_seperator = GROUP_SEPERATOR, wait_second_for_new_jobs::Int = 1, loop::Bool = true, exit_num_jobs::Int = 0)
- group_seperator: delimiter used to split (job::Job).name into group names and the specific job name.
- wait_second_for_new_jobs::Int: if auto_exit, and all jobs are PAST, do not quit queue_progress immediately but wait for a period. If new jobs are submitted, do not quit queue_progress.
- loop::Bool: if false, only show the current progress and exit.
- exit_num_jobs::Int: exit when queue() has fewer than this number of jobs. It is useful to ignore some jobs that are always running or recurring.
JobSchedulers.view_update
— Function
view_update(h, w; row = 1, groups_shown::Vector{JobGroup} = JobGroup[], is_in_terminal::Bool = true, is_interactive = true, group_seperator_at_begining = Regex("^" * GROUP_SEPERATOR.pattern))
Update the whole screen view.
JobSchedulers.PROGRESS_METER
— Constant
Bool. Whether the progress meter is showing. Related to progress computation and display. true when wait_queue(show_progress=true)
JobSchedulers.update_group_state!
— Function
update_group_state!(job::Job)
This should only be called if JobSchedulers.PROGRESS_METER == true. Update the job's group state, which will be used in Progress Meter.
JobSchedulers.init_group_state!
— Method
init_group_state!()
Prepare group state for existing jobs