Pipelines.jl

A lightweight Julia package for computational pipelines.

Building reusable pipelines and workflows is easier than you have ever thought.

Package Features

  • Easy to build both simple and complex tasks.

  • Supports external command lines and pure Julia functions.

  • Supports resuming interrupted tasks, skipping finished tasks.

  • Supports dependency check.

  • Supports inputs, outputs validation, and so on.

  • Supports program queuing and workload management with JobSchedulers.jl

Installation

Pipelines.jl can be installed using the Julia package manager. From the Julia REPL, type ] to enter the Pkg REPL mode and run

pkg> add Pipelines

To use the package, type

using Pipelines

Quick Start

Pipelines are built with multiple Programs. Program is the abstract type of CmdProgram and JuliaProgram.

A CmdProgram contains a command template and name lists of inputs/outputs. The names of inputs/outputs will be replaced by real values when executing the program.

Let's set up a simple CmdProgram to print values using echo:

using Pipelines

echo = CmdProgram(
    inputs = [
        "REQUIRED",               # no default value; any data type.
        "TYPED" => String,        # no default value; String type only.
        "OPTIONAL" => 4,          # default value is 4; any data type.
        "FULL" => String => "abc" # default value is abc; String type only.
    ],
    cmd = `echo REQUIRED TYPED OPTIONAL FULL`   
)

Running the program is just like running other Cmd, but here we need to specify inputs in keyward arguments. Caution: use ; to split positional and keyward arguments, and do not use ,.

run(echo; REQUIRED = "Pipelines", TYPED = "are", FULL = "to build.", OPTIONAL = :easy)
Program will not run twice by default!

The program will generate a file (run id file) in the working directory. It records the states of all possible files (not dirs) guessed from AbstractString, FilePathsBase.AbstractPath, Base.AbstractCmd of inputs/outputs.

If you run a program with the same inputs again, and any file (not dir) in inputs/outputs is not modified, the program will just return the same result, without running the program twice.

input_args = (REQUIRED = "Pipelines", TYPED = "are", FULL = "to build.", OPTIONAL = :easy)
run(echo; input_args...)

Several methods can be used to re-run a program:

# Method 1: stop checking finished program using skip_when_done = false
run(echo; input_args..., skip_when_done = false)

# Method 2: delete the run_id_file before running again
cmd, run_id_file = run(echo; input_args..., dry_run = true) # Dry-run returns the command and run id file without running it.
rm(run_id_file)  # remove the run_id_file

# Method 3: Do not generate run_id_file after a successful run
run(echo; input_args..., touch_run_id_file=false)

Program with Outputs

Unlike the first example, many programs write files as outputs. Pipelines.jl has an elegant way to handle it.

The following program prints values simultaneously, sort them, and save to a file.

prog = CmdProgram(
    inputs = ["INPUT1", "INPUT2", "INPUT3"],
    outputs = "OUTPUT_FILE",
    cmd = pipeline(`echo INPUT1 INPUT2` & `echo INPUT3`, `sort`, "OUTPUT_FILE")
)

# inputs and outputs can be mixed together:
run(prog; INPUT1 = "Good", INPUT2 = "Morning", INPUT3 = "Human", OUTPUT_FILE = "morning.txt")

run(`cat morning.txt`) # print the content of out.txt
# Good Morning
# Human

Default values

Default values and data types can be set for keywords of inputs and outputs in this way:

echo = CmdProgram(
    inputs = [
        "REQUIRED",                     # no default value; any data type.
        "TYPED" => String,              # no default value; String type only.
        "OPTIONAL" => 5,                # default value is 5; any data type.
        "FULL1" => String => "abc",     # default value is abc; String type only.
        "FULL2" => "abc" => String,     # default value is abc; String type only.
        "INTERPOLATED" => "<FULL1>.xyz" # default value is value of FULL1 * ".xyz".
    ],
    cmd = `echo REQUIRED TYPED OPTIONAL FULL`   
)

Interpolation of default values

If the default value is a String, it can be interpolated by using <keyword>, such as "<FULL1>.xyz" in the example.

Generate outputs using Function

This step is prior to adding default values of outputs, and string interpolation using <>.

We also provide a parameter (infer_outputs::Expr) in CmdProgram to generate complex outputs::Dict{String} from inputs. You can use elements in inputs as variables in Expr. The Expression will automatically converted to function. Please make sure the returned value of the function has to be a Dict{String} with keys of outputs.

prog = CmdProgram(
    inputs = [
        "INPUT1" => Int,
        "INPUT2" => Int => 3
    ],
    outputs = "OUTPUT_FILE",
    cmd = pipeline(`echo INPUT1 INPUT2`, `sort`, "OUTPUT_FILE"),
    infer_outputs = quote
        Dict("OUTPUT_FILE" => joinpath(pwd(), string("out_", INPUT1, ".txt")))
    end
)
success, outputs = run(prog; INPUT1 = 5)
# (true, Dict{String, Any}("OUTPUT_FILE" => "/home/jiacheng/projects/Pipelines.jl/out_5.txt"))

We can also generate default outputs without running the program:

outputs = infer_outputs(prog; INPUT1 = 5)

Julia Program

Pipelines also defined JuliaProgram type for pure Julia functions. It is like CmdProgram and remain most compatibility. Main difference is that it uses argument main::Expr to replace cmd::AbstractCmd in arguments. More details are in the Julia Program, Manual Page.

prog = JuliaProgram(
    inputs = ["A", "B"],
    outputs = ["OUT"],
    infer_outputs = quote
        Dict("OUT" => A + B)
    end,
    main = quote
        @show A
        @show B
        OUT = A + B
    end
)

run(prog; A = 3, B = 5)
# (true, Dict{String, Any}("OUT" => 8))

Compatibility with JobSchedulers.jl

Pipelines.jl is fully compatible with JobSchedulers.jl which is a Julia-based job scheduler and workload manager inspired by Slurm and PBS.

run(::Program, ...) can be replaced by Job(::Program, ...). The latter creates a Job, and you can submit the job to queue by using submit!(::Job).

Future Development

  • Support running competitive tasks with locks.