API

Program

Pipelines.infer_outputsFunction
infer_outputs(p::Program; input_kwargs...)
infer_outputs(p::Program, inputs)
infer_outputs(p::Program, inputs, outputs)

Infer the default outputs from p::Program and inputs::Dict{String}.

source

Command Program

Pipelines.CmdProgramMethod
CmdProgram <: Program

CmdProgram(;
    name::String                            = "Command Program",
    id_file::String                         = "",
    info_before::String                     = "auto",
    info_after::String                      = "auto",
    cmd_dependencies::Vector{CmdDependency} = Vector{CmdDependency}(),
    inputs                                  = Vector{String}(),
    validate_inputs::Expr                   = do_nothing,  # vars of inputs
    infer_outputs::Expr                     = do_nothing,  # vars of inputs
    prerequisites::Expr                     = do_nothing,  # vars of inputs and outputs
    cmd::Base.AbstractCmd                   = ``,
    outputs                                 = Vector{String}(),
    validate_outputs::Expr                  = do_nothing,  # vars of outputs
    wrap_up::Expr                           = do_nothing,  # vars of inputs and outputs
    arg_forward                             = Vector{Pair{String,Symbol}}(),
    mod::Module                             = Pipelines    # change to @__MODULE__ to avoid precompilation error
) -> CmdProgram

Command program template. To run a CmdProgram, use run(::CmdProgram; kwargs...).

Arguments

  • name::String: Program name.

  • id_file::String: The prefix of run ID file. To prevent from running the program with the same inputs and outputs twice, it will generate a unique run ID file after a successful run.

  • info_before::String: Print it when the program is started.

  • info_after::String: Print it when the program is finished.

  • cmd_dependencies::Vector{CmdDependency}: Any command dependencies used in the program.

  • inputs and outputs: Elements (or vectors containing elements) in the following format: (1) keyword (2) keyword => data_type (3) keyword => default_value (4) keyword => default_value => data_type.

    keyword is an argument name, normally it is a String. If the keyword does not affect results (such as ncpu, nthreads), it needs to be a Symbol. When generating unique run IDs, Symbol args are ignored.

    default_value is optional. If set, users may not provide this argument when running. Elsewise, users have to provide it. Caution: nothing is preserved and means default value not set. If String, it can contain other keywords, but need to quote using '<>', such as "<arg>.txt"

    data_type is optional. If set, the value provided have to be this data type, or an error will throw.

  • validate_inputs::Expr: A quoted code to validate inputs. Elements in inputs can be directly used as variables. If validation fail, throw error or return false. See details in quote_expr

  • infer_outputs::Expr: A quoted code to infer outputs from inputs. Elements in inputs can be directly used as variables. Has to return a Dict{String}("OUTPUT_VAR" => value). See details in quote_expr

  • prerequisites::Expr: A quoted code to run just before the main command. It prepares necessary things, such as creating directories. Elements in inputs and outputs can be directly used as variables. See details in quote_expr

  • cmd::AbstractCmd: The main command template. In the template, keywords in inputs::Vector{String} and outputs::Vector{String} will be replaced when envoking run(::CmdProgram, inputs::Dict{String, ValidInputTypes}, outputs::Dict{String, ValidInputTypes}).

  • validate_outputs::Expr: A quoted code to validate outputs. Elements in outputs can be directly used as variables. If validation fail, throw error or return false. See details in quote_expr

  • wrap_up::Expr: The last quoted code to run. Elements in inputs and outputs can be directly used as variables. See details in quote_expr

  • arg_forward: forward args from inputs and outputs to specific keywords in JobSchedulers.Job(), only supporting Pipelines.FORWARD_KEY_SET: Set([:ncpu, :mem, :user, :name]). Elements (or vectors containing elements) in the following format: "arg_of_inputs_or_outputs" => :key_in_FORWARD_KEY_SET.

  • mod::Module: Exprressions will evaluated to functions in mod. Please use mod = @__MODULE__ to prevent precompilation fail when defining the program within a package.

Compatibility of Pipelines < v0.8

You can still pass Function to variables require Expr, but you cannot use the 'elements as variables' feature. The function should take inputs::Dict{String} and/or outputs::Dict{String} as variables, and you have to use traditional inputs["VARNAME"] in functions.

