Julia Program

Please read the Command Program section in Manual page first.

Pipelines are built with multiple Programs. Program is the abstract type contains CmdProgram and JuliaProgram.

Differences between CmdProgram and JuliaProgram

JuliaProgram and CmdProgram are generally the same and remain most compatibility, except for the differences:

Diffcp :: CmdProgramjp :: JuliaProgram
Unique Fieldcp has a command template: cp.cmd::AbstractCmdjp has a main Julia code: jp.main that can be defined using quote .. end .
Method to RunReplace keywords in cp.cmd with values in user defined inputs and outputs}If main is a Expr, the variables mentioned in inputs and outputs will be replaced to the inputs["VAR"] format, and then a Julia function returning outputs::Dict{String} is created and called. If main isa Function, it just invokes jp.main(inputs::Dict{String}, outputs::Dict).
Returned Outputsoutputs provided in run(...) does not changeoutputs will only be overwritten by the returned value of jp.main when the returned value is a Dict and passes p.validate_outputs.
Dry RunReturn (replaced_cmd::AbstractCmd, run_id_file::String)Return (fake_outputs::Dict{String}, run_id_file::String)


JuliaProgram can be built with this method:

JuliaProgram <: Program

    name::String                            = "Julia Program",
    id_file::String                         = "",
    info_before::String                     = "auto",
    info_after::String                      = "auto",
    cmd_dependencies::Vector{CmdDependency} = Vector{CmdDependency}(),
    inputs                                  = Vector{String}(),
    validate_inputs::Expr                   = do_nothing,  # vars of inputs
    infer_outputs::Expr                     = do_nothing,  # vars of inputs
    prerequisites::Expr                     = do_nothing,  # vars of inputs and outputs
    main::Expr                              = do_nothing,  # vars of inputs and outputs
    outputs                                 = Vector{String}(),
    validate_outputs::Expr                  = do_nothing,  # vars of outputs
    wrap_up::Expr                           = do_nothing   # vars of inputs and outputs
) -> JuliaProgram


To run a JuliaProgram, the methods are the same as CmdProgram:

success, outputs = run(
) -> (success::Bool, outputs::Dict{String})
  • program_kwargs... include elements in p.inputs and p.outputs
  • Other keyword arguments are related to run. Details can be found at run.
Thread safety

Redirecting and directory change in Julia are not thread safe, so unexpected redirection and directory change might be happen if you are running programs in different Tasks or multi-thread mode.

Compatibility with JobSchedulers.jl

Pipelines.jl is fully compatible with JobSchedulers.jl which is a Julia-based job scheduler and workload manager inspired by Slurm and PBS.

run(::Program, ...) can be replaced by Job(::Program, ...). The latter creates a Job, and you can submit the job to queue by using submit!(::Job). See example below.


using Pipelines

p = JuliaProgram(
    id_file = "id_file",
    inputs = ["a",
              "b" => Int],
    outputs = "c" => "<a>.<b>",
    main = quote
        println("inputs are ", a, " and ", b)
        println("You can also use info in outputs: ", outputs["c"])
        println("The returned value will be assigned to a new outputs")
        println("It is ok to use inputs and outputs directly:")
        @show inputs
        @show outputs
        c = b^2

# running the program using `run`: keyword arguments include keys of inputs and outputs
success, new_out = run(p; a = `in1`, b = 2, c = "out", touch_run_id_file = false)

# an old way to `run` program: need to create inputs and outputs first.
inputs = Dict("a" => `in1`, "b" => 2)
outputs = "c" => "out"
success, new_out = run(p, inputs, outputs; touch_run_id_file = false)

# for CmdProgram, outputs are inferred before running the main command, however,
# for JuliaProgram, outputs will change to the returned value of main function, if the returned value is a Dict and pass `p.validate_outputs`
@assert new_out != outputs

Compatibility with JobSchedulers.jl

using JobSchedulers

scheduler_start()  # start job scheduler

job = Job(p, a=`in1`, b=2, out="any", touch_run_id_file=false)

submit!(job)  # submit job to queue

result(job)  # get the returned result