Skip to contents

Adds a named step to the pipeline. Each step is a function whose parameters either hold constant defaults or reference the output of a prior step using formula notation (~step_name). Dependencies are validated when the step is added.

Usage

pip_add(
  x,
  step,
  fun,
  group = step,
  tags = character(0),
  after = length(x),
  exec = "auto"
)

Arguments

x

A pipeflow pipeline object.

step

Unique step name.

fun

Function to execute for the step. Each function parameter must have a default value. Default values that are simple constants are resolved immediately. Default values that are formulas like ~other_step are treated as dependencies to those steps and resolved to the respective output values at runtime once the step is executed.

group

Optional character label used for grouping output collections - see also [pip_collect_out()].

tags

Optional character vector of tags belonging to the step. Can also be adjusted later using [pip_tag()].

after

Optional position after which the new step should be inserted (defaults to last position). Can be a step name or an integer index. If set to 0, the new step will be inserted at the beginning of the pipeline.

exec

Execution mode for this step. One of "auto", "split", "reduce" or "plain". Using execution mode exec = split, the output of the step is marked as partitioned output. In this mode, any step that depends on the split step (directly or indirectly) will have its output automatically mapped partition-wise during step execution. The reduce mode expects partitioned input and passes it through without mapping, while plain mode only accepts non-partitioned input and always intends to execute a single call. In summary:

  • auto: map if partitioned input appears, otherwise single call

  • split: single call, then mark output as partitioned

  • reduce: single call, but only valid with partitioned input

  • plain: single call, only valid with non-partitioned input

Value

The updated pipeline, invisibly.

Details

If after was specified, the new step will be inserted after the given step or position. Be aware that in contrast to adding a step at the end, inserting a step in the middle is a rather expensive operation as it requires re-wiring parts of the internal pipeline structure, especially if the new step is inserted at an early position.

Examples

# --- Groups, tags, and view filtering ---
p <- pip_new("analysis") |>
  pip_add("load", \(n = 5) seq_len(n),
    group = "io", tags = "raw"
  ) |>
  pip_add("clean", \(x = ~load) x * 2,
    group = "io", tags = "process"
  ) |>
  pip_add("fit", \(x = ~clean) sum(x),
    group = "model", tags = c("core", "daily")
  ) |>
  pip_add("report", \(x = ~fit) paste("result:", x),
    group = "model", tags = "report"
  )

pip_run(p)
#> info [2026-06-07 15:34:06.128 UTC]: Start run of pipeflow_pip 'analysis'
#> info [2026-06-07 15:34:06.128 UTC]: Step 1/4 load
#> info [2026-06-07 15:34:06.129 UTC]: Step 2/4 clean
#> info [2026-06-07 15:34:06.131 UTC]: Step 3/4 fit
#> info [2026-06-07 15:34:06.132 UTC]: Step 4/4 report
#> info [2026-06-07 15:34:06.134 UTC]: Finished run of pipeflow_pip 'analysis'
p
#> <pipeflow_pip> analysis (4 steps)
#> ---------------------------------
#>      step group depends            out state       tags
#> 1:   load    io              1,2,3,4,5  done        raw
#> 2:  clean    io    load  2, 4, 6, 8,10  done    process
#> 3:    fit model   clean             30  done core,daily
#> 4: report model     fit     result: 30  done     report

# Filter by tag using pip_view — keeps steps with any matching tag
pip_view(p, tags = "daily")
#> <pipeflow_view> analysis view (1 of 4 steps)
#> --------------------------------------------
#>  step group depends out state       tags
#>   fit model   clean  30  done core,daily
pip_view(p, tags = "core")
#> <pipeflow_view> analysis view (1 of 4 steps)
#> --------------------------------------------
#>  step group depends out state       tags
#>   fit model   clean  30  done core,daily
pip_view(p, tags = c("raw", "report"))
#> <pipeflow_view> analysis view (2 of 4 steps)
#> --------------------------------------------
#>    step group depends        out state   tags
#>    load    io          1,2,3,4,5  done    raw
#>  report model     fit result: 30  done report

# --- Split / reduce execution modes ---
q <- pip_new("split-demo") |>
  pip_add("data", \(x = iris) x) |>
  pip_add("split", \(x = ~data) split(x, x$Species),
    exec = "split"
  ) |>
  pip_add("stats", \(x = ~split) summary(x)) |>
  pip_add("combine", \(x = ~stats) do.call(rbind, x),
    exec = "reduce"
  )

