API
Program
Pipelines.Program — Type
Summary
abstract type Program <: Any
Subtypes
CmdProgram
JuliaProgram
Pipelines.infer_outputs — Function
infer_outputs(p::Program; input_kwargs...)
infer_outputs(p::Program, inputs)
infer_outputs(p::Program, inputs, outputs)
Infer the default outputs from p::Program and inputs::Dict{String}.
Command Program
Pipelines.CmdProgram — Method
CmdProgram <: Program
CmdProgram(;
    name::String                            = "Command Program",
    id_file::String                         = "",
    info_before::String                     = "auto",
    info_after::String                      = "auto",
    cmd_dependencies::Vector{CmdDependency} = Vector{CmdDependency}(),
    inputs                                  = Vector{String}(),
    validate_inputs::Expr                   = do_nothing,  # vars of inputs
    infer_outputs::Expr                     = do_nothing,  # vars of inputs
    prerequisites::Expr                     = do_nothing,  # vars of inputs and outputs
    cmd::Base.AbstractCmd                   = ``,
    outputs                                 = Vector{String}(),
    validate_outputs::Expr                  = do_nothing,  # vars of outputs
    wrap_up::Expr                           = do_nothing,  # vars of inputs and outputs
    arg_forward                             = Vector{Pair{String,Symbol}}(),
    mod::Module                             = Pipelines    # change to @__MODULE__ to avoid precompilation error
) -> CmdProgram
Command program template. To run a CmdProgram, use run(::CmdProgram; kwargs...).
Arguments
- name::String: Program name.
- id_file::String: The prefix of the run ID file. To avoid running the program twice with the same inputs and outputs, a unique run ID file is generated after a successful run.
- info_before::String: Printed when the program starts.
- info_after::String: Printed when the program finishes.
- cmd_dependencies::Vector{CmdDependency}: Any command dependencies used in the program.
- inputs and outputs: Elements (or vectors containing elements) in one of the following formats: (1) keyword (2) keyword => data_type (3) keyword => default_value (4) keyword => default_value => data_type.
  - keyword is an argument name, normally a String. If the keyword does not affect results (such as ncpu, nthreads), it needs to be a Symbol. Symbol args are ignored when generating unique run IDs.
  - default_value is optional. If set, users may omit this argument when running; otherwise, users have to provide it. Caution: nothing is reserved and means the default value is not set. If the default is a String, it can contain other keywords, which need to be quoted with '<>', such as "<arg>.txt".
  - data_type is optional. If set, the value provided has to be of this data type, or an error is thrown.
- validate_inputs::Expr: Quoted code to validate inputs. Elements in inputs can be used directly as variables. If validation fails, throw an error or return false. See details in quote_expr.
- infer_outputs::Expr: Quoted code to infer outputs from inputs. Elements in inputs can be used directly as variables. It has to return a Dict{String}("OUTPUT_VAR" => value). See details in quote_expr.
- prerequisites::Expr: Quoted code to run just before the main command. It prepares necessary things, such as creating directories. Elements in inputs and outputs can be used directly as variables. See details in quote_expr.
- cmd::AbstractCmd: The main command template. In the template, keywords in inputs::Vector{String} and outputs::Vector{String} will be replaced when invoking run(::CmdProgram, inputs::Dict{String, ValidInputTypes}, outputs::Dict{String, ValidInputTypes}).
- validate_outputs::Expr: Quoted code to validate outputs. Elements in outputs can be used directly as variables. If validation fails, throw an error or return false. See details in quote_expr.
- wrap_up::Expr: The last quoted code to run. Elements in inputs and outputs can be used directly as variables. See details in quote_expr.
- arg_forward: Forward args from inputs and outputs to specific keywords in JobSchedulers.Job(). Only keys in Pipelines.FORWARD_KEY_SET are supported: Set([:ncpu, :mem, :user, :name]). Elements (or vectors containing elements) in the following format: "arg_of_inputs_or_outputs" => :key_in_FORWARD_KEY_SET.
- mod::Module: Expressions will be evaluated to functions in mod. Please use mod = @__MODULE__ to prevent precompilation failure when defining the program within a package.
You can still pass a Function to arguments that require an Expr, but you cannot use the 'elements as variables' feature. The function should take inputs::Dict{String} and/or outputs::Dict{String} as arguments, and you have to use the traditional inputs["VARNAME"] inside the function.
From Pipelines v0.8, all Expr provided will be converted to Function automatically.
Please refer to quote_expr, section 'quote variables in other scopes.'
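The '<>' quoting of keywords inside a String default value (such as "<arg>.txt" in the Arguments list) can be pictured with a small Base-only sketch. It is an illustration under stated assumptions, not the Pipelines implementation: `fill_placeholders` is a hypothetical helper, values are converted with `string`, and unknown placeholders are assumed to be left untouched.

```julia
# Sketch only: `fill_placeholders` is a hypothetical helper, not part of Pipelines.
function fill_placeholders(template::AbstractString, values::Dict{String})
    # Called once per "<...>" match; `m` is the matched substring including brackets.
    function lookup(m)
        key = String(chop(m; head = 1, tail = 1))   # strip the surrounding '<' and '>'
        # Assumption: placeholders without a matching keyword are kept as they are.
        return haskey(values, key) ? string(values[key]) : String(m)
    end
    return replace(template, r"<[^<>]+>" => lookup)
end

# An output default such as "<input>.output" resolved against the inputs:
resolved = fill_placeholders("<input>.output", Dict("input" => "sample1"))
```

With the inputs above, `resolved` is "sample1.output". Non-String values go through `string`, so "<a>.<b>" with a = "x" and b = 2 gives "x.2" in this sketch.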
Example
p = CmdProgram(
    id_file = "id_file",
    inputs = ["input",
              "input2" => Int,
              "optional_arg" => 5,
              "optional_arg2" => 0.5 => Number],
    outputs = "output" => "<input>.output",
    validate_inputs = quote
        @show optional_arg
        optional_arg2 isa Float64 && inputs isa Dict
    end,
    cmd = `echo input input2 optional_arg optional_arg2 output`)
# running the program: keyword arguments include keys of inputs and outputs
success, outputs = run(p; input = `in1`, input2 = 2, output = "out", touch_run_id_file = false)
# an old way to `run` program: need to create Dicts of inputs and outputs first.
inputs = Dict("input" => `in1`, "input2" => 2)
outputs = Dict("output" => "out")
run(p, inputs, outputs; touch_run_id_file = false)
See also: CmdProgram, JuliaProgram, quote_expr
Pipelines.prepare_cmd — Function
prepare_cmd(p::CmdProgram, inputs, outputs)
Prepare the runnable command. Keywords in the CmdProgram template are replaced with the corresponding values of inputs/outputs.
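The keyword replacement that prepare_cmd performs can be approximated with Base alone. The sketch below is an assumption-laden illustration, not the library's code: `sketch_prepare_cmd` is a hypothetical name, only arguments that exactly equal a keyword are replaced, and every value is converted with `string` (the real function may treat Cmd or Vector values differently).

```julia
# Sketch only: `sketch_prepare_cmd` is a hypothetical stand-in for illustration,
# not the Pipelines implementation.
function sketch_prepare_cmd(template::Cmd, values::Dict{String})
    # Each argument that exactly matches a keyword is swapped for its value;
    # all other arguments are copied unchanged.
    args = String[haskey(values, a) ? string(values[a]) : a for a in template.exec]
    return Cmd(args)
end

template = `echo input input2 output`
values = Dict{String,Any}("input" => "in1", "input2" => 2, "output" => "out")
cmd = sketch_prepare_cmd(template, values)
```

Here `cmd.exec` is ["echo", "in1", "2", "out"]: the program name "echo" is not a keyword, so it passes through untouched.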
Julia Program
Pipelines.JuliaProgram — Method
JuliaProgram <: Program
JuliaProgram(;
    name::String                            = "Julia Program",
    id_file::String                         = "",
    info_before::String                     = "auto",
    info_after::String                      = "auto",
    cmd_dependencies::Vector{CmdDependency} = Vector{CmdDependency}(),
    inputs                                  = Vector{String}(),
    validate_inputs::Expr                   = do_nothing,  # vars of inputs
    infer_outputs::Expr                     = do_nothing,  # vars of inputs
    prerequisites::Expr                     = do_nothing,  # vars of inputs and outputs
    main::Expr                              = do_nothing,  # vars of inputs and outputs
    outputs                                 = Vector{String}(),
    validate_outputs::Expr                  = do_nothing,  # vars of outputs
    wrap_up::Expr                           = do_nothing,  # vars of inputs and outputs
    arg_forward                             = Vector{Pair{String,Symbol}}(),
    mod::Module                             = Pipelines    # change to @__MODULE__ to avoid precompilation error
) -> JuliaProgram
Julia program template. To run a JuliaProgram, use run(::JuliaProgram; kwargs...).
Arguments
- name::String: Program name.
- id_file::String: The prefix of the run ID file. To avoid running the program twice with the same inputs and outputs, a unique run ID file is generated after a successful run.
- info_before::String: Printed when the program starts.
- info_after::String: Printed when the program finishes.
- cmd_dependencies::Vector{CmdDependency}: Any command dependencies used in the program.
- inputs and outputs: Elements (or vectors containing elements) in one of the following formats: (1) keyword (2) keyword => data_type (3) keyword => default_value (4) keyword => default_value => data_type. See more: Arg.
  - keyword is an argument name, normally a String. If the keyword does not affect results (such as ncpu, nthreads), it needs to be a Symbol. Symbol args are ignored when generating unique run IDs.
  - default_value is optional. If set, users may omit this argument when running; otherwise, users have to provide it. Caution: nothing is reserved and means the default value is not set. If the default is a String, it can contain other keywords, which need to be quoted with '<>', such as "<arg>.txt".
  - data_type is optional. If set, the value provided has to be of this data type, or an error is thrown.
- validate_inputs::Expr: Quoted code to validate inputs. Elements in inputs can be used directly as variables. If validation fails, throw an error or return false. See details in quote_expr.
- infer_outputs::Expr: Quoted code to infer outputs from inputs. Elements in inputs can be used directly as variables. It has to return a Dict{String}("OUTPUT_VAR" => value). See details in quote_expr.
- prerequisites::Expr: Quoted code to run just before the main code. It prepares necessary things, such as creating directories. Elements in inputs and outputs can be used directly as variables. See details in quote_expr.
- main::Expr: The main Julia code. Elements in inputs and outputs can be used directly as variables. See details in quote_expr.
  The returned value of p.main will be assigned to the new outputs. Please ensure the variables in outputs are defined correctly, since it will return outputs::Dict{String,Any}.
- validate_outputs::Expr: Quoted code to validate outputs. Elements in outputs can be used directly as variables. If validation fails, throw an error or return false. See details in quote_expr.
- wrap_up::Expr: The last quoted code to run. Elements in inputs and outputs can be used directly as variables. See details in quote_expr.
- arg_forward: Forward args from inputs and outputs to specific keywords in JobSchedulers.Job(). Only keys in Pipelines.FORWARD_KEY_SET are supported: Set([:ncpu, :mem, :user, :name]). Elements (or vectors containing elements) in the following format: "arg_of_inputs_or_outputs" => :key_in_FORWARD_KEY_SET.
- mod::Module: Expressions will be evaluated to functions in mod. Please use mod = @__MODULE__ to prevent precompilation failure when defining the program within a package.
You can still pass a Function to arguments that require an Expr, but you cannot use the 'elements as variables' feature. The function should take inputs::Dict{String} and/or outputs::Dict{String} as arguments, and you have to use the traditional inputs["VARNAME"] inside the function.
From Pipelines v0.8, all Expr provided will be converted to Function automatically.
Please refer to quote_expr, section 'quote variables in other scopes.'
Example
p = JuliaProgram(
    id_file = "id_file",
    inputs = ["a",
              "b" => Int],
    outputs = "c" => "<a>.<b>",
    main = quote
        println("inputs are ", a, " and ", b)
        println("You can also use info in outputs: ", c)
        println("The returned value will be assigned to a new outputs")
        c = b^2
    end)
# running the program: keyword arguments include keys of inputs and outputs
success, new_out = run(p; a = `in1`, b = 2, c = "out", touch_run_id_file = false)
# outputs change to the returned value of the main function if the returned value is a Dict and passes `p.validate_outputs`
@assert new_out != infer_outputs(p; a = `in1`, b = 2, c = "out")
# an old way to `run` program: need to create Dicts of inputs and outputs first.
inputs = Dict("a" => `in1`, "b" => 2)
outputs = "c" => "out"
success, new_out = run(p, inputs, outputs; touch_run_id_file = false)
Quote for Program
Pipelines.quote_expr — Function
quote
    do_some_thing()
end :: Expr
quote creates a piece of code without using the explicit Expr constructor.
From Pipelines v0.8, you can use quote ... end to validate_inputs, infer_outputs, do prerequisites, do main, validate_outputs, and wrap_up a Program.
Elements in inputs or outputs can be directly used as variables for those arguments. See the table below.
| Argument | Elements as variables | Default returned value |
|---|---|---|
| validate_inputs | inputs | the last expression |
| infer_outputs | inputs | the last expression, can be converted to Dict{String} |
| prerequisites | inputs, outputs | the last expression |
| main (JuliaProgram only) | inputs, outputs | outputs::Dict{String} |
| validate_outputs | outputs | the last expression |
| wrap_up | inputs, outputs | the last expression |
Example
prog = JuliaProgram(
    inputs = ["A", "B"],
    outputs = ["OUT"],
    validate_inputs = quote
        @show A
        @show inputs
        A isa Number
    end,
    infer_outputs = quote
        Dict("OUT" => A + B)
    end,
    main = quote
        @show A
        @show B
        OUT = A + B
    end,
    validate_outputs = quote
        @show OUT
        OUT isa Number
    end
)
run(prog; A = 3, B = 5, touch_run_id_file = false)
# (true, Dict{String, Any}("OUT" => 8))
A local variable (including a function) should be referenced using $ in the expression. (No need to use $ for global variables.)
A local ::Symbol variable (sym) should be referenced using $(QuoteNode(sym)) in the expression.
Example:
inputs = ["A", "B"]
g_var = 3
g_sym = :globalsymbol
function gen_expr()
    l_var = 5
    l_func() = @info("Use local function")
    l_sym = :abc
    expr = quote
        @show inputs
        @show g_var
        @show g_sym
        @show $(QuoteNode(l_sym))
        @show $l_var + 2
        $l_func()
        A + B
    end
end
expr = gen_expr()
func = Pipelines.quote_function(expr, inputs; mod = @__MODULE__)
in_dict = Dict("A" => 5, "B" => 50)
func(in_dict)
You can still pass a Function to arguments that require an Expr, but you cannot use the 'elements as variables' feature. The function should take inputs::Dict{String} and/or outputs::Dict{String} as arguments, and you have to use the traditional inputs["VARNAME"] inside the function.
See also: CmdProgram, JuliaProgram, quote_function
Pipelines.quote_function — Function
quote_function(expr::Expr, inputs::Vector{String}; specific_return = nothing, mod::Module = Pipelines)
# Return `Function` with one argument: `inputs::Dict{String}`.
quote_function(expr::Expr, inputs::Vector{String}, outputs::Vector{String}; specific_return = nothing, mod::Module = Pipelines)
# Return `Function` with two arguments: `inputs::Dict{String}, outputs::Dict{String}`.
quote_function(f::Function, x; specific_return, mod) = f
quote_function(f::Function, x, y; specific_return, mod) = f
# Directly return `f::Function` without any processing.
Description
When building a Program, Expr are automatically converted to Function using quote_function. The elements of inputs and/or outputs in expr will be replaced by inputs["element"] and/or outputs["element"], respectively. Also, in the generated function, the arguments (inputs and outputs) are regarded as Dict{String}.
- specific_return: an Expr appended to expr.
- mod::Module: Expressions will be evaluated to functions in mod. Please use mod = @__MODULE__ to prevent precompilation failure when defining the program within a package.
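The replacement described above can be pictured as a walk over the expression tree. The following is a minimal Base-only sketch, not the Pipelines implementation: `substitute_keys` and `sketch_quote_function` are hypothetical names, evaluating in Main by default is an assumption, and edge cases (for example a keyword-argument name that collides with a key) are not handled.

```julia
# Sketch only: hypothetical helpers that mimic the described substitution.

# Replace every symbol named in `keys` with `inputs["<name>"]`; leave the rest alone.
function substitute_keys(ex, keys::Vector{String})
    if ex isa Symbol && String(ex) in keys
        return :(inputs[$(String(ex))])
    elseif ex isa Expr
        return Expr(ex.head, map(a -> substitute_keys(a, keys), ex.args)...)
    else
        return ex   # literals, LineNumberNodes, QuoteNodes, ...
    end
end

# Turn the rewritten expression into a one-argument function evaluated in `mod`.
function sketch_quote_function(expr::Expr, keys::Vector{String}; mod::Module = Main)
    body = substitute_keys(expr, keys)
    return Core.eval(mod, :(inputs -> $body))
end

f = sketch_quote_function(:(A + B), ["A", "B"])
# `invokelatest` avoids world-age issues when the function was just created.
result = Base.invokelatest(f, Dict("A" => 5, "B" => 50))
```

`result` is 55 here, because `A + B` has been rewritten to `inputs["A"] + inputs["B"]` before evaluation. Evaluating in a caller-supplied module mirrors why the docstring recommends mod = @__MODULE__ inside packages.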
In expr::Expr, elements in inputs or outputs can be directly used as variables for those arguments. See the table below.
| Argument | Elements as variables | Default returned value |
|---|---|---|
| validate_inputs | inputs | the last expression |
| infer_outputs | inputs | the last expression, can be converted to Dict{String} |
| prerequisites | inputs, outputs | the last expression |
| main (JuliaProgram only) | inputs, outputs | outputs::Dict{String} |
| validate_outputs | outputs | the last expression |
| wrap_up | inputs, outputs | the last expression |
Usage in Program building
function JuliaProgram(; kwargs...)
    ...
    # inputs isa Vector{String}
    # outputs isa Vector{String}
    # mod isa Module in which expressions are evaluated to functions
    validate_inputs = quote_function(validate_inputs, inputs; mod = mod)
    infer_outputs = quote_function(infer_outputs, inputs; mod = mod)
    prerequisites = quote_function(prerequisites, inputs, outputs; mod = mod)
    validate_outputs = quote_function(validate_outputs, outputs; mod = mod)
    wrap_up = quote_function(wrap_up, inputs, outputs; mod = mod)
    main = quote_function(main, inputs, outputs; specific_return = :(outputs), mod = mod)
    ...
end
Arg
Pipelines.Arg — Type
Arg(name)
Arg(name => default)
Arg(name => type::Type)
Arg(name => default => type::Type)
Arg(name => type::Type => default)
Arg(name::Union{String,Symbol}, type::Type = Any, default = nothing;
    required::Bool = isnothing(default),
    independent::Bool = name isa Symbol
)
struct Arg{type,DefaultType}
    name::String
    type::Type
    default::DefaultType
    required::Bool
    independent::Bool
end
Arg stores the settings of inputs and outputs in Program.
- name: name of Arg.
- type: allowed type.
- default: default value.
- required = isnothing(default): if true, the Arg has to be provided by users.
- independent = isa(name, Symbol): if true, the argument does not change the results of a Program, such as "nthreads", "memory". Independent args have no effect on the run ID.
Valid pair types
- name: no default value.
- name => default: set default value, except when default is nothing (default value not set).
- name => type: no default value, but the value type is restricted.
- name => default => type: set default value and value type.
- name => type => default: set default value and value type.
Pipelines.RESERVED_KEY_SET — Constant
const RESERVED_KEY_SET = Set(["name", "user", "ncpu", "mem",
    "schedule_time", "wall_time", "priority", "dependency",
    "stdout", "stderr", "stdlog", "append", "dir", "inputs", "outputs",
    "check_dependencies", "skip_when_done", "touch_run_id_file",
    "verbose", "retry", "dry_run"])
Reserved keys that cannot be used in inputs and outputs.
Pipelines.FORWARD_KEY_SET — Constant
FORWARD_KEY_SET = Set([:name, :user, :ncpu, :mem])
Program objects have a field arg_forward. It can forward args from inputs and outputs to JobSchedulers.Job(), supporting only the keyword arguments in Pipelines.FORWARD_KEY_SET. arg_forward accepts elements (or vectors containing elements) in the following format: "arg_of_inputs_or_outputs" => :key_in_FORWARD_KEY_SET.
Run Program
Base.run — Function
run(p::Program; kwargs...)
run(p::Program, inputs, outputs; kwargs...)
run(p::Program, inputs; kwargs...) # only usable when `p.infer_outputs` is defined, or default outputs are set in `p`.
Run Program (CmdProgram or JuliaProgram).
Return (success::Bool, outputs::Dict{String})
If p isa JuliaProgram, outputs will be overwritten by the returned value of p.main only when the returned value is a Dict{String} and passes p.validate_outputs. See more at JuliaProgram.
Positional Arguments
- p::Program: the command or Julia program template.
- inputs and outputs: p::Program stores a program template with replaceable portions as keywords. All keywords can be found at p.arg_inputs and p.arg_outputs. Here, inputs and outputs are preferably Dict(keyword::String => replacement).
If the data types of inputs and outputs are not Dict{String}, they will be converted as far as possible. If the conversion fails, the program will throw an error.
Keyword Arguments:
- Elements in p.arg_inputs and p.arg_outputs: they are merged into the positional arguments inputs and outputs.
- dir::AbstractString = "": directory to store run_id_file. If Pipelines.auto_change_directory(true) is set, the Program will change to this directory before running. However, changing directory is not thread-safe, so it is not recommended.
- check_dependencies::Bool = true: check dependencies for p (p.cmd_dependencies).
- skip_when_done::Bool = true: skip running the program and return true if it has been done before (the run_id_file exists and p.validate_outputs(outputs) passes).
- touch_run_id_file::Bool = true: if true, touch a unique run ID file, which indicates the program has run successfully with the given inputs and outputs. If false, skip_when_done=true will not take effect the next time the program is run.
- verbose = true: if true or :all, print all info and error messages. If :min, print minimum info and error messages. If false or :none, print error messages only.
- retry::Int = 0: if the run fails, retry this many times.
- dry_run::Bool = false: do not run the program; return (command::AbstractCmd, run_id_file::String) for CmdProgram, or (inferred_outputs::Dict{String}, run_id_file::String) for JuliaProgram.
- stdout, stderr, stdlog and append::Bool = false: redirect the program outputs to files. stdlog is the Julia logging of @info, @warn, @error, etc. Caution: if p isa CmdProgram and the original command (p.cmd) has redirection, arguments defined here might not be effective for the command.
- Stdout/stderr redirection is thread-safe by using ScopedStreams.jl from v0.12.0. See details at redirect_stream. If you encounter an error related to ScopedStream, please use ScopedStreams.@gen_scoped_stream_methods after loading all modules. See details at ScopedStreams.jl.
- Changing directory is not thread-safe in Julia.
Workflow
1. Go to the working directory. Establish redirection. (dir, stdout, stderr, stdlog, append)
2. Validate compatibility between p and inputs/outputs.
3. Check whether the program has run before. (skip_when_done, p.validate_outputs)
4. Check command dependencies. (check_dependencies, p.cmd_dependencies)
5. Validate inputs. (p.validate_inputs)
6. [CmdProgram only] Generate the runnable command from p and inputs/outputs. (stdout, stderr, append)
7. Prepare before running the main command. (p.prerequisites)
8. Run the command [CmdProgram] or the main function [JuliaProgram].
9. If p isa CmdProgram, validate outputs only. If p isa JuliaProgram, validate the returned value of the main function. If it passes, outputs will be overwritten by the returned value. Otherwise, the original outputs is kept. (p.validate_outputs)
10. Wrap up. (p.wrap_up)
11. Success, touch the run ID file, and return (success::Bool, outputs::Dict{String}). (touch_run_id_file::Bool)
Example
p = JuliaProgram(
    id_file = "id_file",
    inputs = ["a",
              "b" => Int],
    outputs = "c" => "<a>.<b>",
    main = quote
        println("inputs are ", a, " and ", b)
        println("You can also use info in outputs: ", outputs["c"])
        println("The returned value will be assigned to a new outputs")
        println("It is ok to use inputs and outputs directly:")
        @show inputs
        @show outputs
        c = b^2
    end)
# running the program using `run`: keyword arguments include keys of inputs and outputs
success, new_out = run(p; a = `in1`, b = 2, c = "out", touch_run_id_file = false)
# an old way to `run` program: need to create inputs and outputs first.
inputs = Dict("a" => `in1`, "b" => 2)
outputs = "c" => "out"
success, new_out = run(p, inputs, outputs; touch_run_id_file = false)
# for CmdProgram, outputs are inferred before running the main command, however,
# for JuliaProgram, outputs will change to the returned value of main function, if the returned value is a Dict and passes `p.validate_outputs`
@assert new_out != outputs
Command Dependency
Pipelines.CmdDependency — Type
Struct
mutable struct CmdDependency
    exec::Base.Cmd
    test_args::Base.Cmd
    validate_success::Bool
    validate_stdout::Function
    validate_stderr::Function
    exit_when_fail::Bool
    status::UInt8
end
Methods
CmdDependency(;
    exec::Base.Cmd=``,
    test_args::Base.Cmd=``,
    validate_success::Bool=false,
    validate_stdout::Function=do_nothing,
    validate_stderr::Function=do_nothing,
    exit_when_fail::Bool=true
)
Create Command Dependency (CmdDependency).
Arguments
- exec::AbstractCmd: the command to call the dependency.
- test_args::AbstractCmd: for testing purposes, the arguments appended to exec.
- validate_success::Bool: when checking the dependency, whether to validate that the exit code is 0.
- validate_stdout::Function: a function that takes standard output as a String and returns the validation result as a Bool.
- validate_stderr::Function: a function that takes standard error as a String and returns the validation result as a Bool.
- exit_when_fail::Bool: if validation fails, whether to throw an error and exit.
Example
julia = CmdDependency(
    exec = Base.julia_cmd(),
    test_args = `--version`,
    validate_success = true,
    validate_stdout = x -> occursin(r"^julia version", x),
    validate_stderr = do_nothing,
    exit_when_fail = true
)
check_dependency(julia)
Pipelines.check_dependency — Function
check_dependency(p::CmdDependency; exit_when_fail::Bool = p.exit_when_fail, force::Bool=true) -> Bool
Check CmdDependency by evaluating:
`$(p.exec) $(p.test_args)`
- force::Bool: force checking the dependency, no matter whether it was previously checked.
If the check succeeds, return true.
If it fails, return false, or throw DependencyError when exit_when_fail is set to true.
check_dependency(p::Program; exit_when_fail::Bool=true)
Check dependencies listed in p.cmd_dependencies.
check_dependency(m::Module = @__MODULE__; exit_when_fail = true, verbose = true)
Check all CmdDependency and Program under m::Module.
Pipelines.check_dependency_dir — Method
check_dependency_dir(path::Union{AbstractString,Cmd}; exit_when_false=true) -> Bool
Check whether a directory exists. Return ::Bool.
Pipelines.check_dependency_file — Method
check_dependency_file(path::Union{AbstractString,Cmd}; exit_when_false=true) -> Bool
Check whether a file exists. Return ::Bool.
Common Methods
Pipelines.status_dependency — Function
status_dependency(m::Module = @__MODULE__; exit_when_fail = false, verbose = true)
Check all CmdDependency and Program under m::Module. Similar to check_dependency, but does not exit_when_fail by default.
Utils
Pipelines.replaceext — Method
replaceext(path, replacement::AbstractString)
If the last component of a path contains a dot, everything before the dot is kept and everything after the dot is replaced by replacement. Otherwise, replacement is appended to path.
If replacement is empty, the last dot is removed.
Pipelines.removeext — Method
removeext(path)
If the last component of a path contains a dot, everything before the dot is kept, and the dot and everything after it are discarded.
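The behaviour described for replaceext and removeext closely follows Base.splitext. The sketch below reproduces it with Base only; the `*_sketch` names are hypothetical, and inserting a "." before a non-empty replacement (including for paths without a dot) is an assumption rather than something the docstrings state.

```julia
# Sketch only: hypothetical `*_sketch` helpers built on Base.splitext.

# Drop the last extension, if any: "dir/a.tar.gz" -> "dir/a.tar".
removeext_sketch(path::AbstractString) = first(splitext(path))

function replaceext_sketch(path::AbstractString, replacement::AbstractString)
    stem = first(splitext(path))
    # Assumption: a "." separator is inserted before a non-empty replacement.
    # An empty replacement just removes the last dot and extension.
    return isempty(replacement) ? stem : string(stem, ".", replacement)
end

no_ext  = removeext_sketch("dir/a.tar.gz")
new_ext = replaceext_sketch("dir/a.txt", "csv")
```

In this sketch `no_ext` is "dir/a.tar" and `new_ext` is "dir/a.csv"; only the last extension is affected, matching "the last component of a path" in the descriptions above.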
Base.split — Method
split(c::Cmd) = c.exec :: Vector{String}
Return the split arguments of a Cmd.
Pipelines.to_str — Method
to_str(x) -> String
str(x) -> String
Convert x to String.
- x::Cmd: remove backticks (return string(x)[2:end-1]).
- x::Nothing: return "".
- x::Vector: join elements with "_" as the delimiter.
- x::Any: return string(x).
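The four conversion rules map naturally onto multiple dispatch. This is a Base-only sketch with a hypothetical name (`to_str_sketch`), not the package source; converting vector elements recursively before joining is an assumption.

```julia
# Sketch only: `to_str_sketch` mirrors the documented rules with one method per rule.
to_str_sketch(x::Cmd) = string(x)[2:end-1]                 # drop the surrounding backticks
to_str_sketch(::Nothing) = ""
to_str_sketch(x::Vector) = join(to_str_sketch.(x), "_")    # assumption: elements converted recursively
to_str_sketch(x) = string(x)                               # fallback for everything else

from_cmd = to_str_sketch(`echo hello`)
from_vec = to_str_sketch([1, "a", `in1`])
```

`from_cmd` is "echo hello" and `from_vec` is "1_a_in1". The more specific methods win over the untyped fallback, so no explicit type checks are needed.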
Pipelines.to_cmd — Method
to_cmd(x) -> Cmd
Convert x to Cmd.
Exception: when x::Nothing, return nothing::Nothing.
Pipelines.do_nothing — Function
do_nothing() = nothing
do_nothing(x) = nothing
do_nothing(x, y) = nothing
Pipelines.isok — Function
isok(x::Nothing) = true
isok(x::Bool) = x
isok(x::AbstractString) = true unless x is "" / n / no / null / f / false / 0
isok(x::Any) = true # default is true
Redirection
Pipelines.auto_change_directory — Function
auto_change_directory(b::Bool)
Set whether to automatically change to the working directory when running a Program.
Default is false since Pipelines v0.11.
Changing directory is not thread-safe in Julia.
It cannot be set in versions <= v0.10.6.
Pipelines.redirect_to_files — Function
redirect_stream(f::Function, out; mode="a+")
redirect_stream(f::Function, out, err; mode="a+")
redirect_stream(f::Function, out, err, log; mode="a+")
Thread-safely run function f with redirected Base.stdout, Base.stderr, and logger.
- out, err: can be a file path (AbstractString), a stream (IO), or nothing. nothing means no redirection.
- log: besides the types supported by out, also supports AbstractLogger.
- mode: same as open(..., mode). Only used for AbstractString positional arguments.
Tips
- Do not mix this function with the Base methods redirect_stdout, redirect_stderr, and redirect_stdio: the Base methods are not thread-safe, and calling them will interfere with redirect_stream redirection.
- If passing an IO or AbstractLogger, it won't be closed. Please use close(io) or JobSchedulers.close_in_future(io, jobs) manually.
- Do not pass two/three distinct descriptors of the same file to out, err and log. See Edge cases.
Examples
using ScopedStreams, Dates
# defines streams for redirection
iob = IOBuffer()
iof = tempname()
# a function that prints to stdout, stderr, and logger
function f(prepend::String, repeat_time::Int)
    for _ in 1:repeat_time
        println(stdout, "stdout: ", prepend, ": ", now())
        println(stderr, "stderr: ", prepend, ": ", now())
        @info string("stdlog: ", prepend, ": ", now())
        sleep(1)
    end
end
# run t1 and t2 concurrently, each with its own redirected streams
t1 = @task redirect_stream(iob) do
    f("iob", 3)
end
t2 = @task redirect_stream(iof) do
    f("iof", 2)
end
schedule(t1); schedule(t2)
wait(t1) ; wait(t2)
b_res = split(String(take!(iob)), "\n")
# 13-element Vector{SubString{String}}:
# "stdout: iob: 2025-09-17T11:06:50.866"
# "stderr: iob: 2025-09-17T11:06:50.947"
# "┌ Info: stdlog: iob: 2025-09-17T11:06:50.954"
# "└ @ Main REPL[4]:5"
# "stdout: iob: 2025-09-17T11:06:52.024"
# "stderr: iob: 2025-09-17T11:06:52.024"
# "┌ Info: stdlog: iob: 2025-09-17T11:06:52.024"
# "└ @ Main REPL[4]:5"
# "stdout: iob: 2025-09-17T11:06:53.027"
# "stderr: iob: 2025-09-17T11:06:53.027"
# "┌ Info: stdlog: iob: 2025-09-17T11:06:53.027"
# "└ @ Main REPL[4]:5"
f_res = readlines(iof)
# 8-element Vector{String}:
# "stdout: iof: 2025-09-17T11:06:51.052"
# "stderr: iof: 2025-09-17T11:06:51.063"
# "┌ Info: stdlog: iof: 2025-09-17T11:06:51.063"
# "└ @ Main REPL[4]:5"
# "stdout: iof: 2025-09-17T11:06:52.072"
# "stderr: iof: 2025-09-17T11:06:52.072"
# "┌ Info: stdlog: iof: 2025-09-17T11:06:52.072"
# "└ @ Main REPL[4]:5"
rm(iof)
Edge cases
It is possible to pass the same argument to out, err and log, like:
redirect_stream("out.txt", "out.txt", "out.txt") do
    ...
end
io = open("out.txt", "a+")
redirect_stream(io, io, io) do
    ...
end
However, passing two/three distinct descriptors of the same file is not supported:
# NOT supported
redirect_stream("out.txt", open("out.txt", "a+")) do
    ...
end
ScopedStreams.redirect_stream — Function
redirect_stream(f::Function, out; mode="a+")
redirect_stream(f::Function, out, err; mode="a+")
redirect_stream(f::Function, out, err, log; mode="a+")
Thread-safely run function f with redirected Base.stdout, Base.stderr, and logger.
- out, err: can be a file path (AbstractString), a stream (IO), or nothing. nothing means no redirection.
- log: besides the types supported by out, also supports AbstractLogger.
- mode: same as open(..., mode). Only used for AbstractString positional arguments.
Tips
- Do not mix this function with the Base methods redirect_stdout, redirect_stderr, and redirect_stdio: the Base methods are not thread-safe, and calling them will interfere with redirect_stream redirection.
- If passing an IO or AbstractLogger, it won't be closed. Please use close(io) or JobSchedulers.close_in_future(io, jobs) manually.
- Do not pass two/three distinct descriptors of the same file to out, err and log. See Edge cases.
Examples
using ScopedStreams, Dates
# defines streams for redirection
iob = IOBuffer()
iof = tempname()
# a function that prints to stdout, stderr, and logger
function f(prepend::String, repeat_time::Int)
    for _ in 1:repeat_time
        println(stdout, "stdout: ", prepend, ": ", now())
        println(stderr, "stderr: ", prepend, ": ", now())
        @info string("stdlog: ", prepend, ": ", now())
        sleep(1)
    end
end
# run t1 and t2 concurrently, each with its own redirected streams
t1 = @task redirect_stream(iob) do
    f("iob", 3)
end
t2 = @task redirect_stream(iof) do
    f("iof", 2)
end
schedule(t1); schedule(t2)
wait(t1) ; wait(t2)
b_res = split(String(take!(iob)), "\n")
# 13-element Vector{SubString{String}}:
# "stdout: iob: 2025-09-17T11:06:50.866"
# "stderr: iob: 2025-09-17T11:06:50.947"
# "┌ Info: stdlog: iob: 2025-09-17T11:06:50.954"
# "└ @ Main REPL[4]:5"
# "stdout: iob: 2025-09-17T11:06:52.024"
# "stderr: iob: 2025-09-17T11:06:52.024"
# "┌ Info: stdlog: iob: 2025-09-17T11:06:52.024"
# "└ @ Main REPL[4]:5"
# "stdout: iob: 2025-09-17T11:06:53.027"
# "stderr: iob: 2025-09-17T11:06:53.027"
# "┌ Info: stdlog: iob: 2025-09-17T11:06:53.027"
# "└ @ Main REPL[4]:5"
f_res = readlines(iof)
# 8-element Vector{String}:
# "stdout: iof: 2025-09-17T11:06:51.052"
# "stderr: iof: 2025-09-17T11:06:51.063"
# "┌ Info: stdlog: iof: 2025-09-17T11:06:51.063"
# "└ @ Main REPL[4]:5"
# "stdout: iof: 2025-09-17T11:06:52.072"
# "stderr: iof: 2025-09-17T11:06:52.072"
# "┌ Info: stdlog: iof: 2025-09-17T11:06:52.072"
# "└ @ Main REPL[4]:5"
rm(iof)
Edge cases
It is possible to pass the same argument to out, err and log, like:
redirect_stream("out.txt", "out.txt", "out.txt") do
    ...
end
io = open("out.txt", "a+")
redirect_stream(io, io, io) do
    ...
end
However, passing two/three distinct descriptors of the same file is not supported:
# NOT supported
redirect_stream("out.txt", open("out.txt", "a+")) do
    ...
end
ScopedStreams.@gen_scoped_stream_methods — Macro
@gen_scoped_stream_methods(incremental::Bool=true)
Create a __ScopedStreamsTmp module under the current module if it does not exist. In __ScopedStreamsTmp, import all loaded modules via const ModName=Mod::Module and generate methods for ScopedStream from all existing methods with IO.
- incremental::Bool=true: only generate methods for methods newly defined for IO since the last call. If false, regenerate all methods for IO.
It does not overwrite existing methods, regardless of whether incremental is true or false.
What will be generated?
# The existing method of `IO` as an template
Base.write(io::IO, x::UInt8)
# to generated the method for `ScopedStream`:
Base.write(io::ScopedStream, x::UInt8) = Base.write(deref(io), x)See also: ScopedStreams.gen_scoped_stream_methods.
ScopedStreams.gen_scoped_stream_methods — Functiongen_scoped_stream_methods(incremental::Bool=true; mod=@__MODULE__)In mod::Module, generate methods for ScopedStream from all existing methods with IO.
incremental::Bool=true: only generate methods for newly defined methods forIOsince last call. Iffalse, regenerate all methods forIO.
It does not overwrite existing methods, no matter incremental is true or false.
What will be generated?
# An existing method of `IO`, used as a template:
Base.write(io::IO, x::UInt8)
# The method generated for `ScopedStream`:
Base.write(io::ScopedStream, x::UInt8) = Base.write(deref(io), x)
Side effects: hygiene of mod
The method generation makes all loaded modules accessible in mod through const LOADED_MODULE_NAME = LOADED_MODULE::Module. This affects later loading of those modules with import and using.
The macro version @gen_scoped_stream_methods solves the hygiene issue by creating a submodule called __ScopedStreamsTmp and doing the work there.
See also: @gen_scoped_stream_methods.
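The forwarding pattern shown in the template can be reproduced by hand for a toy wrapper. The following is a minimal sketch, assuming a hypothetical WrappedStream type and unwrap function as stand-ins for ScopedStream and deref. It is not the package's implementation, which collects the IO signatures automatically; here the single signature is listed manually.

```julia
# Hypothetical stand-ins for ScopedStream and deref (not part of ScopedStreams).
struct WrappedStream <: IO
    io::IO
end
unwrap(w::WrappedStream) = w.io

# Signatures to forward, written out by hand for this sketch.
for (f, T) in ((:write, UInt8),)
    # Generates: Base.write(io::WrappedStream, x::UInt8) = Base.write(unwrap(io), x)
    @eval Base.$f(io::WrappedStream, x::$T) = Base.$f(unwrap(io), x)
end

buf = IOBuffer()
nbytes = write(WrappedStream(buf), 0x41)  # forwarded to write(buf, 0x41)
written = String(take!(buf))              # the byte arrived in the wrapped buffer
```

Generating the definitions in a loop with @eval keeps the wrapper in sync with a list of signatures, which is the general idea behind generating one forwarding method per existing IO method.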
Run ID File
Pipelines.RUN_ID_LINE_SKIP_EXTENSION — Constant
RUN_ID_LINE_SKIP_EXTENSION = [".so", ".dylib", ".dll"]
If a file has one of the listed extensions, the run id file does not store information about it. The state of such a file is therefore ignored when deciding whether to re-run a program.
See also: Pipelines.create_run_id_file, Pipelines.cmd_to_run_id_lines, Pipelines.CMD_FILE_SPLITER
Pipelines.CMD_FILE_SPLITER — Constant
CMD_FILE_SPLITER = [',', ';', ':']
It is used to guess whether an argument of a command contains multiple file names joined by file splitters.
See also: Pipelines.create_run_id_file, Pipelines.cmd_to_run_id_lines, Pipelines.RUN_ID_LINE_SKIP_EXTENSION
Pipelines.create_run_id_file — Function
create_run_id_file(run_id_file::AbstractString, inputs::Dict, outputs::Dict)
Create a run id file.
What is a run id file
The run id file stores information about the arguments and files related to a successful run of a Program.
By comparing against the run id file, we can determine whether a finished program needs to be re-run.
File Name
The name of a run id file is <dir>/<program_id_prefix>.<argument_UUID>.
- dir: working directory to run the program, which can be defined in run(::Program; dir = "")
- program_id_prefix: the prefix of run ID file, which can be defined in CmdProgram(id_file = "") and JuliaProgram(id_file = "")
- argument_UUID: a unique ID generated from string representations of inputs and outputs arguments using the internal function generate_run_uuid.
In this way, the name of a run id file will not change if running a program in the same directory with same inputs and outputs.
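To illustrate why the file name stays stable, here is a minimal sketch of a deterministic ID derived from string representations of the arguments. It is an assumption for illustration only: the internal generate_run_uuid may work differently, and the names sketch_run_uuid, sketch_run_id_file and SKETCH_NAMESPACE are hypothetical. The sketch uses uuid5 from the UUIDs standard library.

```julia
using UUIDs

# Arbitrary fixed namespace for this sketch (the RFC 4122 DNS namespace UUID).
const SKETCH_NAMESPACE = UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

# Hypothetical stand-in for the internal generate_run_uuid.
function sketch_run_uuid(inputs::Dict, outputs::Dict)
    # Sort by key so the result does not depend on Dict iteration order.
    repr_of(d) = join(("$(k)=$(repr(v))" for (k, v) in sort(collect(d); by = first)), ';')
    return uuid5(SKETCH_NAMESPACE, "in:" * repr_of(inputs) * "|out:" * repr_of(outputs))
end

# <dir>/<program_id_prefix>.<argument_UUID>
sketch_run_id_file(dir, prefix, inputs, outputs) =
    joinpath(dir, string(prefix, ".", sketch_run_uuid(inputs, outputs)))

file_arg1 = sketch_run_id_file("work", "prog", Dict("arg" => 1), Dict("out" => "out.txt"))
file_arg2 = sketch_run_id_file("work", "prog", Dict("arg" => 2), Dict("out" => "out.txt"))
```

Running the same arguments twice yields the same path, while changing arg yields a different one even though the output name is unchanged.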
However, this is not enough to determine whether a job needs a re-run. Consider this situation:
(1) Run prog with arg = 1, output "out.txt" and "run_id_file_with_arg1"
(2) Run prog with arg = 2, output "out.txt" and "run_id_file_with_arg2"
(3) Run prog with arg = 1 again, no re-run because "out.txt" and "run_id_file_with_arg1" all exist!
To solve the issue, we need to store the states of inputs and outputs arguments.
Here, we guess file names from inputs and outputs.
- If an argument is AbstractString or AbstractPath, and isfile() returns true, we store the file information. (We ignore directories because their contents are easy to change.)
- If an argument is Base.AbstractCmd, we decompose the command into pieces, and check whether each piece is a file path. The rules of file guessing are complicated and mentioned in Pipelines.cmd_to_run_id_lines and Pipelines.CMD_FILE_SPLITER.
- If a file name is found, and its extension is not one of Pipelines.RUN_ID_LINE_SKIP_EXTENSION, it is written to the run id file.
Pipeline developers usually know which file extensions should be ignored, and whether any argument joins several files with a splitter. They can therefore use in-place methods to change Pipelines.RUN_ID_LINE_SKIP_EXTENSION and Pipelines.CMD_FILE_SPLITER. In-place methods are usually functions ending with !, such as empty!, push!, deleteat!
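As a small illustration of such in-place changes, the sketch below works on local stand-in vectors that carry the documented default values (the names skip_extensions and file_splitters are hypothetical). With Pipelines loaded, the same calls would presumably be applied to Pipelines.RUN_ID_LINE_SKIP_EXTENSION and Pipelines.CMD_FILE_SPLITER directly.

```julia
# Local stand-ins with the documented default values.
skip_extensions = [".so", ".dylib", ".dll"]
file_splitters = [',', ';', ':']

# Also ignore log files when recording file states.
push!(skip_extensions, ".log")

# Stop treating ':' as a file splitter.
deleteat!(file_splitters, findfirst(==(':'), file_splitters))
```

Because the vectors are mutated rather than rebound, every piece of code that holds a reference to them sees the change.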
Contents of run id file
- Tab delimited, no header.
- Column 1: i or o, which stands for inputs or outputs.
- Column 2: unix timestamp of when the file was last modified in Float64.
- Column 3: the size (in bytes) of the file.
- Column 4: key name of inputs or outputs. It may have duplication.
- Column 5: file path. It may have duplication.
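A record with these fields can be sketched with Base functions only. This is an illustrative assumption about the layout (kind, modification time, size, key, path, joined by tabs); the helper name sketch_run_id_line is hypothetical and the exact number formatting used by Pipelines may differ.

```julia
# Hypothetical helper: build one tab-delimited record for an existing file.
function sketch_run_id_line(first_char::String, key::String, path::String)
    return join((first_char, mtime(path), filesize(path), key, path), '\t')
end

# Create a temporary 5-byte file and describe it as an input named "input_file".
path, io = mktemp()
write(io, "hello")
close(io)

line = sketch_run_id_line("i", "input_file", path)
fields = split(line, '\t')
rm(path)
```

Storing both the modification time and the size makes it cheap to notice that a file changed without reading its contents.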
Limitation
We cannot store states of all arguments. If we have a pure JuliaProgram without reading and writing files, we cannot guarantee the state of the arguments.
A work-around is to intentionally create a file with a fixed name, and the file name is defined in Program's outputs.
See also
Pipelines.cmd_to_run_id_lines, Pipelines.RUN_ID_LINE_SKIP_EXTENSION, Pipelines.CMD_FILE_SPLITER
Pipelines.cmd_to_run_id_lines — Function
cmd_to_run_id_lines(io::IO, arg_name::AbstractString, cmd::Base.AbstractCmd, first_char::String)
- io: IO of run id file.
- arg_name: name of the inputs/outputs argument.
- cmd: Subtypes of Base.AbstractCmd.
- first_char: "i" or "o", stands for inputs or outputs.
Rules to guess file names from command:
1. The first argument is ignored because usually it is a script.
2. Numbers are ignored.
3. If an argument starts with -, it is only considered if it matches r"^-[A-Za-z0-9\-\_]+(/.+)" or r"^-[A-Za-z0-9\-\_]+=(.+)". If it matches, go to the next rule.
4. Check whether an arg is a file. If not, try to use Pipelines.CMD_FILE_SPLITER to split the argument, and check each part. If a file is found, go to the next rule.
5. If a file name is found, and its extension is not one of Pipelines.RUN_ID_LINE_SKIP_EXTENSION, it is written to the run id file.
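The rules above can be sketched for a plain Cmd as follows. This is a simplified reading, not the library's code. It assumes that the captured group of the two regular expressions is the file candidate and that only plain Cmd objects are handled. The names sketch_guess_files, SKETCH_SPLITTERS and SKETCH_SKIP_EXT are hypothetical stand-ins that carry the documented default values.

```julia
# Stand-ins with the documented defaults of Pipelines.CMD_FILE_SPLITER and
# Pipelines.RUN_ID_LINE_SKIP_EXTENSION.
const SKETCH_SPLITTERS = [',', ';', ':']
const SKETCH_SKIP_EXT = [".so", ".dylib", ".dll"]

function sketch_guess_files(cmd::Cmd)
    found = String[]
    for arg in cmd.exec[2:end]                           # rule 1: skip the first argument
        tryparse(Float64, arg) === nothing || continue   # rule 2: skip numbers
        if startswith(arg, "-")                          # rule 3: option-like arguments
            m = match(r"^-[A-Za-z0-9\-\_]+(/.+)", arg)
            m === nothing && (m = match(r"^-[A-Za-z0-9\-\_]+=(.+)", arg))
            m === nothing && continue
            arg = m[1]                                   # assumption: the capture is the candidate
        end
        # rule 4: the whole argument is a file, or it may be several joined files
        candidates = isfile(arg) ? [arg] : split(arg, SKETCH_SPLITTERS)
        for c in candidates
            isfile(c) || continue
            splitext(c)[2] in SKETCH_SKIP_EXT && continue  # rule 5: skipped extensions
            push!(found, String(c))
        end
    end
    return found
end
```

For example, given existing files a.txt and b.txt, a command whose arguments include `--in=a.txt` and `a.txt,b.txt` would contribute a.txt once from the option and both files from the joined argument, while a numeric argument or a lib.so path would contribute nothing.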
Pipelines.need_rerun — Function
need_rerun(p::Program, run_id_file::AbstractString, inputs::Dict, outputs::Dict) -> Bool
Check whether the program p needs to be re-run. Returning true means it needs a re-run.
Decision details
1. Is run_id_file a file? If not, re-run.
2. Run p.validate_outputs(outputs). If it fails, re-run.
3. Compare the status of files in run_id_file using Pipelines.any_file_differ. If any differ, re-run.
Pipelines.any_file_differ — Function
any_file_differ(run_id_file::AbstractString, inputs::Dict, outputs::Dict)
Check whether any existing file (not dir) path of AbstractString or AbstractPath differs from the records in run_id_file.
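The decision sequence of need_rerun, together with a record comparison, can be sketched as below. This is a simplification under stated assumptions. The record layout is taken to be kind, modification time, size, key and path separated by tabs. The result of output validation is passed in as a Bool. A missing or malformed record is treated as a difference. The names sketch_any_record_differs and sketch_need_rerun are hypothetical, and the real any_file_differ also inspects inputs and outputs.

```julia
# Hypothetical check: does any recorded file no longer match its record?
function sketch_any_record_differs(run_id_file::AbstractString)
    for line in eachline(run_id_file)
        fields = split(line, '\t')
        length(fields) == 5 || return true          # assumption: malformed means "differs"
        _, recorded_mtime, recorded_size, _, path = fields
        isfile(path) || return true                 # assumption: missing means "differs"
        mtime(path) == parse(Float64, recorded_mtime) || return true
        filesize(path) == parse(Int, recorded_size) || return true
    end
    return false
end

# Hypothetical decision function following the three documented steps.
function sketch_need_rerun(run_id_file::AbstractString, outputs_valid::Bool)
    isfile(run_id_file) || return true   # step 1: no run id file
    outputs_valid || return true         # step 2: output validation failed
    return sketch_any_record_differs(run_id_file)  # step 3: compare file states
end
```

The cheap checks come first, so the file records are only read when the run id file exists and the outputs validate.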
Internal
Pipelines.parse_arg — Function
parse_arg(v)
Parse inputs and outputs when creating Program objects.
Return Vector{Arg}.
Valid v element types
- name: no default value.
- name => value: set default value, except when value is nothing (default value not set).
- name => value_type::Type: no default value, but value type.
- name => value => value_type::Type: set default value and value type.
- name => value_type::Type => value: set default value and value type.
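The accepted forms can be told apart with ordinary dispatch, as in the following sketch. It is illustrative only: Pipelines returns Vector{Arg}, whereas the hypothetical sketch_arg below returns a NamedTuple with name, default and type fields. Note that => is right-associative, so name => a => b is a Pair whose second element is the Pair a => b.

```julia
# Hypothetical normalised form; the real parse_arg returns Vector{Arg}.
sketch_arg(name::Union{String,Symbol}) = (name = name, default = nothing, type = Any)
sketch_arg(p::Pair) = sketch_arg(p.first, p.second)
sketch_arg(name, t::Type) = (name = name, default = nothing, type = t)
sketch_arg(name, value) = (name = name, default = value, type = Any)
function sketch_arg(name, p::Pair)
    a, b = p
    a isa Type && return (name = name, default = b, type = a)  # name => Type => value
    b isa Type && return (name = name, default = a, type = b)  # name => value => Type
    error("unsupported argument specification for $name: $p")
end

specs = ["in", "n" => 4, :ncpu => Int, "out" => "<in>.txt" => String, "k" => Int => 3]
args = map(sketch_arg, specs)
```

Both orders of the three-element form normalise to the same result, which mirrors the last two items of the list.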
Pipelines.to_xxput_dict — Function
to_xxput_dict(p::Pair{String, V}) where V
to_xxput_dict(p::Pair)
to_xxput_dict(v::Vector{V}) where V <: Pair
to_xxput_dict(d::Dict)
Convert inputs/outputs to Dict{String} in run(p, inputs, outputs)
Pipelines.keyword_interpolation — Function
keyword_interpolation(inputs::Dict{String}, outputs::Dict{String})
Interpolate <keyword> in String.
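The idea of replacing <keyword> placeholders can be sketched for a single string as follows. This is a minimal sketch, not the library's implementation: the helper sketch_interpolate is hypothetical, and leaving unknown keywords untouched is an assumption made here.

```julia
# Hypothetical helper: replace each <keyword> with the value stored under that key.
function sketch_interpolate(s::AbstractString, values::Dict{String})
    return replace(s, r"<[^<>]+>" => function (m)
        key = String(chop(m; head = 1, tail = 1))  # strip the surrounding '<' and '>'
        # Assumption: unknown keywords are left as they are.
        haskey(values, key) ? string(values[key]) : String(m)
    end)
end

inputs = Dict{String,Any}("sample" => "S1", "n" => 4)
outfile = sketch_interpolate("<sample>.n<n>.txt", inputs)  # "S1.n4.txt"
```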
Pipelines.xxputs_completion_and_check — Function
xxputs_completion_and_check(p::Program, inputs, outputs)
1. Check and complete inputs using types and values stored in p.
2. Run p.infer_outputs if defined, and then merge it and outputs (user-input keys are kept).
3. Check and complete outputs using types and values stored in p.
4. Check keyword consistency using p.
5. Interpolate <keyword> in String in completed inputs and outputs.
6. Return inputs and outputs.
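The merge step, in which user-provided keys are kept, behaves like Base's merge with the user dictionary passed last. The sketch below uses made-up values and hypothetical variable names; it only illustrates the precedence, not the type checks or interpolation performed by the real function.

```julia
# Values as they might come from infer_outputs (made up for this sketch).
inferred = Dict{String,Any}("out" => "sample.out", "log" => "sample.log")
# Outputs explicitly provided by the caller.
user = Dict{String,Any}("out" => "custom.out")

# Later arguments win in merge, so user-provided keys are kept.
merged = merge(inferred, user)
```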
Pipelines.parse_program_args — Function
parse_program_args(p::Program; args...)
Classify args... to inputs and outputs of p, and other keyword arguments. args includes inputs = ..., and outputs = ...
Return (inputs::Dict{String}, outputs::Dict{String}, other_kwargs::Tuple)