This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.
Usage
pipe_set_data_split(
pip,
dataList,
toStep = character(),
groupBySplit = TRUE,
sep = "."
)Arguments
- pip
Pipelineobject- dataList
listof data sets- toStep
stringstep name marking optional subset of the pipeline, to which the data split should be applied to.- groupBySplit
logicalwhether to set step groups according to data split.- sep
stringseparator to be used between step name and data set name when creating the new step names.
Examples
# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList)
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data.a [NULL] FALSE a New
#> 2: add1.a data.a [NULL] TRUE a Outdated
#> 3: mult.a data.a,add1.a [NULL] TRUE a Outdated
#> 4: data.b [NULL] FALSE b New
#> 5: add1.b data.b [NULL] TRUE b Outdated
#> 6: mult.b data.b,add1.b [NULL] TRUE b Outdated
#> 7: data.c [NULL] FALSE c New
#> 8: add1.c data.c [NULL] TRUE c Outdated
#> 9: mult.c data.c,add1.c [NULL] TRUE c Outdated
p |> pipe_run() |> pipe_collect_out() |> str()
#> INFO [2025-07-27 10:40:36.789] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:36.791] Step 1/9 data.a
#> INFO [2025-07-27 10:40:36.793] Step 2/9 add1.a
#> INFO [2025-07-27 10:40:36.795] Step 3/9 mult.a
#> INFO [2025-07-27 10:40:36.796] Step 4/9 data.b
#> INFO [2025-07-27 10:40:36.798] Step 5/9 add1.b
#> INFO [2025-07-27 10:40:36.800] Step 6/9 mult.b
#> INFO [2025-07-27 10:40:36.802] Step 7/9 data.c
#> INFO [2025-07-27 10:40:36.804] Step 8/9 add1.c
#> INFO [2025-07-27 10:40:36.806] Step 9/9 mult.c
#> INFO [2025-07-27 10:40:36.807] Finished execution of steps.
#> INFO [2025-07-27 10:40:36.808] Done.
#> List of 3
#> $ a:List of 2
#> ..$ add1.a: num 2
#> ..$ mult.a: num 2
#> $ b:List of 2
#> ..$ add1.b: num 3
#> ..$ mult.b: num 6
#> $ c:List of 2
#> ..$ add1.c: num 4
#> ..$ mult.c: num 12
# Don't group output by split
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList, groupBySplit = FALSE)
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data.a [NULL] FALSE data.a New
#> 2: add1.a data.a [NULL] TRUE add1.a Outdated
#> 3: mult.a data.a,add1.a [NULL] TRUE mult.a Outdated
#> 4: data.b [NULL] FALSE data.b New
#> 5: add1.b data.b [NULL] TRUE add1.b Outdated
#> 6: mult.b data.b,add1.b [NULL] TRUE mult.b Outdated
#> 7: data.c [NULL] FALSE data.c New
#> 8: add1.c data.c [NULL] TRUE add1.c Outdated
#> 9: mult.c data.c,add1.c [NULL] TRUE mult.c Outdated
p |> pipe_run() |> pipe_collect_out() |> str()
#> INFO [2025-07-27 10:40:36.836] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:36.837] Step 1/9 data.a
#> INFO [2025-07-27 10:40:36.839] Step 2/9 add1.a
#> INFO [2025-07-27 10:40:36.841] Step 3/9 mult.a
#> INFO [2025-07-27 10:40:36.843] Step 4/9 data.b
#> INFO [2025-07-27 10:40:36.845] Step 5/9 add1.b
#> INFO [2025-07-27 10:40:36.846] Step 6/9 mult.b
#> INFO [2025-07-27 10:40:36.848] Step 7/9 data.c
#> INFO [2025-07-27 10:40:36.851] Step 8/9 add1.c
#> INFO [2025-07-27 10:40:36.852] Step 9/9 mult.c
#> INFO [2025-07-27 10:40:36.854] Finished execution of steps.
#> INFO [2025-07-27 10:40:36.854] Done.
#> List of 6
#> $ add1.a: num 2
#> $ mult.a: num 2
#> $ add1.b: num 3
#> $ mult.b: num 6
#> $ add1.c: num 4
#> $ mult.c: num 12
# Split up to certain step
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y)
pipe_add(p, "average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data [NULL] FALSE data New
#> 2: add1 data [NULL] FALSE add1 New
#> 3: mult data,add1 [NULL] FALSE mult New
#> 4: average_result mult [NULL] TRUE average_result New
pipe_get_depends(p)[["average_result"]]
#> x
#> "mult"
pipe_set_data_split(p, dataList, toStep = "mult")
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data.a [NULL] FALSE a New
#> 2: add1.a data.a [NULL] FALSE a Outdated
#> 3: mult.a data.a,add1.a [NULL] FALSE a Outdated
#> 4: data.b [NULL] FALSE b New
#> 5: add1.b data.b [NULL] FALSE b Outdated
#> 6: mult.b data.b,add1.b [NULL] FALSE b Outdated
#> 7: data.c [NULL] FALSE c New
#> 8: add1.c data.c [NULL] FALSE c Outdated
#> 9: mult.c data.c,add1.c [NULL] FALSE c Outdated
#> 10: average_result <list[1]> [NULL] TRUE average_result New
pipe_get_depends(p)[["average_result"]]
#> $x
#> [1] "mult.a" "mult.b" "mult.c"
#>
p |> pipe_run() |> pipe_collect_out() |> str()
#> INFO [2025-07-27 10:40:36.887] Start run of 'pipe' pipeline:
#> INFO [2025-07-27 10:40:36.887] Step 1/10 data.a
#> INFO [2025-07-27 10:40:36.890] Step 2/10 add1.a
#> INFO [2025-07-27 10:40:36.892] Step 3/10 mult.a
#> INFO [2025-07-27 10:40:36.894] Step 4/10 data.b
#> INFO [2025-07-27 10:40:36.897] Step 5/10 add1.b
#> INFO [2025-07-27 10:40:36.899] Step 6/10 mult.b
#> INFO [2025-07-27 10:40:36.901] Step 7/10 data.c
#> INFO [2025-07-27 10:40:36.907] Step 8/10 add1.c
#> INFO [2025-07-27 10:40:36.910] Step 9/10 mult.c
#> INFO [2025-07-27 10:40:36.912] Step 10/10 average_result
#> INFO [2025-07-27 10:40:36.913] Finished execution of steps.
#> INFO [2025-07-27 10:40:36.914] Done.
#> List of 1
#> $ average_result: num 6.67