From Pipelines v0.8, all Expr provided will be converted to Function automatically.

Debug: variable not found

Please refer to quote_expr, section 'quote variables in other scopes.'

Example

p = CmdProgram(
    id_file = "id_file",
    inputs = ["input",
              "input2" => Int,
              "optional_arg" => 5,
              "optional_arg2" => 0.5 => Number],
    outputs = "output" => "<input>.output",
    validate_inputs = quote
        @show optional_arg
        optional_arg2 isa Float64 && inputs isa Dict
    end,
    cmd = `echo input input2 optional_arg optional_arg2 output`)

# running the program: keyword arguments include keys of inputs and outputs
success, outputs = run(p; input = `in1`, input2 = 2, output = "out", touch_run_id_file = false)

# an old way to `run` program: need to create Dicts of inputs and outputs first.
inputs = Dict("input" => `in1`,    "input2" => 2)
outputs = Dict("output" => "out")
run(p, inputs, outputs; touch_run_id_file = false)

See also: CmdProgram, JuliaProgram, quote_expr

source
Pipelines.prepare_cmdFunction
prepare_cmd(p::CmdProgram, inputs, outputs)

Prepare the runable command. Keywords in CmdProgram will be given to values of inputs/outputs.

source

Julia Program

Pipelines.JuliaProgramMethod
JuliaProgram <: Program

JuliaProgram(;
    name::String                            = "Julia Program",
    id_file::String                         = "",
    info_before::String                     = "auto",
    info_after::String                      = "auto",
    cmd_dependencies::Vector{CmdDependency} = Vector{CmdDependency}(),
    inputs                                  = Vector{String}(),
    validate_inputs::Expr                   = do_nothing,  # vars of inputs
    infer_outputs::Expr                     = do_nothing,  # vars of inputs
    prerequisites::Expr                     = do_nothing,  # vars of inputs and outputs
    main::Expr                              = do_nothing,  # vars of inputs and outputs
    outputs                                 = Vector{String}(),
    validate_outputs::Expr                  = do_nothing,  # vars of outputs
    wrap_up::Expr                           = do_nothing   # vars of inputs and outputs
    arg_forward                             = Vector{Pair{String,Symbol}}(),
    mod::Module                             = Pipelines    # change to @__MODULE__ to avoid precompilation error
) -> JuliaProgram

Julia program template. To run a JuliaProgram, use run(::JuliaProgram; kwargs...).

Arguments

  • name::String: Program name.

  • id_file::String: The prefix of run ID file. To prevent from running the program with the same inputs and outputs twice, it will generate a unique run ID file after a successful run.

  • info_before::String: Print it when the program is started.

  • info_after::String: Print it when the program is finished.

  • cmd_dependencies::Vector{CmdDependency}: Any command dependencies used in the program.

  • inputs and outputs: Elements (or vectors containing elements) in the following format: (1) keyword (2) keyword => data_type (3) keyword => default_value (4) keyword => default_value => data_type. See more: Arg.

    keyword is an argument name, normally it is a String. If the keyword does not affect results (such as ncpu, nthreads), it needs to be a Symbol. When generating unique run IDs, Symbol args are ignored.

    default_value is optional. If set, users may not provide this argument when running. Elsewise, users have to provide it. Caution: nothing is preserved and means default value not set. If String, it can contain other keywords, but need to quote using '<>', such as "<arg>.txt"

    data_type is optional. If set, the value provided have to be this data type, or an error will throw.

  • validate_inputs::Expr: A quoted code to validate inputs. Elements in inputs can be directly used as variables. If validation fail, throw error or return false. See details in quote_expr

  • infer_outputs::Expr: A quoted code to infer outputs from inputs. Elements in inputs can be directly used as variables. Has to return a Dict{String}("OUTPUT_VAR" => value). See details in quote_expr

  • prerequisites::Expr: A quoted code to run just before the main command. It prepares necessary things, such as creating directories. Elements in inputs and outputs can be directly used as variables. See details in quote_expr

  • main::Expr: The main julia code. Elements in inputs and outputs can be directly used as variables. See details in quote_expr

Returned outputs

