Runs all new and/or outdated pipeline steps.
Usage
pipe_run(
pip,
force = FALSE,
recursive = TRUE,
cleanUnkept = FALSE,
progress = NULL,
showLog = TRUE
)Arguments
- pip
Pipelineobject- force
logicalifTRUEall steps are run regardless of whether they are outdated or not.- recursive
logicalifTRUEand a step returns a new pipeline, the run of the current pipeline is aborted and the new pipeline is run recursively.- cleanUnkept
logicalifTRUEall output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.- progress
functionthis parameter can be used to provide a custom progress function of the formfunction(value, detail), which will show the progress of the pipeline run for each step, wherevalueis the current step number anddetailis the name of the step.- showLog
logicalshould the steps be logged during the pipeline run?
Examples
# Simple pipeline
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~add1, z = 2) x + z)
pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p |> pipe_run() |> pipe_collect_out()
#> INFO [2025-07-27 10:40:31.564] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:31.565] Step 1/4 data
#> INFO [2025-07-27 10:40:31.568] Step 2/4 add1
#> INFO [2025-07-27 10:40:31.570] Step 3/4 add2
#> INFO [2025-07-27 10:40:31.572] Step 4/4 final
#> INFO [2025-07-27 10:40:31.573] Finished execution of steps.
#> INFO [2025-07-27 10:40:31.574] Done.
#> $final
#> [1] 8
#>
pipe_set_params(p, list(z = 4)) # outdates steps add2 and final
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data 1 FALSE data Done
#> 2: add1 data 2 FALSE add1 Done
#> 3: add2 add1 4 FALSE add2 Outdated
#> 4: final add1,add2 8 TRUE final Outdated
p |> pipe_run() |> pipe_collect_out()
#> INFO [2025-07-27 10:40:31.584] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:31.585] Step 1/4 data - skip 'done' step
#> INFO [2025-07-27 10:40:31.586] Step 2/4 add1 - skip 'done' step
#> INFO [2025-07-27 10:40:31.586] Step 3/4 add2
#> INFO [2025-07-27 10:40:31.588] Step 4/4 final
#> INFO [2025-07-27 10:40:31.589] Finished execution of steps.
#> INFO [2025-07-27 10:40:31.590] Done.
#> $final
#> [1] 12
#>
pipe_run(p, cleanUnkept = TRUE)
#> INFO [2025-07-27 10:40:31.592] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:31.593] Step 1/4 data - skip 'done' step
#> INFO [2025-07-27 10:40:31.594] Step 2/4 add1 - skip 'done' step
#> INFO [2025-07-27 10:40:31.595] Step 3/4 add2 - skip 'done' step
#> INFO [2025-07-27 10:40:31.596] Step 4/4 final - skip 'done' step
#> INFO [2025-07-27 10:40:31.596] Finished execution of steps.
#> INFO [2025-07-27 10:40:31.597] Clean temporary results.
#> INFO [2025-07-27 10:40:31.597] Done.
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data [NULL] FALSE data Outdated
#> 2: add1 data [NULL] FALSE add1 Outdated
#> 3: add2 add1 [NULL] FALSE add2 Outdated
#> 4: final add1,add2 12 TRUE final Done
# Recursive pipeline (for advanced users)
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "new_pipe", \(x = ~add1) {
p2 <- pipe_new("new_pipe", data = x)
pipe_add(p2, "add1", \(x = ~data) x + 1)
pipe_add(p2, "add2", \(x = ~add1) x + 2, keepOut = TRUE)
}
)
p |> pipe_run() |> pipe_collect_out()
#> INFO [2025-07-27 10:40:31.605] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:31.605] Step 1/3 data
#> INFO [2025-07-27 10:40:31.607] Step 2/3 add1
#> INFO [2025-07-27 10:40:31.609] Step 3/3 new_pipe
#> INFO [2025-07-27 10:40:31.613] Abort pipeline execution and restart on new.
#> INFO [2025-07-27 10:40:31.614] Start run of 'new_pipe' pipeline:
#> INFO [2025-07-27 10:40:31.618] Step 1/3 data
#> INFO [2025-07-27 10:40:31.621] Step 2/3 add1
#> INFO [2025-07-27 10:40:31.623] Step 3/3 add2
#> INFO [2025-07-27 10:40:31.624] Finished execution of steps.
#> INFO [2025-07-27 10:40:31.625] Done.
#> $add2
#> [1] 5
#>
# Run pipeline with progress bar
p <- pipe_new("pipe", data = 1)
pipe_add(p, "first step", \() Sys.sleep(0.5))
pipe_add(p, "second step", \() Sys.sleep(0.5))
pipe_add(p, "last step", \() Sys.sleep(0.5))
pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3)
fprogress <- function(value, detail) {
setTxtProgressBar(pb, value)
}
pipe_run(p, progress = fprogress, showLog = FALSE)
#>
|
| | 0%
|
|======================= | 33%
|
|=============================================== | 67%
|
|======================================================================| 100%
