Runs all new and/or outdated pipeline steps.
Usage
pipe_run(
pip,
force = FALSE,
recursive = TRUE,
cleanUnkept = FALSE,
progress = NULL,
showLog = TRUE
)
Arguments
- pip
Pipeline
object- force
logical
ifTRUE
all steps are run regardless of whether they are outdated or not.- recursive
logical
ifTRUE
and a step returns a new pipeline, the run of the current pipeline is aborted and the new pipeline is run recursively.- cleanUnkept
logical
ifTRUE
all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.- progress
function
this parameter can be used to provide a custom progress function of the formfunction(value, detail)
, which will show the progress of the pipeline run for each step, wherevalue
is the current step number anddetail
is the name of the step.- showLog
logical
should the steps be logged during the pipeline run?
Examples
# Simple pipeline
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~add1, z = 2) x + z)
pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p |> pipe_run() |> pipe_collect_out()
#> INFO [2025-01-03 19:12:45.256] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:45.257] Step 1/4 data
#> INFO [2025-01-03 19:12:45.260] Step 2/4 add1
#> INFO [2025-01-03 19:12:45.262] Step 3/4 add2
#> INFO [2025-01-03 19:12:45.264] Step 4/4 final
#> INFO [2025-01-03 19:12:45.265] Finished execution of steps.
#> INFO [2025-01-03 19:12:45.266] Done.
#> $final
#> [1] 8
#>
pipe_set_params(p, list(z = 4)) # outdates steps add2 and final
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data 1 FALSE data Done
#> 2: add1 data 2 FALSE add1 Done
#> 3: add2 add1 4 FALSE add2 Outdated
#> 4: final add1,add2 8 TRUE final Outdated
p |> pipe_run() |> pipe_collect_out()
#> INFO [2025-01-03 19:12:45.274] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:45.275] Step 1/4 data - skip 'done' step
#> INFO [2025-01-03 19:12:45.276] Step 2/4 add1 - skip 'done' step
#> INFO [2025-01-03 19:12:45.277] Step 3/4 add2
#> INFO [2025-01-03 19:12:45.279] Step 4/4 final
#> INFO [2025-01-03 19:12:45.280] Finished execution of steps.
#> INFO [2025-01-03 19:12:45.280] Done.
#> $final
#> [1] 12
#>
pipe_run(p, cleanUnkept = TRUE)
#> INFO [2025-01-03 19:12:45.282] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:45.283] Step 1/4 data - skip 'done' step
#> INFO [2025-01-03 19:12:45.284] Step 2/4 add1 - skip 'done' step
#> INFO [2025-01-03 19:12:45.285] Step 3/4 add2 - skip 'done' step
#> INFO [2025-01-03 19:12:45.286] Step 4/4 final - skip 'done' step
#> INFO [2025-01-03 19:12:45.286] Finished execution of steps.
#> INFO [2025-01-03 19:12:45.287] Clean temporary results.
#> INFO [2025-01-03 19:12:45.287] Done.
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data [NULL] FALSE data Outdated
#> 2: add1 data [NULL] FALSE add1 Outdated
#> 3: add2 add1 [NULL] FALSE add2 Outdated
#> 4: final add1,add2 12 TRUE final Done
# Recursive pipeline (for advanced users)
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "new_pipe", \(x = ~add1) {
p2 <- pipe_new("new_pipe", data = x)
pipe_add(p2, "add1", \(x = ~data) x + 1)
pipe_add(p2, "add2", \(x = ~add1) x + 2, keepOut = TRUE)
}
)
p |> pipe_run() |> pipe_collect_out()
#> INFO [2025-01-03 19:12:45.295] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:45.296] Step 1/3 data
#> INFO [2025-01-03 19:12:45.298] Step 2/3 add1
#> INFO [2025-01-03 19:12:45.300] Step 3/3 new_pipe
#> INFO [2025-01-03 19:12:45.303] Abort pipeline execution and restart on new.
#> INFO [2025-01-03 19:12:45.304] Start run of 'new_pipe' pipeline:
#> INFO [2025-01-03 19:12:45.305] Step 1/3 data
#> INFO [2025-01-03 19:12:45.307] Step 2/3 add1
#> INFO [2025-01-03 19:12:45.309] Step 3/3 add2
#> INFO [2025-01-03 19:12:45.310] Finished execution of steps.
#> INFO [2025-01-03 19:12:45.313] Done.
#> $add2
#> [1] 5
#>
# Run pipeline with progress bar
p <- pipe_new("pipe", data = 1)
pipe_add(p, "first step", \() Sys.sleep(0.5))
pipe_add(p, "second step", \() Sys.sleep(0.5))
pipe_add(p, "last step", \() Sys.sleep(0.5))
pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3)
fprogress <- function(value, detail) {
setTxtProgressBar(pb, value)
}
pipe_run(p, progress = fprogress, showLog = FALSE)
#>
|
| | 0%
|
|======================= | 33%
|
|=============================================== | 67%
|
|======================================================================| 100%