The returned value of p.main will be assigned to new outputs. Please ensure the variables in outputs are defined correctly, since it will return outputs::Dict{String,Any}.

  • validate_outputs::Expr: A quoted code to validate outputs. Elements in outputs can be directly used as variables. If validation fail, throw error or return false. See details in quote_expr

  • wrap_up::Expr: the last quoted code to run. Elements in inputs and outputs can be directly used as variables. See details in quote_expr

  • arg_forward: forward args from inputs and outputs to specific keywords in JobSchedulers.Job(), only supporting Pipelines.FORWARD_KEY_SET: Set([:ncpu, :mem, :user, :name]). Elements (or vectors containing elements) in the following format: "arg_of_inputs_or_outputs" => :key_in_FORWARD_KEY_SET.

  • mod::Module: Exprressions will evaluated to functions in mod. Please use mod = @__MODULE__ to prevent precompilation fail when defining the program within a package.

Compatibility of Pipelines < v0.8

You can still pass Function to variables require Expr, but you cannot use the 'elements as variables' feature. The function should take inputs::Dict{String} and/or outputs::Dict{String} as variables, and you have to use traditional inputs["VARNAME"] in functions.

From Pipelines v0.8, all Expr provided will be converted to Function automatically.

Debug: variable not found

Please refer to quote_expr, section 'quote variables in other scopes.'

Example

p = JuliaProgram(
    id_file = "id_file",
    inputs = ["a",
              "b" => Int],
    outputs = "c" => "<a>.<b>",
    main = quote
        println("inputs are ", a, " and ", b)
        println("You can also use info in outputs: ", c)
        println("The returned value will be assigned to a new outputs")
        c = b^2
    end)

# running the program: keyword arguments include keys of inputs and outputs
success, new_out = run(p; a = `in1`, b = 2, c = "out", touch_run_id_file = false)

@assert new_out != infer_outputs(p; a = `in1`, b = 2, c = "out")  # outputs will change to the returned value of main function, if the returned value is a Dict and pass `p.validate_outputs`

# an old way to `run` program: need to create Dicts of inputs and outputs first.
inputs = Dict("a" => `in1`, "b" => 2)
outputs = "c" => "out"
success, new_out = run(p, inputs, outputs; touch_run_id_file = false)
source

Quote for Program

Pipelines.quote_exprFunction
quote
    do_some_thing()
end :: Expr

quote creates a piece of code without using the explicit Expr constructor.

From Pipelines v0.8, you can use quote ... end to validate_inputs, infer_outputs, do prerequisites, do main, validate_outputs, and wrap_up a Program.

Elements in inputs or outputs can be directly used as variables for those arguments. See the table below.

ArgumentElements as variablesDefault returned value
validate_inputsinputsthe last expression
infer_outputsinputsthe last expression, can converted to Dict{String}
prerequisitesinputs, outputsthe last expression
main (JuliaProgram only)inputs, outputsoutputs::Dict{String}
validate_outputsoutputsthe last expression
wrap_upinputs, outputsthe last expression

Example

prog = JuliaProgram(
    inputs = ["A", "B"],
    outputs = ["OUT"],
    validate_inputs = quote
        @show A
        @show inputs
        A isa Number
    end,
    infer_outputs = quote
        Dict("OUT" => A + B)
    end,
    main = quote
        @show A
        @show B
        OUT = A + B
    end,
    validate_outputs = quote
        @show OUT
        OUT isa Number
    end
)

run(prog; A = 3, B = 5, touch_run_id_file = false)
# (true, Dict{String, Any}("OUT" = 8))
`quote` variables in other scopes
  1. A local variable (include function) should be referenced using $ in expression. (No need to use $ for global variables.)

  2. A local ::Symbol variable (sym) should be referenced using $(QuoteNode(sym)) in expression.

Example:

inputs = ["A", "B"]
g_var = 3
g_sym = :globalsymbol

function gen_expr()
    l_var = 5
    l_func() = @info("Use local function")
    l_sym = :abc
    expr = quote
        @show inputs
        @show g_var
        @show g_sym
        @show $(QuoteNode(l_sym))
        @show $l_var + 2
        $l_func()
        A + B
    end
end

expr = gen_expr()
func = Pipelines.quote_function(expr, inputs; mod = @__MODULE__)

in_dict = Dict("A" => 5, "B" => 50)
func(in_dict)
Compatibility of Pipelines < v0.8

