# Use Cases
JobSchedulers.jl can be used to glue commands in a pipeline/workflow, and can also be used with pure Julia functions. Since v0.10, scheduling is 200-400X faster than in previous versions, so JobSchedulers can serve as a replacement for `Threads.@threads for ... end`. Scheduling 100,000 jobs takes less than 0.2 seconds.
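For example, a loop like `Threads.@threads for i in 1:1000 ... end` can be replaced by submitting one job per iteration. A minimal sketch, where `do_work` is a hypothetical stand-in for the real per-iteration workload:

```julia
using JobSchedulers

do_work(i) = i^2   # hypothetical workload standing in for real work

# Instead of `Threads.@threads for i in 1:1000 ... end`:
jobs = [submit!(() -> do_work(i)) for i in 1:1000]  # one job per iteration
wait_queue()             # block until every submitted job has finished
results = fetch.(jobs)   # collect each job's return value
```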
## Parallel Nested Loops
One of the many applications of scheduling systems is as a drop-in replacement for nested multi-threaded loops that would otherwise be written with `Threads.@threads`.
Consider a simplified scenario where you want to calculate the maximum of mean values of random samples of various lengths, generated from several distributions provided by the Distributions.jl package. The results should be collected into a `DataFrame`. We start with the following function:
```julia
using Random, Distributions, StatsBase, DataFrames

function f(dist, len, reps, σ)
    v = Vector{Float64}(undef, len) # avoiding allocations
    maximum(mean(rand!(dist, v)) for _ in 1:reps) / σ
end
```
Let us consider the following probability distributions for numerical experiments, all of which have expected values equal to zero, and the following lengths of vectors:
```julia
dists = [Cosine, Epanechnikov, Laplace, Logistic, Normal, NormalCanon,
         PGeneralizedGaussian, SkewNormal, SkewedExponentialPower, SymTriangularDist]
lens = [10, 20, 50, 100, 200, 500]
```
Using `Threads.@threads`, those experiments could be parallelized as:
```julia
using Base.Threads

function experiments_threads(dists, lens, K=1000)
    res = DataFrame()
    lck = ReentrantLock()
    Threads.@threads for T in dists
        dist = T()
        σ = std(dist)
        for L in lens
            z = f(dist, L, K, σ)
            lock(lck) do
                push!(res, (; T, σ, L, z))
            end
        end
    end
    res
end
```
```julia
experiments_threads(dists, lens, 1000)  # warm-up run (compilation)

@time experiments_threads(dists, lens, 10000)
# 6.932880 seconds (746 allocations: 110.828 KiB)
# 6.868636 seconds (751 allocations: 110.906 KiB)
# 7.023086 seconds (746 allocations: 110.828 KiB)
```
Note that `DataFrames.push!` is not a thread-safe operation, hence we need to utilize a locking mechanism to avoid two threads appending to the `DataFrame` at the same time.
The same code could be rewritten in JobSchedulers as:
```julia
using JobSchedulers

function experiments_jobschedulers(dists, lens, K=1000)
    res = DataFrame()
    for T in dists
        dist = T()
        σ = @submit std(dist)
        for L in lens
            z = @submit f(dist, L, K, result(σ))
            push!(res, (; T, σ, L, z))
        end
    end
    res.z = fetch.(res.z)  # wait for each job and replace it with its value
    res.σ = fetch.(res.σ)
    res
end
```
```julia
experiments_jobschedulers(dists, lens, 1000)  # warm-up run (compilation)

@time experiments_jobschedulers(dists, lens, 10000)
# 3.682429 seconds (4.68 k allocations: 268.984 KiB)
# 3.687437 seconds (4.77 k allocations: 270.609 KiB)
# 3.755103 seconds (4.74 k allocations: 269.812 KiB)
```
In this code we have job interdependence. Firstly, we are calculating the standard deviation `σ`, and then we are using that value in the function `f`. Here, `@submit` wraps an expression into a job (the lower-level `submit!` wraps a `Task` or a 0-argument function). Since `@submit` yields a `Job` rather than an actual value, we need to use the `result` function to obtain that value. Because computing `z` requires the completion of `σ`, `@submit` automatically adds the `Job` `σ` found in its expression to the dependencies of the new job. Finally, after all jobs are submitted, we use `fetch` to wait for each job to finish and return its value.

Also note that, contrary to the previous example, we do not need to implement locking: we just push the `Job`s returned by `@submit` serially into the `DataFrame`, which is fast since `@submit` does not block.
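For reference, the two `@submit` calls could also be written with the lower-level `submit!` and an explicit `dependency` keyword. A sketch reusing `dist`, `L`, and `K` from the loop above (assuming `submit!` forwards keyword arguments such as `dependency` to the underlying `Job`):

```julia
# Explicit equivalent of the @submit calls above (a sketch):
σ = submit!(() -> std(dist))                 # job computing the standard deviation
z = submit!(() -> f(dist, L, K, result(σ));  # result(σ) reads σ's value once done
            dependency = σ)                  # run only after the σ job finishes
```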
The above use case has been tested by running `julia -t 8` (or with the environment variable `JULIA_NUM_THREADS=8`). The `Threads.@threads` code takes 7.1 seconds to run, while the JobSchedulers code runs in around 3.7 seconds, a 1.8x speedup. Note that, unlike `Base.Threads`, JobSchedulers only uses `nthreads() - 1 = 7` threads to compute jobs (one thread is reserved for the scheduler), so the real speedup is 1.8 × 8/7 ≈ 2.1x.
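If a different CPU budget is desired, the scheduler's limit can be adjusted. A minimal sketch, assuming JobSchedulers' `set_scheduler_max_cpu` setter:

```julia
using JobSchedulers

set_scheduler_max_cpu(4)   # assumption: cap the CPUs running jobs may occupy at 4
set_scheduler_max_cpu()    # assumption: restore the default limit
```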
The Parallel Nested Loops example was copied and edited from Dagger.jl's documentation. Most of the information is the same, except that JobSchedulers.jl is used instead of Dagger.
## A Workflow Example With Pipelines.jl
Suppose we need to run a workflow with four programs:

- Run `prog_A` with 2 threads and 4GB RAM.
- Run `prog_B` with 8 threads.
- After `prog_A` finishes, run `prog_C` (2 threads).
- After `prog_B` and `prog_C` finish, run `prog_D` (12 threads).
The Julia code:
```julia
using JobSchedulers, Pipelines

prog_A = CmdProgram(...)
prog_B = JuliaProgram(...)
prog_C = CmdProgram(...)
prog_D = JuliaProgram(...)

job_A = submit!(prog_A, A_args..., ncpu = 2, mem = 4GB)
job_B = submit!(prog_B, B_args..., ncpu = 8)
job_C = submit!(prog_C, C_args..., ncpu = 2,
                dependency = job_A)                   # run after job_A is done
job_D = submit!(prog_D, D_args..., ncpu = 12,
                dependency = [PAST => job_B, job_C])  # run after job_B and job_C
wait_queue()  # block until all jobs finish
```
## Overhead Test of Scheduling Systems
To test the overhead of scheduling systems, I compared `Base.Threads`, `Dagger.jl`, and `JobSchedulers` using tiny tasks (`x::Int += y::Int`) in separate 24-thread Julia sessions on a platform with an i9-13900K and 196GB of DDR5 memory.

Note that `x += y` is not thread-safe; it is used here only to measure scheduling overhead. BenchmarkTools.jl cannot be used in this case because it would compete with the scheduling systems for resources.
### Scripts
`overhead-baseline.jl`:

```julia
using Base.Threads

function experiments_threads(a, K=10000)
    x = 0
    Threads.@threads for i in 1:K
        x += a   # deliberately not thread-safe; overhead test only
    end
    x
end

# compile
experiments_threads(1, 10)

# test
@time experiments_threads(1, 10000)  # 0.000176 seconds (9.54 k allocations: 159.828 KiB)
@time experiments_threads(1, 100000) # 0.001873 seconds (99.61 k allocations: 1.530 MiB)
```
`overhead-jobschedulers.jl`:

```julia
using JobSchedulers

function experiments_jobschedulers(a, K=10000)
    x = 0
    f() = x += a
    for i in 1:K
        submit!(f)   # one tiny job per increment
    end
    wait_queue()     # wait until all jobs finish
    x
end

# compile
experiments_jobschedulers(1, 10)

# test
@time experiments_jobschedulers(1, 10000)  # 0.010370 seconds (125.72 k allocations: 10.086 MiB)
@time experiments_jobschedulers(1, 100000) # 0.247005 seconds (1.26 M allocations: 100.839 MiB, 21.43% gc time)
```
`overhead-dagger.jl`:

```julia
using Dagger

function experiments_dagger(a, K=10000)
    x = 0
    f() = x += a
    @sync for i in 1:K
        Dagger.@spawn f()   # one tiny task per increment
    end
    x
end

# compile
experiments_dagger(1, 10)

# test
@time experiments_dagger(1, 10000)  # 0.846881 seconds (26.50 M allocations: 1.828 GiB, 29.06% gc time)
@time experiments_dagger(1, 100000) # deadlock: hung for more than 60 s
```
### Results
JobSchedulers can schedule 10,000 tasks within 0.01 seconds, while Dagger takes 0.85 seconds. JobSchedulers remains stable even when scheduling 100,000 tasks, whereas Dagger appears to hit a deadlock that hangs the session.
| Number of Tasks         | 10,000   | 100,000                  |
|-------------------------|----------|--------------------------|
| Base.Threads (seconds)  | 0.000176 | 0.001873                 |
| JobSchedulers (seconds) | 0.010370 | 0.247005                 |
| Dagger (seconds)        | 0.846881 | failed, hung (deadlock)  |