This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.
Usage
pipe_set_data_split(
pip,
dataList,
toStep = character(),
groupBySplit = TRUE,
sep = "."
)
Arguments
- pip
Pipeline
object- dataList
list
of data sets- toStep
string
step name marking optional subset of the pipeline, to which the data split should be applied to.- groupBySplit
logical
whether to set step groups according to data split.- sep
string
separator to be used between step name and data set name when creating the new step names.
Examples
# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList)
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data.a [NULL] FALSE a New
#> 2: add1.a data.a [NULL] TRUE a Outdated
#> 3: mult.a data.a,add1.a [NULL] TRUE a Outdated
#> 4: data.b [NULL] FALSE b New
#> 5: add1.b data.b [NULL] TRUE b Outdated
#> 6: mult.b data.b,add1.b [NULL] TRUE b Outdated
#> 7: data.c [NULL] FALSE c New
#> 8: add1.c data.c [NULL] TRUE c Outdated
#> 9: mult.c data.c,add1.c [NULL] TRUE c Outdated
p |> pipe_run() |> pipe_collect_out() |> str()
#> INFO [2025-01-03 19:12:47.669] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:47.670] Step 1/9 data.a
#> INFO [2025-01-03 19:12:47.678] Step 2/9 add1.a
#> INFO [2025-01-03 19:12:47.681] Step 3/9 mult.a
#> INFO [2025-01-03 19:12:47.684] Step 4/9 data.b
#> INFO [2025-01-03 19:12:47.686] Step 5/9 add1.b
#> INFO [2025-01-03 19:12:47.688] Step 6/9 mult.b
#> INFO [2025-01-03 19:12:47.690] Step 7/9 data.c
#> INFO [2025-01-03 19:12:47.692] Step 8/9 add1.c
#> INFO [2025-01-03 19:12:47.694] Step 9/9 mult.c
#> INFO [2025-01-03 19:12:47.695] Finished execution of steps.
#> INFO [2025-01-03 19:12:47.696] Done.
#> List of 3
#> $ a:List of 2
#> ..$ add1.a: num 2
#> ..$ mult.a: num 2
#> $ b:List of 2
#> ..$ add1.b: num 3
#> ..$ mult.b: num 6
#> $ c:List of 2
#> ..$ add1.c: num 4
#> ..$ mult.c: num 12
# Don't group output by split
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList, groupBySplit = FALSE)
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data.a [NULL] FALSE data.a New
#> 2: add1.a data.a [NULL] TRUE add1.a Outdated
#> 3: mult.a data.a,add1.a [NULL] TRUE mult.a Outdated
#> 4: data.b [NULL] FALSE data.b New
#> 5: add1.b data.b [NULL] TRUE add1.b Outdated
#> 6: mult.b data.b,add1.b [NULL] TRUE mult.b Outdated
#> 7: data.c [NULL] FALSE data.c New
#> 8: add1.c data.c [NULL] TRUE add1.c Outdated
#> 9: mult.c data.c,add1.c [NULL] TRUE mult.c Outdated
p |> pipe_run() |> pipe_collect_out() |> str()
#> INFO [2025-01-03 19:12:47.723] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:47.724] Step 1/9 data.a
#> INFO [2025-01-03 19:12:47.726] Step 2/9 add1.a
#> INFO [2025-01-03 19:12:47.728] Step 3/9 mult.a
#> INFO [2025-01-03 19:12:47.730] Step 4/9 data.b
#> INFO [2025-01-03 19:12:47.732] Step 5/9 add1.b
#> INFO [2025-01-03 19:12:47.734] Step 6/9 mult.b
#> INFO [2025-01-03 19:12:47.736] Step 7/9 data.c
#> INFO [2025-01-03 19:12:47.738] Step 8/9 add1.c
#> INFO [2025-01-03 19:12:47.740] Step 9/9 mult.c
#> INFO [2025-01-03 19:12:47.741] Finished execution of steps.
#> INFO [2025-01-03 19:12:47.741] Done.
#> List of 6
#> $ add1.a: num 2
#> $ mult.a: num 2
#> $ add1.b: num 3
#> $ mult.b: num 6
#> $ add1.c: num 4
#> $ mult.c: num 12
# Split up to certain step
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y)
pipe_add(p, "average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data [NULL] FALSE data New
#> 2: add1 data [NULL] FALSE add1 New
#> 3: mult data,add1 [NULL] FALSE mult New
#> 4: average_result mult [NULL] TRUE average_result New
pipe_get_depends(p)[["average_result"]]
#> x
#> "mult"
pipe_set_data_split(p, dataList, toStep = "mult")
p
#> step depends out keepOut group state
#> <char> <list> <list> <lgcl> <char> <char>
#> 1: data.a [NULL] FALSE a New
#> 2: add1.a data.a [NULL] FALSE a Outdated
#> 3: mult.a data.a,add1.a [NULL] FALSE a Outdated
#> 4: data.b [NULL] FALSE b New
#> 5: add1.b data.b [NULL] FALSE b Outdated
#> 6: mult.b data.b,add1.b [NULL] FALSE b Outdated
#> 7: data.c [NULL] FALSE c New
#> 8: add1.c data.c [NULL] FALSE c Outdated
#> 9: mult.c data.c,add1.c [NULL] FALSE c Outdated
#> 10: average_result <list[1]> [NULL] TRUE average_result New
pipe_get_depends(p)[["average_result"]]
#> $x
#> [1] "mult.a" "mult.b" "mult.c"
#>
p |> pipe_run() |> pipe_collect_out() |> str()
#> INFO [2025-01-03 19:12:47.782] Start run of 'pipe' pipeline:
#> INFO [2025-01-03 19:12:47.783] Step 1/10 data.a
#> INFO [2025-01-03 19:12:47.786] Step 2/10 add1.a
#> INFO [2025-01-03 19:12:47.788] Step 3/10 mult.a
#> INFO [2025-01-03 19:12:47.790] Step 4/10 data.b
#> INFO [2025-01-03 19:12:47.792] Step 5/10 add1.b
#> INFO [2025-01-03 19:12:47.794] Step 6/10 mult.b
#> INFO [2025-01-03 19:12:47.796] Step 7/10 data.c
#> INFO [2025-01-03 19:12:47.799] Step 8/10 add1.c
#> INFO [2025-01-03 19:12:47.801] Step 9/10 mult.c
#> INFO [2025-01-03 19:12:47.804] Step 10/10 average_result
#> INFO [2025-01-03 19:12:47.805] Finished execution of steps.
#> INFO [2025-01-03 19:12:47.805] Done.
#> List of 1
#> $ average_result: num 6.67