You can still pass Function to variables require Expr, but you cannot use the 'elements as variables' feature. The function should take inputs::Dict{String} and/or outputs::Dict{String} as variables, and you have to use traditional inputs["VARNAME"] in functions.

See also: CmdProgram, JuliaProgram, quote_function

source
Pipelines.quote_functionFunction
quote_function(expr::Expr, inputs::Vector{String}; specific_return = nothing, mod::Module = Pipelines)
    # Return `Function` with one argument: `inputs::Dict{String}`.

quote_function(expr::Expr, inputs::Vector{String}, outputs::Vector{String}; specific_return = nothing, mod::Module = Pipelines)
    # Return `Function` with two arguments: `inputs::Dict{String}, outputs::Dict{String}`.

quote_function(f::Function, x; specific_return, mod) = f
quote_function(f::Function, x, y; specific_return, mod) = f
    # Directly return `f::Function` without any process.

Description

When building Program, Expr are automatically converted to Function using quote_function. The elements of inputs and/or outputs in expr will be replaced by inputs["element"] and/or outputs["elements"], respectively. Also, in the generated function, the arguments (inputs and outputs) are regarded as Dict{String}.

  • specific_return: an Expr appended to expr.

  • mod::Module: Exprressions will evaluated to functions in mod. Please use mod = @__MODULE__ to prevent precompilation fail when defining the program within a package.

In expr::Expr, elements in inputs or outputs can be directly used as variables for those arguments. See the table below.

ArgumentElements as variablesDefault returned value
validate_inputsinputsthe last expression
infer_outputsinputsthe last expression, can be converted to Dict{String}
prerequisitesinputs, outputsthe last expression
main (JuliaProgram only)inputs, outputsoutputs::Dict{String}
validate_outputsoutputsthe last expression
wrap_upinputs, outputsthe last expression

Usage in Program building

function JuliaProgram(; kwargs...)
    ...
    # inputs isa Vector{String}
    # outputs isa Vector{String}
    # mod isa Module where evaluating expressions to functions in

    validate_inputs = quote_function(validate_inputs, inputs; mod = mod)
    infer_outputs = quote_function(infer_outputs, inputs; mod = mod)
    prerequisites = quote_function(prerequisites, inputs, outputs; mod = mod)
    validate_outputs = quote_function(validate_outputs, outputs; mod = mod)
    wrap_up = quote_function(wrap_up, inputs, outputs; mod = mod)

    main = quote_function(main, inputs, outputs; specific_return = :(outputs), mod = mod)
    ...
end
source

Arg

Pipelines.ArgType

Arg(name)
Arg(name => default)
Arg(name => type::Type)
Arg(name => default => type::Type)
Arg(name => type::Type => default)

Arg(name::Union{String,Symbol}, type::Type = Any, default = nothing;
    required::Bool = isnothing(default),
    independent::Bool = name isa Symbol
)

struct Arg{type,DefaultType}
    name::String
    type::Type
    default::DefaultType
    required::Bool
    independent::Bool
end

Arg stores the settings of inputs and outputs in Program.

  • name: name of Arg.

  • type: allowed type.

  • default: default value.

  • required = isnothing(default): if true, the Arg has to be provided by users.

  • independent = isa(name, Symbol): if true, the argument does not change the results of a Program, such as "nthreads", "memory". Independent args have no effect on run id.

Valid pair types

  • name: no default value.

  • name => default: set default value, except value is nothing (default value not set).

  • name => type: no default value, but value type is restricted.

  • name => default => type: set default value and value type.

  • name => type => default: set default value and value type.

An edge situation

To create an argument with a default value of nothing, you cannot use =>. Instead, this works:

p = JuliaProgram(
    inputs = [
        Arg("ARG_NAME", nothing; required = false),
        "OTHER_ARG" => String
    ]
)
source
Pipelines.RESERVED_KEY_SETConstant
const RESERVED_KEY_SET = Set(["name", "user", "ncpu", "mem",
    "schedule_time", "wall_time", "priority", "dependency",
    "stdout", "stderr", "stdlog", "append", "dir", "inputs", "outputs",
    "check_dependencies", "skip_when_done", "touch_run_id_file",
    "verbose", "retry", "dry_run"])

