Use Cases

JobSchedulers.jl can be used to glue commands into a pipeline or workflow, and can also be used to schedule pure Julia functions.

Since v0.10, scheduling is 200 to 400 times faster than in previous versions, fast enough to replace Threads.@threads for ... end. Scheduling 100,000 jobs takes about 0.25 seconds in the overhead test below.

Parallel Nested Loops

One of the many applications of a scheduling system is as a drop-in replacement for nested multi-threaded loops that would otherwise be written with Threads.@threads.

Consider a simplified scenario where you want to calculate the maximum of the sample means over many repeated random samples of various lengths, drawn from several distributions provided by the Distributions.jl package and normalized by each distribution's standard deviation. The results should be collected into a DataFrame. We have the following function:

using Random, Distributions, StatsBase, DataFrames

function f(dist, len, reps, σ)
    v = Vector{Float64}(undef, len) # avoiding allocations
    maximum(mean(rand!(dist, v)) for _ in 1:reps)/σ
end

Let us consider the following probability distributions for the numerical experiments, all of which have an expected value of zero, and the following vector lengths:

dists = [Cosine, Epanechnikov, Laplace, Logistic, Normal, NormalCanon, PGeneralizedGaussian, SkewNormal, SkewedExponentialPower, SymTriangularDist]
lens = [10, 20, 50, 100, 200, 500]

Using Threads.@threads, these experiments could be parallelized as:

using Base.Threads

function experiments_threads(dists, lens, K=1000)
    res = DataFrame()
    lck = ReentrantLock()
    Threads.@threads for T in dists
        dist = T()
        σ = std(dist)
        for L in lens
            z = f(dist, L, K, σ)
            Threads.lock(lck) do
                push!(res, (;T, σ, L, z))
            end
        end
    end
    res
end

experiments_threads(dists, lens, 1000)
@time experiments_threads(dists, lens, 10000)
#  6.932880 seconds (746 allocations: 110.828 KiB)
#  6.868636 seconds (751 allocations: 110.906 KiB)
#  7.023086 seconds (746 allocations: 110.828 KiB)

Note that push! on a DataFrame is not thread-safe, so we need a lock to prevent two threads from appending to the DataFrame at the same time.
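The same lock-around-push! pattern can be sketched with a plain Vector in place of the DataFrame, since push! on a Vector is not thread-safe either. This is a minimal Base-only illustration; the function name collect_squares is hypothetical and not part of the original example.

```julia
using Base.Threads

# Hypothetical helper: collect (i, i^2) pairs from a threaded loop.
# push! on a Vector is not thread-safe, so every push! is guarded by a lock.
function collect_squares(n)
    res = Tuple{Int,Int}[]
    lck = ReentrantLock()
    @threads for i in 1:n
        sq = i^2                  # the work itself runs without holding the lock
        lock(lck) do              # only the shared mutation is serialized
            push!(res, (i, sq))
        end
    end
    sort!(res)                    # threads append in arbitrary order
end

collect_squares(5)
```

Only the push! sits inside the lock; if the lock also covered the computation, the loop would effectively run serially.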

The same code could be rewritten in JobSchedulers as:

using JobSchedulers

function experiments_jobschedulers(dists, lens, K=1000)
    res = DataFrame()
    for T in dists
        dist = T()
        σ = @submit std(dist)
        for L in lens
            z = @submit f(dist, L, K, result(σ))
            push!(res, (;T, σ, L, z))
        end
    end
    res.z = fetch.(res.z)
    res.σ = fetch.(res.σ)
    res
end

experiments_jobschedulers(dists, lens, 1000)
@time experiments_jobschedulers(dists, lens, 10000)
#  3.682429 seconds (4.68 k allocations: 268.984 KiB)
#  3.687437 seconds (4.77 k allocations: 270.609 KiB)
#  3.755103 seconds (4.74 k allocations: 269.812 KiB)

In this code there is job interdependence: first we compute the standard deviation σ, and then we use that value in the function f. Here, @submit wraps an expression into a Job and submits it. Since @submit returns a Job rather than the actual value, we use the result function to obtain the value of σ inside the job that computes z. Because computing z requires σ to be complete, σ must be a dependency of that job; @submit detects the Job σ in the expression and adds it as a dependency automatically (with submit!, you would pass dependency = σ explicitly). Finally, after all jobs are submitted, we use fetch to wait for each job to finish and return its value.
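The σ/z pair can also be written with submit! and an explicit dependency keyword, next to the @submit form used above. This is a sketch assuming JobSchedulers v0.10 or later (for fetch on a Job); the constants 2.0 and 10.0 are placeholders standing in for std(dist) and f(...).

```julia
using JobSchedulers

# Placeholder for: σ = @submit std(dist)
σ = submit!(() -> 2.0)

# Explicit form: z only starts once σ is DONE, so result(σ) is safe to call.
z = submit!(() -> 10.0 / result(σ); dependency = σ)

# Macro form, as in experiments_jobschedulers: σ is detected in the
# expression and added as a dependency automatically.
z2 = @submit 10.0 / result(σ)

fetch(z), fetch(z2)
```

Both jobs wait for σ; the macro form simply saves writing the dependency by hand.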

Also note that, unlike the previous example, we do not need a lock, because we are just pushing the Job objects returned by @submit into the DataFrame serially (which is fast since @submit does not block).

The above scenario was tested by running julia -t 8 (or with JULIA_NUM_THREADS=8 set as an environment variable). Averaged over the three runs above, the Threads.@threads code takes about 6.94 seconds, while the JobSchedulers code takes about 3.71 seconds, a speedup of about 1.87x. Note that, unlike Base.Threads, JobSchedulers uses only nthreads() - 1 = 7 threads to run jobs, so the per-thread speedup is about 1.87 × 8/7 ≈ 2.1x.
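The speedup arithmetic can be reproduced from the timings printed above. This sketch only copies those numbers; the names threads_runs and jobsched_runs are illustrative.

```julia
# Timings (seconds) copied from the three runs of each version shown above
threads_runs  = [6.932880, 6.868636, 7.023086]
jobsched_runs = [3.682429, 3.687437, 3.755103]

avg(v) = sum(v) / length(v)

# Wall-clock speedup: Threads.@threads uses 8 threads, JobSchedulers 7
speedup = avg(threads_runs) / avg(jobsched_runs)

# Normalize to the same number of worker threads
per_thread = speedup * 8 / 7

round(speedup; digits = 2), round(per_thread; digits = 2)   # (1.87, 2.14)
```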

Citation

The Parallel Nested Loops section was adapted from the Dagger.jl documentation. Most of the content is the same, except that JobSchedulers.jl is used instead of Dagger.jl.

A Workflow Example With Pipelines.jl

  • Run prog_A with 2 threads and 4GB RAM.
  • Run prog_B with 8 threads.
  • After prog_A finishes, run prog_C (2 threads).
  • After prog_B and prog_C finish, run prog_D (12 threads).

The flowchart is as follows:

[Figure: workflow flowchart]

The Julia code:

using JobSchedulers, Pipelines

prog_A = CmdProgram(...)
prog_B = JuliaProgram(...)
prog_C = CmdProgram(...)
prog_D = JuliaProgram(...)

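job_A = submit!(prog_A, A_args..., ncpu = 2, mem = 4GB)

job_B = submit!(prog_B, B_args..., ncpu = 8)

# job_C starts only after job_A is DONE
job_C = submit!(prog_C, C_args..., ncpu = 2,
                dependency = job_A)

# job_D starts after job_B has ended in any state (PAST: done, failed or cancelled)
# and after job_C is DONE
job_D = submit!(prog_D, D_args..., ncpu = 12,
                dependency = [PAST => job_B, job_C])

wait_queue()  # block until all submitted jobs have finished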
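The same dependency graph can be tried without Pipelines.jl by submitting plain Julia functions. This is a hypothetical sketch: the anonymous functions stand in for prog_A to prog_D, and ncpu and mem are left at their defaults so it runs on a small machine. It assumes that a bare Job in dependency means DONE, as in the job_C line above.

```julia
using JobSchedulers

# Stand-ins for prog_A ... prog_D; each returns a string so the order is visible.
job_A = submit!(() -> "A")
job_B = submit!(() -> (sleep(0.1); "B"))

# job_C waits for job_A to be DONE.
job_C = submit!(() -> result(job_A) * "C"; dependency = job_A)

# job_D waits for job_B to reach any final state (PAST) and for job_C to be DONE.
job_D = submit!(() -> result(job_C) * "D"; dependency = [PAST => job_B, job_C])

wait_queue()          # block until every submitted job has finished
result(job_D)
```

Using PAST for job_B means prog_D would still run if prog_B failed, whereas a failure of prog_C would keep prog_D from starting.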

Overhead Test of Scheduling Systems

To test the overhead of scheduling systems, I compared Base.Threads, Dagger.jl, and JobSchedulers.jl using tiny tasks (x::Int += y::Int) in separate 24-thread Julia sessions on a machine with an i9-13900K CPU and 196GB of DDR5 memory.

Warning

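x += y is not thread-safe: concurrent updates can be lost, so the returned value of x is meaningless and the code is for the overhead test only. BenchmarkTools.jl cannot be used in this case because it competes with the scheduling systems under test.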
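For comparison, a thread-safe version of the same tiny task would use an atomic counter. This sketch uses Base's Threads.Atomic and Threads.atomic_add!; it is not one of the overhead scripts, and atomics add their own cost, so they would change what the test measures.

```julia
using Base.Threads

# Thread-safe counterpart of the overhead test: every increment is atomic,
# so no updates are lost regardless of how many threads run the loop.
function atomic_sum(a, K = 10000)
    x = Threads.Atomic{Int}(0)
    @threads for i in 1:K
        Threads.atomic_add!(x, a)
    end
    x[]                      # read the final value
end

atomic_sum(1, 100_000)       # always 100000
```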

Scripts

overhead-baseline.jl:

using Base.Threads
function experiments_threads(a, K=10000)
    x = 0
    Threads.@threads for i in 1:K
        x += a
    end
    x
end

# compile
experiments_threads(1, 10)

# test
@time experiments_threads(1, 10000)  #   0.000176 seconds (9.54 k allocations: 159.828 KiB)
@time experiments_threads(1, 100000) #   0.001873 seconds (99.61 k allocations: 1.530 MiB)

overhead-jobschedulers.jl:

using JobSchedulers

function experiments_jobschedulers(a, K=10000)
    x = 0
    f() = x += a
    for i in 1:K
        submit!(f)
    end
    wait_queue()
    x
end

# compile
experiments_jobschedulers(1, 10)

# test
@time experiments_jobschedulers(1, 10000)  # 0.010370 seconds (125.72 k allocations: 10.086 MiB)
@time experiments_jobschedulers(1, 100000) # 0.247005 seconds (1.26 M allocations: 100.839 MiB, 21.43% gc time)

overhead-dagger.jl:

using Dagger
function experiments_dagger(a, K=10000)
    x = 0
    f() = x += a
    @sync for i in 1:K
        Dagger.@spawn f()
    end
    x
end

# compile
experiments_dagger(1, 10)

# test
@time experiments_dagger(1, 10000)  # 0.846881 seconds (26.50 M allocations: 1.828 GiB, 29.06% gc time)
@time experiments_dagger(1, 100000) # deadlock: hung for more than 60 s

Results

  • JobSchedulers can schedule 10,000 tasks in about 0.01 seconds, while Dagger takes 0.85 seconds.

  • JobSchedulers remains stable even when scheduling 100,000 tasks, while Dagger appeared to hit a deadlock and hung.

Number of tasks             10,000      100,000
Base.Threads (seconds)      0.000176    0.001873
JobSchedulers (seconds)     0.010370    0.247005
Dagger (seconds)            0.846881    failed (hung, deadlock)
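Dividing each time in the table by the number of tasks gives the average overhead per task. This sketch only reuses the table's numbers; the helper name per_task_us is illustrative.

```julia
# Wall-clock times (seconds) from the table above
ntasks        = (10_000, 100_000)
threads       = (0.000176, 0.001873)
jobschedulers = (0.010370, 0.247005)
dagger_10k    = 0.846881            # the 100,000-task Dagger run never finished

# Average time per task (or per loop iteration), in microseconds
per_task_us(t, n) = t / n * 1e6

per_task_us.(threads, ntasks)       # ≈ (0.0176, 0.0187); @threads runs chunks of iterations, not one task each
per_task_us.(jobschedulers, ntasks) # ≈ (1.04, 2.47)
per_task_us(dagger_10k, ntasks[1])  # ≈ 84.7
```

In this test, JobSchedulers spends on the order of a microsecond per task, roughly 80 times less than Dagger at 10,000 tasks.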