A pipeline consists of a series of steps, which are usually added one by one.
Each step wraps a function that is evaluated when the pipeline is run. This
function can be an existing R function (e.g. mean()) or an anonymous/lambda
function defined specifically for the pipeline. Function parameters can refer
to the results of earlier pipeline steps using the syntax
x = ~earlier_step_name; see the Examples for more details.
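To make the x = ~earlier_step_name syntax less opaque, the following base-R sketch shows that such a default value is simply a one-sided formula that can be read from the function signature. The helper find_step_refs() is hypothetical and written only for illustration; it is not part of pipeflow and is not claimed to be how the package resolves dependencies internally.

```r
# Hypothetical helper (not part of pipeflow): return the names referenced by
# parameters whose default value is a one-sided formula such as `~data`.
find_step_refs <- function(fun) {
  defaults <- as.list(formals(fun))
  refs <- list()
  for (name in names(defaults)) {
    # A default like `~data` is stored as an unevaluated call to `~`
    # with exactly one argument (the referenced name).
    is_ref <- is.call(defaults[[name]]) &&
      identical(defaults[[name]][[1]], as.name("~")) &&
      length(defaults[[name]]) == 2
    if (is_ref) {
      refs[[name]] <- as.character(defaults[[name]][[2]])
    }
  }
  unlist(refs)
}

# x and y point at other steps, z has an ordinary default value
refs <- find_step_refs(function(x = ~data, y = ~s1, z = 3) x * y + z)
print(refs)
```

Here refs maps the parameters x and y to the names data and s1, while z is ignored because its default is a plain value.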
Usage
pipe_add(
pip,
step,
fun,
params = list(),
description = "",
group = step,
keepOut = FALSE
)
Arguments
- pip
Pipeline object
- step
string: the name of the step. Each step name must be unique.
- fun
function or name of the function to be applied at the step. Both existing and anonymous/lambda functions can be used. All function parameters must have default values. If a parameter has no default value in the function signature, it can instead be set via the params argument (see the mean() example in the Examples section).
- params
list: parameters used to set or overwrite parameters of the passed function.
- description
string: optional description of the step.
- group
string: output collected after pipeline execution (see pipe_collect_out()) is grouped by the defined group names. By default, this is the name of the step, which comes in handy when the pipeline is copy-appended multiple times, because it keeps the results of the same function/step grouped in one place.
- keepOut
logical: if FALSE (default), the output of the step is not collected when calling pipe_collect_out() after the pipeline run. This option is used to keep only the results that matter and skip intermediate results that are not needed. See also pipe_collect_out() for more details.
Examples
# Add steps with lambda functions
p <- pipe_new("myPipe", data = 1)
pipe_add(p, "s1", \(x = ~data) 2*x) # use input data
pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y)
try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already
#> Error : step 's2' already exists
try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found
#> Error : step 's3': dependency 'foo' not found
p
#>      step depends    out keepOut  group  state
#>    <char>  <list> <list>  <lgcl> <char> <char>
#> 1:   data         [NULL]   FALSE   data    New
#> 2:     s1    data [NULL]   FALSE     s1    New
#> 3:     s2 data,s1 [NULL]   FALSE     s2    New
# Add step with existing function
p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4))
try(pipe_add(p, "calc_mean", mean)) # default value for x is missing
#> Error : 'x' parameter(s) must have default values
pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p |> pipe_run() |> pipe_get_out("calc_mean")
#> INFO [2025-01-03 19:12:40.030] Start run of 'myPipe' pipeline:
#> INFO [2025-01-03 19:12:40.031] Step 1/2 data
#> INFO [2025-01-03 19:12:40.033] Step 2/2 calc_mean
#> INFO [2025-01-03 19:12:40.034] Finished execution of steps.
#> INFO [2025-01-03 19:12:40.034] Done.
#> [1] 2.5
# Step description
p <- pipe_new("myPipe", data = 1:10)
pipe_add(p, "s1", \(x = ~data) 2*x, description = "multiply by 2")
print(p, verbose = TRUE) # print all columns including description
#>      step           fun funcName    params depends    out keepOut  group
#>    <char>        <list>   <char>    <list>  <list> <list>  <lgcl> <char>
#> 1:   data <function[1]> function <list[0]>         [NULL]   FALSE   data
#> 2:     s1 <function[1]> function <list[1]>    data [NULL]   FALSE     s1
#>      description                time  state
#>           <char>              <POSc> <char>
#> 1:               2025-01-03 19:12:40    New
#> 2: multiply by 2 2025-01-03 19:12:40    New
# Group output
p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4))
pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep")
pipe_add(p, "prep_y", \(data = ~data) (data$y)^2, group = "prep")
pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y)
p |> pipe_run() |> pipe_collect_out(all = TRUE)
#> INFO [2025-01-03 19:12:40.047] Start run of 'myPipe' pipeline:
#> INFO [2025-01-03 19:12:40.048] Step 1/4 data
#> INFO [2025-01-03 19:12:40.051] Step 2/4 prep_x
#> INFO [2025-01-03 19:12:40.053] Step 3/4 prep_y
#> INFO [2025-01-03 19:12:40.055] Step 4/4 sum
#> INFO [2025-01-03 19:12:40.056] Finished execution of steps.
#> INFO [2025-01-03 19:12:40.056] Done.
#> $data
#>   x y
#> 1 1 3
#> 2 2 4
#>
#> $prep
#> $prep$prep_x
#> [1] 1 2
#>
#> $prep$prep_y
#> [1] 9 16
#>
#>
#> $sum
#> [1] 10 18
#>
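The examples above do not exercise the keepOut argument, so here is a minimal sketch of it. It assumes the pipeflow package is installed and uses only functions that already appear on this page; the step names s1 and s2 are arbitrary. Printed output and log lines are omitted.

```r
library(pipeflow)

# Keep only the output of the final step
p <- pipe_new("myPipe", data = c(1, 2))
pipe_add(p, "s1", \(x = ~data) 2 * x)                 # intermediate result
pipe_add(p, "s2", \(x = ~s1) x + 1, keepOut = TRUE)   # result to keep
pipe_run(p)

# Collects only steps that were added with keepOut = TRUE
kept <- pipe_collect_out(p)
names(kept)

# all = TRUE collects the output of every step regardless of keepOut
everything <- pipe_collect_out(p, all = TRUE)
names(everything)
```

With this setup, kept should contain only the s2 result, whereas everything also includes data and s1.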