Reserved keys that cannot be used in inputs and outputs.

source
Pipelines.FORWARD_KEY_SETConstant
FORWARD_KEY_SET = Set([:name, :user, :ncpu, :mem])

Program objects has a field arg_forward. It can forward args from inputs and outputs to JobSchedulers.Job(), only supporting keyword arguments in Pipelines.FORWARD_KEY_SET. arg_forward accepts elements (or vectors containing elements) in the following format: "arg_of_inputs_or_outputs" => :key_in_FORWARD_KEY_SET."

source

Run Program

Base.runFunction
run(p::Program; kwargs...)
run(p::Program, inputs, outputs; kwargs...)
run(p::Program, inputs; kwargs...) # only usable when `p.infer_outputs` is defined, or default outputs are set in `p`.

Run Program (CmdProgram or JuliaProgram).

Return (success::Bool, outputs::Dict{String})

Warning

If p isa JuliaProgram, outputs will be overwritten by the returned value of p.main only when the returned value is a Dict{String} and passes p.validate_outputs. See more at JuliaProgram.

Positional Arguments

  • p::Program: the command or Julia program template.

  • inputs and outputs: p::Program stores a program template with replaceable portions as keywords. All keywords can be found at p.arg_inputs and p.arg_outputs. Here, inputs and outputs are better to be Dict(keyword::String => replacement).

    If data types of inputs and outputs are not Dict{String}, they will be converted as far as possible. If the conversion fails, program will throw an error.

Keyword Arguments:

  • elements in p.arg_inputs and p.arg_outputs. They will merge to positional arguments inputs and outputs.

  • dir::AbstractString = "": directory to store run_id_file. If set Pipelines.auto_change_directory(true), Program will change to this directory before running. However, changing directory is not thread safe, so it is not recommended.

  • check_dependencies::Bool = true: check dependencies for p (p.cmd_dependencies).

  • skip_when_done::Bool = true: Skip running the program and return true if it has been done before (the run_id_file exists and p.validate_outputs(outputs) passes.)

  • touch_run_id_file::Bool = true: If true, touch a unique run ID file, which indicate the program is successfully run with given inputs and outputs. If false, the next time running the program, skip_when_done=true will not take effect.

  • verbose = true: If true or :all, print all info and error messages. If :min, print minimum info and error messages. If false or :none, print error messages only.

  • retry::Int = 0: If failed, retry for INT times.

  • dry_run::Bool = false: do not run the program, return (command::AbstractCmd, run_id_file::String) for CmdProgram, or (inferred_outputs::Dict{String}, run_id_file::String) for JuliaProgram.

  • stdout, stderr, stdlog and append::Bool = false: Redirect the program outputs to files. stdlog is the Julia logging of @info, @warn, @error, etc. Caution: If p isa CmdProgram and the original command (p.cmd) has redirection, arguments defined here might not be effective for the command.

Thread safety

Redirecting in Julia are not thread safe, so unexpected redirection might be happen if you are running programs in different Tasks or multi-thread mode.

Workflow

  1. Go to the working directory. Establish redirection. (dir, stdout, stderr, stdlog, append).

  2. Validate compatibility between p and inputs/outputs.

  3. Check whether the program has run before. (skip_when_done, p.validate_outputs)

  4. Check command dependencies. (check_dependencies, p.cmd_dependencies)

  5. Validate inputs. (p.validate_inputs)

  6. [CmdProgram only] Generate runnable command from p and inputs/outputs. (stdout, stderr, append)

  7. Preparing before running main command. (p.prerequisites)

  8. Run command [CmdProgram] or the main function [JuliaProgram].

  9. If p isa CmdProgram, validate outputs only. If p isa JuliaProgram, validate the returned value of the main function. If pass, outputs will overwritten by the returned value. Otherwise, the original outputs is kept. (p.validate_outputs)

  10. Wrap up. (p.wrap_up)

  11. Success, touch run id file, and return (success::Bool, outputs::Dict{String}). (touch_run_id_file::Bool)

Example