pip_run(q)
#> info [2026-06-07 15:34:06.154 UTC]: Start run of pipeflow_pip 'split-demo'
#> info [2026-06-07 15:34:06.154 UTC]: Step 1/4 data
#> info [2026-06-07 15:34:06.155 UTC]: Step 2/4 split
#> info [2026-06-07 15:34:06.156 UTC]: Step 3/4 stats
#> info [2026-06-07 15:34:06.161 UTC]: Step 4/4 combine
#> info [2026-06-07 15:34:06.163 UTC]: Finished run of pipeflow_pip 'split-demo'
q[["stats", "out"]]   # partitioned list — one summary per species
#> $setosa
#>   Sepal.Length    Sepal.Width     Petal.Length    Petal.Width   
#>  Min.   :4.300   Min.   :2.300   Min.   :1.000   Min.   :0.100  
#>  1st Qu.:4.800   1st Qu.:3.200   1st Qu.:1.400   1st Qu.:0.200  
#>  Median :5.000   Median :3.400   Median :1.500   Median :0.200  
#>  Mean   :5.006   Mean   :3.428   Mean   :1.462   Mean   :0.246  
#>  3rd Qu.:5.200   3rd Qu.:3.675   3rd Qu.:1.575   3rd Qu.:0.300  
#>  Max.   :5.800   Max.   :4.400   Max.   :1.900   Max.   :0.600  
#>        Species  
#>  setosa    :50  
#>  versicolor: 0  
#>  virginica : 0  
#>                 
#>                 
#>                 
#> 
#> $versicolor
#>   Sepal.Length    Sepal.Width     Petal.Length   Petal.Width          Species  
#>  Min.   :4.900   Min.   :2.000   Min.   :3.00   Min.   :1.000   setosa    : 0  
#>  1st Qu.:5.600   1st Qu.:2.525   1st Qu.:4.00   1st Qu.:1.200   versicolor:50  
#>  Median :5.900   Median :2.800   Median :4.35   Median :1.300   virginica : 0  
#>  Mean   :5.936   Mean   :2.770   Mean   :4.26   Mean   :1.326                  
#>  3rd Qu.:6.300   3rd Qu.:3.000   3rd Qu.:4.60   3rd Qu.:1.500                  
#>  Max.   :7.000   Max.   :3.400   Max.   :5.10   Max.   :1.800                  
#> 
#> $virginica
#>   Sepal.Length    Sepal.Width     Petal.Length    Petal.Width   
#>  Min.   :4.900   Min.   :2.200   Min.   :4.500   Min.   :1.400  
#>  1st Qu.:6.225   1st Qu.:2.800   1st Qu.:5.100   1st Qu.:1.800  
#>  Median :6.500   Median :3.000   Median :5.550   Median :2.000  
#>  Mean   :6.588   Mean   :2.974   Mean   :5.552   Mean   :2.026  
#>  3rd Qu.:6.900   3rd Qu.:3.175   3rd Qu.:5.875   3rd Qu.:2.300  
#>  Max.   :7.900   Max.   :3.800   Max.   :6.900   Max.   :2.500  
#>        Species  
#>  setosa    : 0  
#>  versicolor: 0  
#>  virginica :50  
#>                 
#>                 
#>                 
#> 
#> attr(,"class")
#> [1] "list"                 "pipeflow_partitioned"
q[["combine", "out"]] # combined table
#>   Sepal.Length      Sepal.Width       Petal.Length      Petal.Width     
#>  "Min.   :4.300  " "Min.   :2.300  " "Min.   :1.000  " "Min.   :0.100  "
#>  "1st Qu.:4.800  " "1st Qu.:3.200  " "1st Qu.:1.400  " "1st Qu.:0.200  "
#>  "Median :5.000  " "Median :3.400  " "Median :1.500  " "Median :0.200  "
#>  "Mean   :5.006  " "Mean   :3.428  " "Mean   :1.462  " "Mean   :0.246  "
#>  "3rd Qu.:5.200  " "3rd Qu.:3.675  " "3rd Qu.:1.575  " "3rd Qu.:0.300  "
#>  "Max.   :5.800  " "Max.   :4.400  " "Max.   :1.900  " "Max.   :0.600  "
#>  "Min.   :4.900  " "Min.   :2.000  " "Min.   :3.00  "  "Min.   :1.000  "
#>  "1st Qu.:5.600  " "1st Qu.:2.525  " "1st Qu.:4.00  "  "1st Qu.:1.200  "
#>  "Median :5.900  " "Median :2.800  " "Median :4.35  "  "Median :1.300  "
#>  "Mean   :5.936  " "Mean   :2.770  " "Mean   :4.26  "  "Mean   :1.326  "
#>  "3rd Qu.:6.300  " "3rd Qu.:3.000  " "3rd Qu.:4.60  "  "3rd Qu.:1.500  "
#>  "Max.   :7.000  " "Max.   :3.400  " "Max.   :5.10  "  "Max.   :1.800  "
#>  "Min.   :4.900  " "Min.   :2.200  " "Min.   :4.500  " "Min.   :1.400  "
#>  "1st Qu.:6.225  " "1st Qu.:2.800  " "1st Qu.:5.100  " "1st Qu.:1.800  "
#>  "Median :6.500  " "Median :3.000  " "Median :5.550  " "Median :2.000  "
#>  "Mean   :6.588  " "Mean   :2.974  " "Mean   :5.552  " "Mean   :2.026  "
#>  "3rd Qu.:6.900  " "3rd Qu.:3.175  " "3rd Qu.:5.875  " "3rd Qu.:2.300  "
#>  "Max.   :7.900  " "Max.   :3.800  " "Max.   :6.900  " "Max.   :2.500  "
#>        Species    
#>  "setosa    :50  "
#>  "versicolor: 0  "
#>  "virginica : 0  "
#>  NA               
#>  NA               
#>  NA               
#>  "setosa    : 0  "
#>  "versicolor:50  "
#>  "virginica : 0  "
#>  NA               
#>  NA               
#>  NA               
#>  "setosa    : 0  "
#>  "versicolor: 0  "
#>  "virginica :50  "
#>  NA               
#>  NA               
#>  NA