Getting Started¶
Install¶
DryPipe is installed in a virtualenv, like any Python library:
python3 -m venv your_venv
source your_venv/bin/activate
pip install dry-pipe
Once installed, the DryPipe CLI is available:
drypipe --help
Github repo¶
Issues can be submitted on the GitHub repo.
Writing Pipelines¶
The DryPipe DSL¶
Tasks¶
The DryPipe DSL is meant to express two things:
Tasks and their attributes (in/out arguments, the code they execute, where and how they execute).
Producer/Consumer relationships between Tasks
Producer/consumer relationships are often inherently tied to task arguments, ex:
t1 = dsl.task(key="t1").outputs(x=int).calls(f1)()
t2 = dsl.task(key="t2").inputs(t1.outputs.x).outputs(f=dsl.file("f.tsv")).calls(f2)()
The above code expresses in a (mostly) declarative style the following:
task t1 produces a variable x of type int
task t2 consumes the variable x produced by t1
tasks t1 and t2 call functions f1 and f2
task t2 produces a file: “f.tsv”
An executable pipeline can be created with t1 and t2 with the following code:
from dry_pipe import DryPipe

def my_tasks(dsl):
    t1 = dsl.task(key="t1").outputs(x=int).calls(f1)()
    yield t1
    yield dsl.task(key="t2").inputs(t1.outputs.x).outputs(f=dsl.file("f.tsv")).calls(f2)()

@DryPipe.python_call()
def f1():
    return {"x": 123}

@DryPipe.python_call()
def f2(x, f):
    # f is the output file of t2, so it is opened in write mode
    with open(f, "w") as _f:
        _f.write(f"...{x}")

def my_pipeline():
    return DryPipe.create_pipeline(my_tasks)
Assuming the function my_pipeline lives in module my_module the pipeline can be executed with the DryPipe CLI:
. /a-virtual-env-with-drypipe-installed/bin/activate
$ drypipe run -p my_module:my_pipeline --instance-directory=/x/y/z
The DAG¶
Pipelines can be represented by Directed Acyclic Graphs (DAGs).
The following diagram shows a DAG representing a simple pipeline:
flowchart LR
t1(["t1"])
t2(["t2"])
t3(["t3"])
t1-->|"x: int"|t2
t2-->|"f.tsv"|t3
t1-->|"z.json"|t3
each oval shape represents a Task
each arrow represents data (files or variables) produced by one task and consumed by another task
The function my_dag_generator below generates the DAG above.
def my_dag_generator(dsl):
    t1 = dsl.task(key="t1")\
        .outputs(x=dsl.var(int), z=dsl.file("z.json"))\
        .calls("""
            #!/usr/bin/env bash
            export x=123
            echo '{"i": "abc", "a": [56,57]}' > $z
        """)()
    yield t1
    t2 = dsl.task(key="t2")\
        .inputs(t1.outputs.x)\
        .outputs(f=dsl.file("f.tsv"))\
        .calls(f2)()
    yield t2
    yield dsl.task(key="t3")\
        .inputs(t1.outputs.z, t2.outputs.f)\
        .outputs(t=dsl.file("pipeline-result.tsv"))\
        .calls(f3)()
my_dag_generator is a generator function that yields every task of the DAG.
Static vs Dynamic pipelines
So far, all example pipelines have a static DAG, i.e. the set of Tasks is the same throughout the execution of the pipeline.
The DAGs of most interesting pipelines are dynamic: new tasks are created during the pipeline’s execution.
Important
DAG generators are invoked repeatedly by DryPipe during the course of a pipeline execution
They generate the set of tasks at the time they are invoked
For a static DAG, every invocation yields the same set of tasks, but for dynamic DAGs, new tasks get generated as execution progresses.
The next series of examples will show static DAG generators, then we will show dynamic DAG generators.
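The difference between the two kinds of generators can be sketched in plain Python, without DryPipe. In this minimal sketch, the `completed` argument and both generator functions are hypothetical stand-ins: a real DAG generator receives a `dsl` object and yields tasks, not strings.

```python
from typing import Iterator, Set

def static_generator(completed: Set[str]) -> Iterator[str]:
    # A static DAG: the same task keys are yielded at every invocation.
    yield "t1"
    yield "t2"

def dynamic_generator(completed: Set[str], chunk_count: int = 3) -> Iterator[str]:
    # A dynamic DAG: downstream tasks only appear once "prepare" has completed.
    yield "prepare"
    if "prepare" in completed:
        for i in range(chunk_count):
            yield f"work-{i}"

# Simulate two successive invocations by a hypothetical orchestrator.
first_pass = list(dynamic_generator(completed=set()))
second_pass = list(dynamic_generator(completed={"prepare"}))
print(first_pass)
print(second_pass)
```

The second invocation yields more tasks than the first, which is all that "dynamic" means here; the static generator yields the same keys no matter what has completed.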
Well behaved DAG generators¶
Well behaved DAG generators do NOT perform any work other than to generate tasks.
Since DAG generators get called repeatedly, doing “real work” in them (code meant to run only once, that produces output results) would be wasteful; such work should be done by tasks.
Generated output should depend only on:
1. the pipeline’s input dataset
2. the data produced by tasks that have completed
3. the execution status of tasks in the DAG
The DryPipe DSL is meant to make the above, particularly (2) and (3), as easy as possible.
Dividing work into smaller chunks¶
Here’s an example of a two task pipeline.
Note: tasks t1 and t2 are independent (no data is passed between them), therefore DryPipe will run them in parallel.
def my_pipeline_dag_generator(dsl):
    yield dsl.task(
        key="t1"
    ).outputs(
        a_file=dsl.file("a-file.txt")
    ).calls("""
        #!/usr/bin/env bash
        echo 'hello' > $a_file
    """)()
    yield dsl.task(
        key="t2"
    ).outputs(
        other_file=dsl.file("another-file.txt")
    ).calls("""
        #!/usr/bin/env bash
        echo 'world' > $other_file
    """)()
File dependencies between Tasks¶
Most pipelines have tasks that depend on data produced by other tasks.
In the following example, task t2 consumes a file produced by t1.
DryPipe will therefore launch task t2, when (and only when) task t1 has successfully completed.
def my_pipeline_dag_generator(dsl):
    t1 = dsl.task(
        key="t1"
    ).inputs(
        x=123
    ).outputs(
        result=dsl.file("f.txt")
    ).calls("""
        echo $(( x * x )) > $result
    """)()
    yield t1
    # we keep a reference to t1 so we can refer to t1.outputs.result
    yield dsl.task(
        key="t2"
    ).inputs(
        r=t1.outputs.result
    ).outputs(
        f=dsl.file("final-result.txt")
    ).calls("""
        result_from_t1="$(cat $r)"
        echo "we got data from t1: $result_from_t1" > $f
    """)()
The calls(…) clause can also take the name of a bash script, ex: calls("my-script.sh"). The path of the script is resolved as $__pipeline_code_dir/my-script.sh (see environment variables).
Variable dependencies between Tasks¶
When a task only needs a variable (int, float, str) from an upstream task, it is more convenient to just pass the variable, as opposed to writing it in a file and then reading it by the consuming downstream task.
Variable passing between tasks is expressed as follows with the DSL:
def my_pipeline_dag_generator(dsl):
    t1 = dsl.task(
        key="t1"
    ).inputs(
        x=123,
        y=3.14159265359,
        z='abc'
    ).outputs(
        result=int
    ).calls("""
        #!/usr/bin/env bash
        echo "all variables in the inputs(...) clause are in the env" $x, $y, $z
        # 'export' is used to assign output variables declared in the outputs(...) clause
        # bash arithmetic is integer-only, so only x is used in the computation
        export result=$(( x * x ))
    """)()
    yield t1
    yield dsl.task(
        key="t2"
    ).inputs(
        r=t1.outputs.result
    ).outputs(
        f=dsl.file("final-result.txt")
    ).calls("""
        #!/usr/bin/env bash
        echo "we got data from t1: $r" > $f
    """)()
Calling Python functions from Tasks¶
The calls(…) clause of a task can take bash snippets (as in previous examples) or Python functions, provided that the functions are decorated with @DryPipe.python_call().
The following pipeline follows the same structure as the previous one, but uses Python functions instead of bash snippets.
Note1: functions have their variable arguments parsed into the types specified in the generator function.
Note2: outputs from the outputs(…) clause (ex: the output file f of task t2) are also passed as arguments.
Note3: output variables (declared in the outputs(…) clause) are returned by functions in a Python dict.
Note4: print("abc") will write to $__pipeline_instance_dir/.drypipe/$__task_key/out.log (see [DryPipe assigned environment variables of tasks])
import os
from dry_pipe import DryPipe

@DryPipe.python_call()
def f1(x, y, z):
    print(f"all variables in the inputs(...) clause are in the env {x}, {y}, {z}")
    if isinstance(x, int):
        print("Wow, x has the correct type !")
    else:
        # will write to $__pipeline_instance_dir/.drypipe/$__task_key/out.log
        print("Something is seriously wrong !")
    x_from_env = os.environ["x"]
    if isinstance(x_from_env, str):
        print("""one could grab vars from the environment, but
        receiving pre parsed arguments is more convenient !""")
    print("")
    return {
        "result": x * y
    }

@DryPipe.python_call()
def f2(r, f):
    # f is the output file of t2, so it is opened in write mode
    with open(f, "w") as f_handle:
        f_handle.write(f"we got data from t1: {r}")

def my_pipeline_dag_generator(dsl):
    t1 = dsl.task(
        key="t1"
    ).inputs(
        x=123,
        y=3.14159265359,
        z='abc'
    ).outputs(
        result=int
    ).calls(
        f1
    )()
    yield t1
    yield dsl.task(
        key="t2"
    ).inputs(
        r=t1.outputs.result
    ).outputs(
        f=dsl.file("final-result.txt")
    ).calls(
        f2
    )()
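To make the idea of "pre parsed arguments" concrete, here is a minimal sketch of how string values from an environment could be converted to declared types and injected as function arguments. The helper `call_with_parsed_env` is hypothetical and only illustrates the principle; it is not DryPipe's actual implementation.

```python
import inspect
from typing import Any, Callable, Dict, Mapping

def call_with_parsed_env(
    func: Callable[..., Any],
    declared_types: Dict[str, type],
    env: Mapping[str, str],
) -> Any:
    """Convert env strings to their declared types and pass them as keyword arguments."""
    wanted = inspect.signature(func).parameters
    # Only the variables named in the function signature are injected.
    kwargs = {name: declared_types[name](env[name]) for name in wanted}
    return func(**kwargs)

def square_plus(x, y):
    # x arrives as an int and y as a float, not as strings.
    return {"result": x * x + y}

out = call_with_parsed_env(
    square_plus,
    {"x": int, "y": float, "z": str},
    {"x": "123", "y": "0.5", "z": "abc"},
)
print(out)
```

Note that `z` is declared but not requested by `square_plus`, so it is simply not passed.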
Dynamic DAGs¶
Most interesting pipelines will have DAGs that change depending on the input dataset.
This section shows how dynamic DAGs can be expressed with the DSL.
A DAG generator generates the tasks of a pipeline at the instant it is invoked by DryPipe.
DryPipe will track and orchestrate tasks, by invoking the generator repeatedly (a few times per minute) during the course of the pipeline execution.
At every invocation, DryPipe will analyze the generated DAG, and figure out what needs to be done (which tasks have all their dependencies satisfied and can be launched).
Below is an example of a dynamic/growing DAG. For simplicity, only two task statuses are shown (r=running, c=completed); there are many more in reality.
| time | DAG [r=running], [c=completed] | pipeline status |
|---|---|---|
| t0 | task-a[r] | running |
| t1 | task-a[c] | running |
| t2 | task-a[c], task-b[r] | running |
| t3 | task-a[c], task-b[c], task-c_1[r], task-c_2[r], …, task-c_n[r] | running |
| t4 | task-a[c], task-b[c], task-c_1[r], task-c_2[c], …, task-c_n[r] | running |
| t5 | task-a[c], task-b[c], task-c_1[c], task-c_2[c], …, task-c_n[c] | completed |
Each row of the table shows all the tasks in the DAG that the generator would return at the points in time t0, t1, …, t5.
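The "figure out what needs to be done" step can be sketched as a small function over a dependency map. This is a simplified illustration under stated assumptions: the dictionary-based DAG representation and the `ready_tasks` function are hypothetical, and real DryPipe tasks have many more states than running and completed.

```python
from typing import Dict, List, Set

def ready_tasks(
    dependencies: Dict[str, Set[str]],
    completed: Set[str],
    running: Set[str],
) -> List[str]:
    """Return keys of tasks not yet started whose upstream tasks have all completed."""
    return sorted(
        key
        for key, upstream in dependencies.items()
        if key not in completed and key not in running and upstream <= completed
    )

# Each task key maps to the set of task keys it depends on.
dag = {
    "task-a": set(),
    "task-b": {"task-a"},
    "task-c_1": {"task-b"},
    "task-c_2": {"task-b"},
}

print(ready_tasks(dag, completed=set(), running=set()))
print(ready_tasks(dag, completed={"task-a", "task-b"}, running={"task-c_1"}))
```

Calling such a function after every generator invocation gives the set of tasks that can be launched at that instant.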
Semi hardcoded DAGs¶
Some pipelines are only meant to run on very few inputs; in these cases, it might make sense to hard code every instance.
The code below creates two pipelines (pipeline_1_2_3 and pipeline_7_8_9_543) that work over two hardcoded datasets (lists of ints).
Both pipelines use the same generator function, and can then be run with:
drypipe run -p my_module:pipeline_1_2_3
drypipe run -p my_module:pipeline_7_8_9_543
from dry_pipe import DryPipe

def create_dag_generator(beautiful_numbers):
    def dag_gen(dsl):
        for i in beautiful_numbers:
            yield dsl.task(
                key=f"square-{i}"
            ).inputs(
                x=i
            ).outputs(
                f=dsl.file("file_with_squared_number.txt")
            ).calls("""
                #!/usr/bin/env bash
                echo $(( $x * $x)) > $f
            """)()
    return dag_gen

def pipeline_1_2_3():
    return DryPipe.create_pipeline(
        create_dag_generator([1, 2, 3]),
    )

def pipeline_7_8_9_543():
    return DryPipe.create_pipeline(
        create_dag_generator([7, 8, 9, 543]),
    )
Input file driven DAGs¶
Some pipeline DAGs are driven by files from the pipeline dataset.
Since the DAG generator is just a normal Python function, the only constraint on customization is what can be coded in Python.
It’s entirely up to the pipeline developer to decide on the input file format, how many files, how to parse them, which library to use, etc.
The next example shows a common pattern:
The pipeline’s input consists of a TSV file, in which each line represents a unit of work.
The generator implements this by reading the TSV file and yielding a task for every line, with the proper arguments.
Note: the path of the file is taken from an environment variable set before running the pipeline; the next example will show another approach.
export MY_INPUT_DATA_FILE_TSV=<a tsv file>
drypipe run -p my_module:dag_gen
import os

def dag_gen(dsl):
    with open(os.environ['MY_INPUT_DATA_FILE_TSV']) as f:
        for line in f.readlines():
            # strip the trailing newline so it doesn't end up in other_arg
            i, other_arg = line.rstrip("\n").split("\t")
            yield dsl.task(
                key=f"t-{i}"
            ).inputs(
                x=other_arg
            ).outputs(
                f=dsl.file("result.txt")
            ).calls("""
                #!/usr/bin/env bash
                echo "compute data for $x" > $f
            """)()
In a variation on this pattern, the TSV file lives in the $__pipeline_instance_dir:
def dag_gen(dsl):
    with open(dsl.file_in_pipeline_instance_dir("dataset.tsv")) as f:
        for line in f.readlines():
            i, other_arg = line.rstrip("\n").split("\t")
            yield dsl.task(
                key=f"t-{i}"
            )...

def my_pipeline():
    # the pipeline is created from the DAG generator defined above
    return DryPipe.create_pipeline(dag_gen)
With this approach, the $__pipeline_instance_dir is seeded with the input TSV file (ex: dataset.tsv) before running the instance:
cp dataset.tsv ./pipeline-instance-dir-123
drypipe run -p my_module:my_pipeline --instance-dir=./pipeline-instance-dir-123
Seeding $__pipeline_instance_dir with the pipeline’s input data has the advantage that the instance dir contains the entirety of the pipeline instance’s data.
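Since the parsing is plain Python, the standard csv module can be used to read the TSV file instead of splitting lines by hand. The sketch below assumes a two-column file (task key, argument); the function name `read_work_units` and the sample data are illustrative only.

```python
import csv
import io
from typing import Iterable, Iterator, Tuple

def read_work_units(tsv_lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (key, argument) pairs from a two-column TSV, skipping blank lines."""
    for row in csv.reader(tsv_lines, delimiter="\t"):
        if not row:
            # csv.reader returns an empty list for a blank line
            continue
        key, other_arg = row[0], row[1]
        yield key, other_arg

# An in-memory stand-in for the dataset.tsv file.
sample = io.StringIO("1\talpha\n2\tbeta\n\n3\tgamma\n")
units = list(read_work_units(sample))
print(units)
```

In a DAG generator, each yielded pair would become one `dsl.task(key=f"t-{key}")` with `other_arg` passed as an input.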
DAG driven by task execution¶
The following example introduces three new DSL elements:
dsl.fileset('work-chunk.*.fasta') in an outputs(…) clause
dsl.wait_for_tasks(task1, task2, ...)
dsl.wait_for_matching_tasks(pattern1, pattern2, ...)
The DAG is a bit more complicated, so we’ll use a diagram to show the “big picture”:
flowchart LR
prepare_chunks(["prepare-chunks\n[input_fasta=chimp.fasta]"])
w1(["task-for-chunk-1"])
w2(["task-for-chunk-2"])
wN(["task-for-chunk-N"])
analyze(["analyze-all"])
prepare_chunks-->|"work-chunk.1.fasta"|w1
prepare_chunks-->|"work-chunk.2.fasta"|w2
prepare_chunks-->|"work-chunk.N.fasta"|wN
w1-->|"results.json"|analyze
w2-->|"results.json"|analyze
wN-->|"results.json"|analyze
This pipeline has an initial task (prepare_chunks) that creates N files; a task (task-for-chunk.i) is created for each of them.
prepare_chunks and task-for-chunk.1..N have a producer/consumer relationship.
The DSL expresses this (in the code below) by declaring
dsl.fileset('work-chunk.*.fasta')
in the outputs(…) clause of the producing task (prepare_chunks).
All downstream consuming tasks are yielded in the body of the for expression:
for _ in dsl.wait_for_tasks(prepare_chunks):
The last task depends on the completion of all previous tasks; it is in a producer/consumer relationship with all upstream task-for-chunk.* tasks.
The relationship is expressed with:
for matcher in dsl.wait_for_matching_tasks("task-for-chunk.*"):
The evolution of the pipeline’s DAG over time could look as follows
| time | DAG [r=running], [c=completed] | pipeline status |
|---|---|---|
| t0 | prepare_chunks[r] | running |
| t1 | prepare_chunks[c] | running |
| t2 | prepare_chunks[c], task-for-chunk.1[r], task-for-chunk.2[r], task-for-chunk.N[r] | running |
| t3 | prepare_chunks[c], task-for-chunk.1[c], task-for-chunk.2[r], task-for-chunk.N[r] | running |
| t4 | prepare_chunks[c], task-for-chunk.1[c], task-for-chunk.2[c], task-for-chunk.N[c] | running |
| t5 | prepare_chunks[c], task-for-chunk.1[c], task-for-chunk.2[c], task-for-chunk.N[c], analyze[r] | running |
| t6 | prepare_chunks[c], task-for-chunk.1[c], task-for-chunk.2[c], task-for-chunk.N[c], analyze[c] | completed |
Each row in the above table shows what the generator function returns at various times during the execution.
import os
from dry_pipe import DryPipe

@DryPipe.python_call()
def create_n_chunks_of_work(input_fasta, __task_output_dir):
    with open(input_fasta) as f:
        c = 0
        for w in get_next_chunk_from(f.readlines()):
            # file names must match the fileset pattern 'work-chunk.*.fasta' declared below
            with open(os.path.join(__task_output_dir, f"work-chunk.{c}.fasta"), "w") as chunk_file:
                write_chunk_into(w, chunk_file)
            c += 1

def dag_gen(dsl):
    prepare_chunks = dsl.task(
        key="prepare-chunks"
    ).inputs(
        input_fasta=dsl.file('chimp.fasta')
    ).outputs(
        work_chunks=dsl.fileset('work-chunk.*.fasta')
    ).calls(
        create_n_chunks_of_work
    )()
    yield prepare_chunks
    for _ in dsl.wait_for_tasks(prepare_chunks):
        # dsl.wait_for_tasks ensures that we can only get here when prepare_chunks
        # has successfully completed
        for work_chunk_file_handle in prepare_chunks.outputs.work_chunks.fetch():
            # extract number from file name, i.e.
            # work-chunk.3.fasta -> 3
            chunk_number = work_chunk_file_handle.basename().split(".")[1]
            yield dsl.task(
                key=f"task-for-chunk.{chunk_number}"
            ).inputs(
                f=work_chunk_file_handle
            ).outputs(
                results_file=dsl.file("results.json")
            ).calls("""
                #!/usr/bin/env bash
                echo "work on $f"
            """)()
    # wait_for_matching_tasks("task-for-chunk.*") ensures that we can only get here when ALL tasks
    # matching task-for-chunk.* have successfully completed
    for matcher in dsl.wait_for_matching_tasks("task-for-chunk.*"):
        yield dsl.task(
            key="analyze-all-work-pieces"
        ).inputs(
            # pattern_for_all_chunks is $__pipeline_instance_dir/output/task-for-chunk.*/results.json
            pattern_for_all_chunks=matcher.all.results_file.as_glob_expression()
        ).outputs(
            a_result_file=dsl.file("final-result-file")
        ).calls("""
            #!/usr/bin/env bash
            # the two following commands are equivalent:
            ls $__pipeline_instance_dir/output/task-for-chunk.*/results.json
            ls $pattern_for_all_chunks
            # the second is way more DRY (Don't Repeat Yourself) !
            echo "work on $f"
        """).calls(
            analyze_all
        )()

@DryPipe.python_call()
def analyze_all(pattern_for_all_chunks, a_result_file):
    with open(a_result_file, "w") as _a_result_file:
        # Because we are in a Python Call, we can iterate over pattern_for_all_chunks().
        # It is equivalent to calling :
        # glob.glob(os.path.expandvars("$__pipeline_instance_dir/output/task-for-chunk.*/results.json"))
        # but way more DRY !
        for chunk_file in pattern_for_all_chunks():
            some_results = read_from(chunk_file)
            # write through the open file handle, not the path string
            _a_result_file.write(f"got {some_results} from chunk {chunk_file}")
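The glob expression mentioned in the comments above can be tried out with the standard library alone. The sketch below builds a throwaway directory that mimics the $__pipeline_instance_dir/output layout and collects the results files; the function `collect_result_files` is a hypothetical helper, not part of DryPipe.

```python
import glob
import os
import tempfile
from typing import List

def collect_result_files(pipeline_instance_dir: str) -> List[str]:
    """Return all results.json files under output/task-for-chunk.*, sorted by path."""
    pattern = os.path.join(
        pipeline_instance_dir, "output", "task-for-chunk.*", "results.json"
    )
    return sorted(glob.glob(pattern))

with tempfile.TemporaryDirectory() as instance_dir:
    # Mimic the output directories of three completed chunk tasks.
    for n in (1, 2, 3):
        task_dir = os.path.join(instance_dir, "output", f"task-for-chunk.{n}")
        os.makedirs(task_dir)
        with open(os.path.join(task_dir, "results.json"), "w") as fh:
            fh.write("{}")
    found = [os.path.relpath(p, instance_dir) for p in collect_result_files(instance_dir)]

print(found)
```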
When NOT to use dsl.wait_for¶
In some cases, a producer/consumer relationship can be expressed by simply passing a produced file or variable in the inputs(…) clause of the downstream task, ex:
dsl.task(key="t2").inputs(z=t1.outputs.abc)
In such cases, dsl.wait_for isn’t necessary. DryPipe will know that the consuming task needs to wait.
The previous example shows a case where dsl.wait_for is needed.
The next example shows another case where dsl.wait_for is useful: the generator function needs to access the actual result (variable x=dsl.var(int)) produced by a task, in order to parametrize a downstream task.
The task “highly-dependent-task” needs task_a.outputs.x to estimate a proper Slurm execution time. Other use cases can easily be imagined.
In the example, “highly-dependent-task” also waits for the tasks matched by dsl.wait_for_matching_tasks("task-prefix-*", "other-task-prefix-*"), just for the sake of showing how waiting for many kinds of tasks is expressed.
The main things to remember:
dsl.wait_for_tasks and dsl.wait_for_matching_tasks are used in conjunction with for
the for loop iterates over an empty iterator when the tasks waited upon are NOT completed, and over a single item otherwise
see also dsl.wait_for_tasks and dsl.wait_for_matching_tasks
from dry_pipe import TaskConf

def my_dag_generator(dsl):
    task_a = dsl.task(
        key="task-a"
    ).outputs(
        x=dsl.var(int)
    ).calls(
        some_func
    )()
    yield task_a
    for _ in dsl.wait_for_tasks(task_a):
        # because task_a has completed, we can fetch values from its outputs clause:
        actual_x_loaded_from_completed_task = task_a.outputs.x.fetch()
        assert isinstance(actual_x_loaded_from_completed_task, int)
        for matcher1, matcher2 in dsl.wait_for_matching_tasks("task-prefix-*", "other-task-prefix-*"):
            yield dsl.task(
                key="highly-dependent-task",
                task_conf=TaskConf(
                    executer_type="slurm",
                    slurm_account="me",
                    slurm_options=[
                        f"--time={estimate_time(actual_x_loaded_from_completed_task)}"
                    ]
                )
            ).inputs(
                x=dsl.val
            )...
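The "empty iterator or single item" behaviour of the for loop can be reproduced with an ordinary generator. In this sketch, `wait_for` and the `completed` set are hypothetical stand-ins used to show the control flow; they are not the DryPipe functions themselves.

```python
from typing import Iterator, Set, Tuple

def wait_for(completed: Set[str], *task_keys: str) -> Iterator[Tuple[str, ...]]:
    """Yield the awaited keys exactly once if all are completed, otherwise yield nothing."""
    if all(key in completed for key in task_keys):
        yield task_keys

def generator(completed: Set[str]) -> Iterator[str]:
    yield "task-a"
    # The loop body runs zero times until task-a has completed, then exactly once.
    for _ in wait_for(completed, "task-a"):
        yield "highly-dependent-task"

print(list(generator(completed=set())))
print(list(generator(completed={"task-a"})))
```

Using a for loop as a gate keeps the generator free of explicit status checks: the body is simply skipped while upstream tasks are not completed.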
Pipeline composition¶
In any programming language, composition is key to minimalism, expressiveness and reusability. This is also true for DSLs.
Composing DryPipe pipelines is greatly simplified by the fact that DAG generators are generator functions.
The next example shows how the DAG generators of two pipelines are combined to create a new DAG generator.
Note1: The tasks from composed sub pipelines will live in the same $__pipeline_instance_dir
Note2: dsl.sub_pipeline(super_duper_pipeline_dag_generator, "s_")
Tasks generated by super_duper_pipeline_dag_generator will have their keys prefixed by “s_”, to ensure uniqueness of task keys in the new pipeline.
dsl.sub_pipeline returns a sub pipeline that needs to be yielded.
Note3: super_duper.wait_for_tasks("a-super-task")
This will wait for the completion of the task with key=”a-super-task” in the sub pipeline.
It’s a shorthand for calling dsl.wait_for_tasks("s_a-super-task"), as the actual task key in the new composed pipeline will be “s_a-super-task”
The example waits for tasks in other_sub_pipeline with other_sub_pipeline.wait_for_matching_tasks("do-it-*")
and for “task-dependent-on-a-super-task”; the results are fed to task “grande-finale”
import super_duper_pipeline_dag_generator
import other_pipeline_dag_gen

def my_composite_dag_generator(dsl):
    super_duper = dsl.sub_pipeline(super_duper_pipeline_dag_generator, "s_")
    yield super_duper
    other_sub_pipeline = dsl.sub_pipeline(other_pipeline_dag_gen, "o_")
    yield other_sub_pipeline
    for super_task in super_duper.wait_for_tasks("a-super-task"):
        v = super_task.outputs.a_variable
        yield dsl.task(
            key="task-dependent-on-a-super-task"
        ).inputs(
            y=dsl.val(v)
        ).outputs(
            z=dsl.var(int)
        ).calls(
            a_func
        )()
    for matcher in other_sub_pipeline.wait_for_matching_tasks("do-it-*"):
        for task_dependent_on_a_super_task in dsl.wait_for_tasks("task-dependent-on-a-super-task"):
            yield dsl.task(
                key="grande-finale"
            ).inputs(
                z=task_dependent_on_a_super_task.outputs.z,
                a_pattern=matcher.all.a_file_defined_in_do_it_tasks.as_glob_expression()
            ).calls(
                grande_finale_func
            )()
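The role of the key prefix can be illustrated with a few lines of plain Python. The helpers `prefixed` and `assert_unique` and the sample keys below are hypothetical; they only show why prefixing avoids key clashes when two sub pipelines define a task with the same key.

```python
from typing import Iterable, Iterator, List

def prefixed(keys: Iterable[str], prefix: str) -> Iterator[str]:
    """Yield every key with the sub pipeline prefix prepended."""
    for key in keys:
        yield prefix + key

def assert_unique(keys: List[str]) -> None:
    """Raise ValueError if two tasks of the composed pipeline share a key."""
    duplicates = {k for k in keys if keys.count(k) > 1}
    if duplicates:
        raise ValueError(f"duplicate task keys: {sorted(duplicates)}")

# Both sub pipelines happen to define a task named "report".
super_duper_keys = ["a-super-task", "report"]
other_keys = ["do-it-1", "do-it-2", "report"]

composed = list(prefixed(super_duper_keys, "s_")) + list(prefixed(other_keys, "o_"))
assert_unique(composed)  # passes: "s_report" and "o_report" are distinct
print(composed)
```

Without the prefixes, the two "report" keys would collide and `assert_unique` would raise.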
Directory structure¶
Every task in a pipeline instance has two dedicated directories:
an output directory: $__pipeline_instance_dir/output/$__task_key
a working directory: $__pipeline_instance_dir/.drypipe/$__task_key
Where $__task_key is the key assigned in the task declaration, ex: dsl.task(key="t123")
To avoid name clashes, DryPipe enforces uniqueness of task keys.
Warning
It’s considered bad practice for a task to write in a directory other than its dedicated $__task_output_dir (see well behaved tasks)
A two task pipeline with task keys “t1” and “t2”, would have the directory structure below.
$__pipeline_instance_dir
│
└───.drypipe
│ │
│ └────t1
│ │ task
│ │ task-env.sh
│ │ task-conf.json
│ │ state.(completed|launched|step-started.0|failed.0|killed.0|...)
│ │ out.log
│ │ drypipe.log
│ └─t2
│ │ task
│ │ ...
...
└───output
│ │
│ └───t1
│ │ results.txt
│ └───t2
│ │ f.tsv
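The two per-task directories follow directly from the instance directory and the task key. The sketch below computes them with pathlib; the function `task_directories` is a hypothetical helper written for illustration.

```python
from pathlib import PurePosixPath
from typing import Dict

def task_directories(pipeline_instance_dir: str, task_key: str) -> Dict[str, PurePosixPath]:
    """Return the output and working directories of a task, per the layout above."""
    base = PurePosixPath(pipeline_instance_dir)
    return {
        "output_dir": base / "output" / task_key,
        "work_dir": base / ".drypipe" / task_key,
    }

dirs = task_directories("/x/y/z", "t1")
print(dirs["output_dir"])
print(dirs["work_dir"])
```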
Task launch script and variables¶
Environment variables are the mechanism by which the DryPipe orchestrator parametrizes the tasks in a pipeline instance.
Tasks run as separate processes, and user code (passed in the calls(…) clause) transparently receives its inputs and outputs as env variables.
Pipeline specific variables¶
Pipeline specific variables are declared in the inputs(…) and outputs(…) clauses of a pipeline’s tasks.
All the variables defined in these clauses end up as env variables in the task process.
[Bash Calls] simply refer to them as $my_var, while [python calls] have their vars injected as function args, ex: def f(my_var) (see example)
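Passing parameters to a separate process through its environment can be demonstrated with the standard subprocess module. This is a generic sketch of the mechanism, not DryPipe's launcher: the child process here is a one-line Python program standing in for a task, and the variable name `x` is only an example.

```python
import os
import subprocess
import sys

# The child reads its input from the environment, as a task's user code would.
child_code = "import os; print(int(os.environ['x']) * 2)"

# Start from the current environment and add the task's input variable.
env = dict(os.environ, x="21")

completed = subprocess.run(
    [sys.executable, "-c", child_code],
    env=env,
    capture_output=True,
    text=True,
    check=True,
)
result = completed.stdout.strip()
print(result)
```

Environment values are always strings, which is why the child converts `x` with `int(...)` before using it.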
Generated scripts¶
DryPipe generates the following scripts for every task in a pipeline instance:
$__pipeline_instance_dir/.drypipe/$__task_key/task: the script that runs the task
$__pipeline_instance_dir/.drypipe/$__task_key/task-env.sh: the script that loads the task env vars
$__pipeline_instance_dir/.drypipe/$__task_key/sbatch-launcher.sh: launches ./task as a Slurm job
Users normally don’t have to deal with these scripts, but they can be useful for debugging.
A task can be manually launched by executing the task script $__pipeline_instance_dir/.drypipe/$__task_key/task
Additionally, the task script has these other useful commands:
task kill: will kill the task
task tail: will do a multi tail -f of the logs of the task (out.log, drypipe.log)
task ps: equivalent to calling “ps -p”
DryPipe variables¶
The following table describes the environment variables assigned by DryPipe; pipeline coders will rarely need to access them.
They are documented here for the rare cases where they are needed, and to help make the documentation more concise. Ex: we can refer to $__task_output_dir instead of “the task’s output directory”.
| variable name | description | equivalent |
|---|---|---|
| $__pipeline_instance_dir | path of the pipeline instance directory | |
| $__task_key | the key assigned in the task declaration, ex: dsl.task(key="t123") | |
| $__task_output_dir | the task’s dedicated output dir | $__pipeline_instance_dir/output/$__task_key |
| $__out_log | destination of the task’s stdout | $__pipeline_instance_dir/.drypipe/$__task_key/out.log |
| $__scratch_dir | temp working directory | $SLURM_TMPDIR if the task runs in Slurm, otherwise $__task_output_dir/scratch |
| $__pipeline_code_dir | path to the instance’s code directory | defaults to the directory of the python file where the DAG generator is coded, can be overridden. Useful for referring to scripts in a task’s bash snippet |
| $__containers_dir | the directory where containers used by the pipeline reside | |
Note: $__pipeline_code_dir and $__containers_dir are defined at the pipeline level, and can be overridden (see …)
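The "equivalent" given for $__scratch_dir amounts to a simple fallback rule, sketched below. The function `resolve_scratch_dir` is a hypothetical illustration of that rule; how DryPipe actually detects that a task runs in Slurm is not specified here, so the sketch assumes the presence of $SLURM_TMPDIR is the signal.

```python
import os
from typing import Mapping

def resolve_scratch_dir(env: Mapping[str, str], task_output_dir: str) -> str:
    """Use $SLURM_TMPDIR when it is set, otherwise a 'scratch' dir under the task output dir."""
    slurm_tmp = env.get("SLURM_TMPDIR")
    if slurm_tmp:
        return slurm_tmp
    return os.path.join(task_output_dir, "scratch")

in_slurm = resolve_scratch_dir({"SLURM_TMPDIR": "/tmp/job-1"}, "/inst/output/t1")
local = resolve_scratch_dir({}, "/inst/output/t1")
print(in_slurm)
print(local)
```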
Runtime configuration¶
Task level configs¶
Tasks in a pipeline have runtime configurations that determine things such as:
where/how tasks are executed (locally launched processes? Slurm nodes? a remote ssh accessible machine?)
how dependencies are managed: does a task run in a virtualenv, a conda env, a container?
A pipeline instance can have tasks run on more than one host (and more than one Slurm cluster), and DryPipe will ensure that
the input of every task (specified in its inputs(…) clause) is rsynced to the host where it’s executed
the output of every task is downloaded (rsynced) to the orchestrating host (the one running drypipe run)
from dry_pipe import TaskConf

def my_pipeline_dag_generator(dsl):
    # Task will run on the local machine (default task_conf of tasks)
    yield dsl.task(
        key="t1",
        task_conf=TaskConf(
            executer_type="process"
        )
    ).calls(f1)()
    # Task will run as a slurm job (orchestrating host must be a slurm login node)
    yield dsl.task(
        key="t2",
        task_conf=TaskConf(
            executer_type="slurm",
            slurm_account="my-slurm-account",
            sbatch_options=[
                "--time=0:5:00"
            ],
            container="my-container.sif"
        )
    ).calls(f2)()
    # Task will run as a slurm job, submitted from a remote slurm login host
    yield dsl.task(
        key="t3",
        task_conf=TaskConf(
            executer_type="slurm",
            ssh_specs="me@a-slurm-login-host:~/.ssh/id_rsa",
            container="my-container.sif",
            remote_base_dir="/remote-parent-dir-of-pipeline-instance",
            remote_containers_dir="/remote-parent-dir-of-container-sif-files",
            slurm_account="my-slurm-account",
            sbatch_options=[
                "--time=0:5:00"
            ]
        )
    ).calls(f3)()
    # Task will run as a process on a remote host (task keys must be unique, hence "t4")
    yield dsl.task(
        key="t4",
        task_conf=TaskConf(
            executer_type="process",
            ssh_specs="me@a-slurm-login-host:~/.ssh/id_rsa"
        )
    ).calls(f4)()
Note1: the TaskConf of t3 will cause the following:
It will run on host “a-slurm-login-host” in the following directories
/remote-parent-dir-of-pipeline-instance/$__pipeline_instance_dir/output/t3
/remote-parent-dir-of-pipeline-instance/$__pipeline_instance_dir/.drypipe/t3
using the container “my-container.sif” in the remote host in the directory $__remote_containers_dir:
/remote-parent-dir-of-container-sif-files/my-container.sif
Pipeline level configs¶
The following example shows how to override $__pipeline_code_dir and $__containers_dir, as well as the pipeline’s default TaskConf.
task-123 will get the pipeline default TaskConf
task-z will get its own TaskConf
from dry_pipe import TaskConf, DryPipe

def my_dag(dsl):
    yield dsl.task(key="task-123").calls("""
        #!/usr/bin/env bash
        echo "hello world"
    """)()
    yield dsl.task(
        key="task-z",
        task_conf=TaskConf(executer_type="process")
    ).calls("""
        #!/usr/bin/env bash
        echo "hello world"
    """)()

def my_pipeline():
    return DryPipe.create_pipeline(
        my_dag,
        pipeline_code_dir="x/y/z",
        containers_dir="a/b/c",
        task_conf=TaskConf(
            executer_type="slurm",
            slurm_account="my-slurm-account",
        )
    )
DryPipe CLI¶
usage: drypipe [-h] [--v] [--vv] [--dry-run]
{run,prepare,service,upgrade-drypipe,restart-failed-array-tasks,report-execution-times,task,restart,poll-task,remote-exec,fetch-remote-state,upload-drypipe-for-remote-instance,upload-task-inputs,sbatch,sbatch-gen,dump-env,array-submit,array-upload,array-download,array-submit-from-remote,array-watch-from-remote,array-create-parent,list-states,array-rsync-list,call}
...
Positional Arguments¶
- command
Possible choices: run, prepare, service, upgrade-drypipe, restart-failed-array-tasks, report-execution-times, task, restart, poll-task, remote-exec, fetch-remote-state, upload-drypipe-for-remote-instance, upload-task-inputs, sbatch, sbatch-gen, dump-env, array-submit, array-upload, array-download, array-submit-from-remote, array-watch-from-remote, array-create-parent, list-states, array-rsync-list, call
Named Arguments¶
- --v, -v
verbose (logging_level.INFO)
Default:
False
- --vv, -vv
very verbose (logging_level.DEBUG)
Default:
False
- --dry-run
don’t actually run, but print what will run (implicit --verbose)
Default:
False
Sub-commands¶
run¶
generate tasks and run the pipeline
drypipe run [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --generator
GENERATOR [--until PATTERN] [--restart-failed] [--reset-failed]
[--sleep-schedule SLEEP_SCHEDULE]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --generator
<module>:<function> task generator function, can also be set with environment var DRYPIPE_PIPELINE_GENERATOR
- --until
tasks matching PATTERN will not be started
- --restart-failed
failed tasks will be restarted
Default:
False
- --reset-failed
failed tasks will be reset and then restarted
Default:
False
- --sleep-schedule
a list of sleep times in seconds, for the main loop of the service, can also be set with environment var DRYPIPE_SERVICE_SLEEP_SCHEDULE
Default:
0,1,3,5,10,15,20
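The "can also be set with environment var" behaviour is a common argparse pattern: the environment variable supplies the default, and an explicit command line value overrides it. The sketch below is a generic illustration of that pattern with the option and variable names listed above; `build_parser` is hypothetical and is not the actual DryPipe parser, and parsing the schedule into a list of ints is an assumption.

```python
import argparse
from typing import List, Mapping

def parse_schedule(value: str) -> List[int]:
    """Turn a comma separated string such as '0,1,3' into a list of ints."""
    return [int(v) for v in value.split(",")]

def build_parser(env: Mapping[str, str]) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="example")
    parser.add_argument(
        "--pipeline-instance-dir", "-pid",
        default=env.get("DRYPIPE_PIPELINE_INSTANCE_DIR"),
    )
    parser.add_argument(
        "--sleep-schedule",
        # argparse applies `type` to string defaults as well as to command line values
        default=env.get("DRYPIPE_SERVICE_SLEEP_SCHEDULE", "0,1,3,5,10,15,20"),
        type=parse_schedule,
    )
    return parser

env = {"DRYPIPE_PIPELINE_INSTANCE_DIR": "/from/env"}
from_env = build_parser(env).parse_args([])
from_cli = build_parser(env).parse_args(["-pid", "/from/cli", "--sleep-schedule", "1,2"])
print(from_env.pipeline_instance_dir, from_env.sleep_schedule)
print(from_cli.pipeline_instance_dir, from_cli.sleep_schedule)
```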
prepare¶
generate tasks, WITHOUT running the pipeline
drypipe prepare [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --generator
GENERATOR [--until PATTERN] [--sleep-schedule SLEEP_SCHEDULE]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --generator
<module>:<function> task generator function, can also be set with environment var DRYPIPE_PIPELINE_GENERATOR
- --until
tasks matching PATTERN will not be started
- --sleep-schedule
a list of sleep times in seconds, for the main loop of the service, can also be set with environment var DRYPIPE_SERVICE_SLEEP_SCHEDULE
Default:
0,1,3,5,10,15,20
service¶
run as service
drypipe service [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--config-generator CONFIG_GENERATOR
[--sleep-schedule SLEEP_SCHEDULE] [--log-conf LOG_CONF]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --config-generator
a function that yields instances of dry_pipe.pipeline.PipelineType, can also be set with environment var DRYPIPE_SERVICE_CONFIG_GENERATOR
- --sleep-schedule
a list of sleep times in seconds, for the main loop of the service, can also be set with environment var DRYPIPE_SERVICE_SLEEP_SCHEDULE
Default:
0,1,3,5,10,15,20
- --log-conf
the path to a logging configuration file, can also be set with environment var DRYPIPE_LOGGING_CONF
upgrade-drypipe¶
upgrade drypipe version for the specified pipeline instance
drypipe upgrade-drypipe [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
restart-failed-array-tasks¶
restart failed array tasks, of specified array task
drypipe restart-failed-array-tasks [-h] --pipeline-instance-dir
PIPELINE_INSTANCE_DIR --task-key TASK_KEY
[--include-pre-launch]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --include-pre-launch
Also restart tasks that have failed to launch (useful after scancel on an array)
Default:
False
report-execution-times¶
execution times for all tasks, or for all tasks matching a filter expression
drypipe report-execution-times [-h] --pipeline-instance-dir
PIPELINE_INSTANCE_DIR [--task-key TASK_KEY]
[--filter FILTER]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
- --filter
glob expression to filter tasks
Default:
'*'
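A glob expression over task keys behaves like shell wildcard matching. The sketch below uses the standard fnmatch module to show which keys a given filter would select; whether DryPipe uses fnmatch internally is an assumption, and `filter_task_keys` is a hypothetical helper.

```python
import fnmatch
from typing import Iterable, List

def filter_task_keys(task_keys: Iterable[str], pattern: str = "*") -> List[str]:
    """Return the task keys matching a glob expression (case sensitive)."""
    return [key for key in task_keys if fnmatch.fnmatchcase(key, pattern)]

keys = ["prepare-chunks", "task-for-chunk.1", "task-for-chunk.2", "analyze-all-work-pieces"]
print(filter_task_keys(keys))
print(filter_task_keys(keys, "task-for-chunk.*"))
```

The default pattern '*' selects every task, which matches the documented default of the --filter option.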
task¶
run specified task
drypipe task [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --task-key
TASK_KEY [--wait] [--tail] [--by-runner] [--from-remote]
[--ssh-remote-dest SSH_REMOTE_DEST]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --wait
wait for task to complete before exiting
Default:
False
- --tail
Default:
False
- --by-runner
Default:
False
- --from-remote
Default:
False
- --ssh-remote-dest
example: me@myhost.example.com:/my-directory
restart¶
restart the task specified by --task-key, at the last unsuccessful step. WARNING: if the task is completed, it will restart from the first step
drypipe restart [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --task-key
TASK_KEY [--at-step AT_STEP] [--reset] [--wait]
[--from-remote]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --at-step
restarts the task at the specified step (zero based).
- --reset
restart the task from the first step, clears the results directory if exists
Default:
False
- --wait
wait for task to complete before exiting
Default:
False
- --from-remote
Default:
False
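Because --at-step is zero based, restarting at what a person would call the third step means passing 2. A minimal sketch of that conversion follows; the helper restart_cmd, the instance directory and the task key are hypothetical, and the command is only built, not run.

```python
def restart_cmd(instance_dir, task_key, step_number=None):
    """Build the argv list for `drypipe restart`.

    step_number counts steps starting from 1; it is converted to the
    zero-based index expected by --at-step.
    """
    argv = [
        "drypipe", "restart",
        "--pipeline-instance-dir", instance_dir,
        "--task-key", task_key,
    ]
    if step_number is not None:
        if step_number < 1:
            raise ValueError("step_number starts at 1")
        argv += ["--at-step", str(step_number - 1)]
    return argv


# Restart hypothetical task "t2" at its third step (index 2)
cmd = restart_cmd("/x/y/z", "t2", step_number=3)
```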
poll-task¶
return state of task (used for polling remote tasks)
drypipe poll-task [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
remote-exec¶
execute remote tasks
drypipe remote-exec [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY [--wait]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --wait
wait for task to complete before exiting
Default:
False
fetch-remote-state¶
Undocumented
drypipe fetch-remote-state [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY [--wait]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --wait
wait for task to complete before exiting
Default:
False
upload-drypipe-for-remote-instance¶
Undocumented
drypipe upload-drypipe-for-remote-instance [-h] --pipeline-instance-dir
PIPELINE_INSTANCE_DIR --task-key
TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
upload-task-inputs¶
Undocumented
drypipe upload-task-inputs [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
sbatch¶
launch task (specified by --task-key) with sbatch
drypipe sbatch [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --task-key
TASK_KEY [--wait]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --wait
wait for task to complete before exiting
Default:
False
sbatch-gen¶
print sbatch command for launching task, without invoking it
drypipe sbatch-gen [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
dump-env¶
dump all environment variables of specified task
drypipe dump-env [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --task-key
TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
array-submit¶
submit array task
drypipe array-submit [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY [--limit N]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --limit
limit submitted array size to N tasks
array-upload¶
upload array task to remote location
drypipe array-upload [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
array-download¶
download the results of all array tasks (rsync or Globus fetch of every child task's __task_output_dir)
drypipe array-download [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
array-submit-from-remote¶
submit array task to remote location
drypipe array-submit-from-remote [-h] --pipeline-instance-dir
PIPELINE_INSTANCE_DIR --task-key TASK_KEY
[--wait]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --wait
wait for task to complete before exiting
Default:
False
array-watch-from-remote¶
watch an array task at a remote location (poll and fetch the status of its children)
drypipe array-watch-from-remote [-h] --pipeline-instance-dir
PIPELINE_INSTANCE_DIR --task-key TASK_KEY
[--wait]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --wait
wait for task to complete before exiting
Default:
False
array-create-parent¶
create a parent array task with matching tasks
drypipe array-create-parent [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
list-states¶
Undocumented
drypipe list-states [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY [--gen-rsync-list]
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
- --gen-rsync-list
generate rsync list for file sets
Default:
False
array-rsync-list¶
Undocumented
drypipe array-rsync-list [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR
--task-key TASK_KEY
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
call¶
Undocumented
drypipe call [-h] --pipeline-instance-dir PIPELINE_INSTANCE_DIR --task-key
TASK_KEY
module_function
Positional Arguments¶
- module_function
Named Arguments¶
- --pipeline-instance-dir, -pid
pipeline instance directory, can also be set with environment var DRYPIPE_PIPELINE_INSTANCE_DIR
- --task-key, -k
task key
Definitions¶
DAG: directed acyclic graph
Orchestrating Host: the computer where drypipe run or drypipe watch is executed
Pipeline: refers to the code of a pipeline, and to the object returned by DryPipe.create_pipeline(my_dag_gen)
Pipeline Instance: the execution of a pipeline over a dataset. A pipeline run over two datasets has two instances
Bash call: a bash snippet in the calls(…) clause of a task
PythonCall: a python function annotated with DryPipe.python_call() in the calls(…) clause of a task
Pipeline Input Dataset: pipeline instances execute over pre-existing data, most often files (DryPipe assumes no particular format). We refer to it as the pipeline instance’s input dataset.
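Since each pipeline instance has its own directory, a script that works with several instances mainly needs to switch that directory. The sketch below prepares one environment per instance using the DRYPIPE_PIPELINE_INSTANCE_DIR variable mentioned throughout the CLI reference. The helper env_for_instance and the directory names are hypothetical, and whether a given subcommand accepts the variable in place of --pipeline-instance-dir should be checked against the installed version.

```python
import os


def env_for_instance(instance_dir, base_env=None):
    """Return a copy of the environment that points at one pipeline instance."""
    env = dict(os.environ if base_env is None else base_env)
    env["DRYPIPE_PIPELINE_INSTANCE_DIR"] = instance_dir
    return env


# Two hypothetical instances of the same pipeline, one per dataset
instance_dirs = ["/data/run_a", "/data/run_b"]
envs = [env_for_instance(d, base_env={}) for d in instance_dirs]
# Each env could then be passed as env=... to subprocess.run for a drypipe command
```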