p = JuliaProgram(
    id_file = "id_file",
    inputs = ["a",
              "b" => Int],
    outputs = "c" => "<a>.<b>",
    main = quote
        println("inputs are ", a, " and ", b)
        println("You can also use info in outputs: ", outputs["c"])
        println("The returned value will be assigned to a new outputs")
        println("It is ok to use inputs and outputs directly:")
        @show inputs
        @show outputs
        c = b^2
    end)

# running the program using `run`: keyword arguments include keys of inputs and outputs
success, new_out = run(p; a = `in1`, b = 2, c = "out", touch_run_id_file = false)

# an old way to `run` program: need to create inputs and outputs first.
inputs = Dict("a" => `in1`, "b" => 2)
outputs = "c" => "out"
success, new_out = run(p, inputs, outputs; touch_run_id_file = false)

# for CmdProgram, outputs are inferred before running the main command, however,
# for JuliaProgram, outputs will change to the returned value of main function, if the returned value is a Dict and pass `p.validate_outputs`
@assert new_out != outputs
source

Command Dependency

Pipelines.CmdDependencyType

Struct

mutable struct CmdDependency
    exec::Base.Cmd
    test_args::Base.Cmd
    validate_success::Bool
    validate_stdout::Function
    validate_stderr::Function
    exit_when_fail::Bool
end

Methods

CmdDependency(;
    exec::Base.Cmd=``,
    test_args::Base.Cmd=``,
    validate_success::Bool=false,
    validate_stdout::Function=do_nothing,
    validate_stderr::Function=do_nothing,
    exit_when_fail::Bool=true
)

Create Command Dependency (CmdDependency).

Arguments

  • exec::AbstractCmd: the command to call the dependency.

  • test_args::AbstractCmd: for testing purposes, the command to be appended to exec.

  • validate_success::Bool: when checking the dependency, whether to validate the exit code == 0.

  • validate_stdout::Function: a function takes standard out as String and return the validation result as ::Bool.

  • validate_stderr::Function: a function takes standard error as String and return the validation result as ::Bool.

  • exit_when_fail::Bool: if validation fails, whether to throw error and exit.

Example

julia = CmdDependency(
    exec = Base.julia_cmd(),
    test_args = `--version`,
    validate_success = true,
    validate_stdout = x -> occursin(r"^julia version", x),
    validate_stderr = do_nothing,
    exit_when_fail = true
)

check_dependency(julia)
source
Pipelines.check_dependencyFunction
check_dependency(p::CmdDependency; exit_when_fail::Bool = p.exit_when_fail) -> Bool

Check CmdDependency by evaluating:

`$(p.exec) $(p.test_args)`

If success, return true.

If fail, return false, or throw DependencyError when exit_when_fail set to true.

source
check_dependency(p::Program; exit_when_fail::Bool=true)

Check dependencies listed in p.cmd_dependencies.

source
check_dependency(m::Module = @__MODULE__; exit_when_fail = true, verbose = true)

Check all CmdDependency and Program under m::Module.

source

Common Methods

Pipelines.status_dependencyFunction
status_dependency(m::Module = @__MODULE__; exit_when_fail = false, verbose = true)

Check all CmdDependency and Program under m::Module. Similar to check_dependency, but do not exit_when_fail by default.

source

Utils

Pipelines.replaceextMethod
replaceext(path, replacement::AbstractString)

If the last component of a path contains a dot, leave everything before the dot as usual, and everything after the dot is replaced by replacement. Otherwise, replacement will be appended to path.

If replacement is empty, the last dot will be removed.

source
Pipelines.removeextMethod
removeext(path)

If the last component of a path contains a dot, leave everything before the dot as usual, and everything including and after the dot is discarded.

source
Base.splitMethod
split(c::Cmd) = c.exec :: Vector{String}

Return splitted arguments of Cmd.

source
Pipelines.to_strMethod
to_str(x) -> String
str(x) -> String

Convert x to String.

  • x::Cmd: remove backticks (return string(x)[2:end-1]).

  • x::Nothing: return "".

  • x::Vector: join elements with "_" as delim.

  • x::Any: return string(x).

source
Pipelines.to_cmdMethod
to_cmd(x) -> Cmd

Convert x to Cmd.

Exception: when x::Nothing, return nothing::Nothing.

source
Pipelines.isokFunction
isok(x::Nothing) = true
isok(x::Bool) = x
isok(x::AbstractString) = true unless x is "" / n / no / null / f / false / 0
isok(x::Any) = true  # default is true
source

Redirection

Pipelines.auto_change_directoryFunction
auto_change_directory(b::Bool)

When running a Program, whether automatically change to the directory.

Default is false since Pipelines v0.11.

Changing directory is not thread-safe in Julia.

Cannot set it in versions <= v0.10.6.

source
Pipelines.redirect_to_filesFunction
redirect_to_files(f::Function, file; mode="a+")
redirect_to_files(f::Function, outfile, errfile; mode="a+")
redirect_to_files(f::Function, outfile, errfile, logfile; mode="a+")

Redirect outputs of function f to file(s).

  • xxxfile: File path (AbstractString), nothing or ::IO. nothing means no redirect. Files can be the same.
  • mode: same as open(..., mode).

Caution: If xxxfile is an IO, it won't be closed. Please use close(io) or JobSchedulers.close_in_future(io, jobs) manually!

Thread safety

Redirecting in Julia are not thread safe, so unexpected redirection might be happen if you are running programs in different Tasks or multi-thread mode.

source
Pipelines.restore_stdoutFunction
restore_stdout()

Restore the current stdout to the original stdout. It is useful when redirecting stdout/stderr fails when calling redirect_to_files, which happens when an old stream is closed and then redirected to.

Thread safety

Redirecting in Julia are not thread safe, so unexpected redirection might be happen if you are running programs in different Tasks or multi-thread mode.

See also restore_stderr().

source
Pipelines.restore_stderrFunction
restore_stderr()

Restore the current stderr to the original stderr. It is useful when redirecting stdout/stderr fails when calling redirect_to_files, which happens when an old stream is closed and then redirected to.

Thread safety

Redirecting in Julia are not thread safe, so unexpected redirection might be happen if you are running programs in different Tasks or multi-thread mode.

See also restore_stdout().

source

Run ID File

Pipelines.create_run_id_fileFunction
create_run_id_file(run_id_file::AbstractString, inputs::Dict, outputs::Dict)

Create run id file.

What is run id file

The run id file stores information of arguments and files related to a successful run of Program.

By comparing the run id file, we can determine whether we need to re-run a finished program.

File Name

The name of a run id file is <dir>/<program_id_prefix>.<argument_UUID>.

  • dir: working directory to run the program, which can be defined in run(::Program; dir = "")

  • program_id_prefix: the prefix of run ID file, which can be defined in CmdProgram(id_file = "") and JuliaProgram(id_file = "")

  • argument_UUID: a unique ID generated from string representations of inputs and outputs arguments using the internal function generate_run_uuid.

In this way, the name of a run id file will not change if running a program in the same directory with same inputs and outputs.

However, this is not enough for determine whether a job needs re-run. Consider this situation:

(1) Run prog with arg = 1, output "out.txt" and "run_id_file_with_arg1"

(2) Run prog with arg = 2, output "out.txt" and "run_id_file_with_arg2"

(3) Run prog with arg = 1 again, no re-run because "out.txt" and "run_id_file_with_arg1" all exist!

To solve the issue, we need to store the states of inputs and outputs arguments.

Here, we guess file names from inputs and outputs.

  • If an argument is AbstractString or AbstractPath, and isfile() returns true, we store the file information. (We ignore directories because their contents are easy to change.)

  • If an argument is Base.AbstractCmd, we decompose the command into pieces, and check whether each piece is a file path. The rules of file guessing are complicated and mentioned in Pipelines.cmd_to_run_id_lines and Pipelines.CMD_FILE_SPLITER.

  • If a file name is found, and its extension is not one of Pipelines.RUN_ID_LINE_SKIP_EXTENSION, it will write to run id file.

Control file name guessing

Pipeline developers usually know what file extension should be ignored, and whether they have an argument joining two files with a splitter. In this way, we can use IN-PLACE methods to change Pipelines.RUN_ID_LINE_SKIP_EXTENSION and Pipelines.CMD_FILE_SPLITER. IN-PLACE methods are usually functions ending with !, such as empty!, push!, deleteat!

Contents of run id file

  • Tab delimited, no header.
  • Column 1: i or o stands for inputs or outputs.
  • Column 2: unix timestamp of when the file was last modified in Float64.
  • Column 3: the size (in bytes) of the file.
  • Column 3: key name of inputs or outputs. It may have duplication.
  • Column 4: file path. It may have duplication.

Limitation

We cannot store states of all arguments. If we have a pure JuliaProgram without reading and writing files, we cannot guarantee the state of the arguments.

A work-around is to intentionally create a file with a fixed name, and the file name is defined in Program's outputs.

See also

Pipelines.cmd_to_run_id_lines, Pipelines.RUN_ID_LINE_SKIP_EXTENSION, Pipelines.CMD_FILE_SPLITER

source
Pipelines.cmd_to_run_id_linesFunction
cmd_to_run_id_lines(io::IO, arg_name::AbstractString, cmd::Base.AbstractCmd, first_char::String)
  • io: IO of run id file.
  • arg_name: name of the inputs/outputs argument.
  • cmd: Subtypes of Base.AbstractCmd.
  • first_char: "i" or "o", stands for inputs or outputs.

Rules to guess file names from command:

  • The first argument is ignored because usually it is a script.

  • Numbers are ignored.

  • If an argument starts with -, matching r"^-[A-Za-z0-9\-\_]+(/.+)" and r"^-[A-Za-z0-9\-\_]+=(.+)" only. If matched, go to the next rule.

  • Check whether an arg is a file. If not, try to use Pipelines.CMD_FILE_SPLITER to split the argument, and check each part. If found a file, go to the next rule.

  • If a file name is found, and its extension is not one of Pipelines.RUN_ID_LINE_SKIP_EXTENSION, it will write to run id file.

source
Pipelines.need_rerunFunction
need_rerun(p::Program, run_id_file::AbstractString, inputs::Dict, outputs::Dict) -> Bool

Check whether re-run the program p. Return true means it need re-run.

Decision details

  1. Is run_id_file a file? If not, re-run.

  2. Run p.validate_outputs(outputs). If fail, re-run.

  3. Comparing status of files in run_id_file using Pipelines.any_file_differ. If yes, re-run.

source
Pipelines.any_file_differFunction
any_file_differ(run_id_file::AbstractString, inputs::Dict, outputs::Dict)

Check whether any existing file (not dir) path of AbstractString or AbstractPath differ from records in run_id_file.

source

Internal

Pipelines.parse_argFunction
parse_arg(v)

Parsing inputs and outputs when creating Program objects.

Return Vector{Arg}.

Valid v element types

  • name: no default value.

  • name => value: set default value, except value is nothing (default value not set).

  • name => value_type::Type: no default value, but value type.

  • name => value => value_type::Type: set default value and value type.

  • name => value_type::Type => value: set default value and value type.

source
Pipelines.to_xxput_dictFunction
to_xxput_dict(p::Pair{String, V}) where V
to_xxput_dict(p::Pair)
to_xxput_dict(v::Vector{V}) where V <: Pair
to_xxput_dict(d::Dict)

Convert inputs/outputs to Dict{String} in run(p, inputs, outputs)

source
Pipelines.try_functionFunction
try_function(f::Function, error_io::IO)
try_function(f::Function, ::Nothing   )

Try to run f. If f throws error, display stacktraces in error_io or stderr, and return stacktrace information stored as ::StackTraceVector.

source
Pipelines.StackTraceVectorType
struct StackTraceVector
    x::Vector
end
  • x = [(exception,backtrace), ...]: the result of Base.current_exceptions() in Julia 1.7 or Base.catch_stack() in Julia 1.1-1.6.
source
Pipelines.xxputs_completion_and_checkFunction
xxputs_completion_and_check(p::Program, inputs, outputs)
  1. Check and complete inputs using types and values stored in p.

  2. Run p.infer_outputs if defined, and then merge it and outputs (user-input keys are kept).

  3. Check and complete outputs using types and values stored in p.

  4. Check keyword consistency using p.

  5. Interpolate <keyword> in String in completed inputs and outputs.

  6. Return inputs and outputs.

source
Pipelines.parse_program_argsFunction
parse_program_args(p::Program; args...)

Classify args... to inputs and outputs of p, and other keyword arguments. args includes inputs = ..., and outputs = ...

Return (inputs::Dict{String}, outputs::Dict{String}, other_kwargs::Tuple)

source