Skip to contents

Generally, one should keep pipeline steps as simple as possible, basically following the principle “one step, one task”. This means that usually a lot of pipeline steps are used to calculate intermediate results and only a few steps contain the final results that we are interested in. This vignette shows how to conveniently collect and possibly group the output of those final steps.

Flag output steps

Output steps are flagged by setting the keepOut argument to TRUE when adding a step to the pipeline. In the following example, we will want to keep the output of the steps data_summary, model_summary, and model_plot.

library(pipeflow)
library(ggplot2)

# Build a pipeline on the 'airquality' data set. Steps added with
# keepOut = TRUE are the ones whose output is to be collected later.
pip <- pipe_new(
        "my-pipeline",
        data = airquality
    ) |>

    # Add a 'Temp.Celsius' column converted from the Fahrenheit 'Temp' column.
    pipe_add(
        "data_prep",
        function(data = ~data) {
            replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
        }
    ) |>

    # Keep only the two columns used for modelling.
    pipe_add(
        "data_summary",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            data[, c(xVar, yVar)]
        },
        keepOut = TRUE              # <- keep this
    ) |>

    # Fit the linear model yVar ~ xVar (intermediate result, not kept).
    pipe_add(
        "model_fit",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    # Summarise the fitted model.
    pipe_add(
        "model_summary",
        function(
            fit = ~model_fit
        ) {
            summary(fit)
        },
        keepOut = TRUE              # <- keep this
    ) |>

    # Scatter plot of the data with the fitted regression line on top.
    pipe_add(
        "model_plot",
        function(
            model = ~model_fit,
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone",
            title = "Linear model fit"
        ) {
            coeffs <- coefficients(model)
            ggplot(data) +
                # Map y via yVar (not a hard-coded column name) so that the
                # plotted points follow the step's parameter.
                geom_point(aes(.data[[xVar]], .data[[yVar]])) +
                geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
                labs(title = title)
        },
        keepOut = TRUE              # <- keep this
    )

Looking at the pipeline, we see that the steps data_summary, model_summary, and model_plot have been flagged accordingly (see column keepOut).

pip
#             step             depends    out keepOut         group  state
#           <char>              <list> <list>  <lgcl>        <char> <char>
# 1:          data                     [NULL]   FALSE          data    New
# 2:     data_prep                data [NULL]   FALSE     data_prep    New
# 3:  data_summary           data_prep [NULL]    TRUE  data_summary    New
# 4:     model_fit           data_prep [NULL]   FALSE     model_fit    New
# 5: model_summary           model_fit [NULL]    TRUE model_summary    New
# 6:    model_plot model_fit,data_prep [NULL]    TRUE    model_plot    New

Graphically, steps flagged with keepOut = TRUE are displayed with a circle shape while “normal” steps are shown as rectangle boxes.

Now let’s run and collect the output of the flagged steps using the collect_out method, which returns a list with the output of the flagged steps.

pip$run()
# INFO  [2024-12-02 21:18:47.923] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-02 21:18:47.951] Step 1/6 data
# INFO  [2024-12-02 21:18:47.960] Step 2/6 data_prep
# INFO  [2024-12-02 21:18:47.983] Step 3/6 data_summary
# INFO  [2024-12-02 21:18:47.985] Step 4/6 model_fit
# INFO  [2024-12-02 21:18:47.989] Step 5/6 model_summary
# INFO  [2024-12-02 21:18:47.997] Step 6/6 model_plot
# INFO  [2024-12-02 21:18:48.008] Finished execution of steps.
# INFO  [2024-12-02 21:18:48.009] Done.

out <- pip$collect_out()

names(out)
# [1] "data_summary"  "model_summary" "model_plot"

As expected, the output list contains the output of the flagged steps.

str(out, max.level = 1)
# List of 3
#  $ data_summary :'data.frame':    153 obs. of  2 variables:
#  $ model_summary:List of 12
#   ..- attr(*, "class")= chr "summary.lm"
#  $ model_plot   :List of 11
#   ..- attr(*, "class")= chr [1:2] "gg" "ggplot"

Grouping output steps

Often certain output steps are related and should be grouped together. This can be achieved conveniently by setting the group argument when adding a step to the pipeline. Let’s illustrate this by slightly modifying the previous example.

# Same pipeline as before, but the kept output steps are now assigned to
# groups via the 'group' argument of pipe_add().
pip <- pipe_new("my-pipeline", data = airquality) |>

    # Add a 'Temp.Celsius' column converted from the Fahrenheit 'Temp' column.
    pipe_add(
        "data_prep",
        function(data = ~data) {
            replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
        }
    ) |>

    # Keep only the two columns used for modelling.
    pipe_add(
        "used_data",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            data[, c(xVar, yVar)]
        },
        keepOut = TRUE,
        group = "Data"                 # <- define 'Data' group here
    ) |>

    # Fit the linear model yVar ~ xVar (intermediate result, not kept).
    pipe_add(
        "model_fit",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    # Summarise the fitted model.
    pipe_add(
        "model_summary",
        function(
            fit = ~model_fit
        ) {
            summary(fit)
        },
        keepOut = TRUE,
        group = "Model"                # <- define 'Model' group here
    ) |>

    # Scatter plot of the data with the fitted regression line on top.
    pipe_add(
        "model_plot",
        function(
            model = ~model_fit,
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone",
            title = "Linear model fit"
        ) {
            coeffs <- coefficients(model)
            ggplot(data) +
                # Map y via yVar (not a hard-coded column name) so that the
                # plotted points follow the step's parameter.
                geom_point(aes(.data[[xVar]], .data[[yVar]])) +
                geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
                labs(title = title)
        },
        keepOut = TRUE,
        group = "Model"                # <- define 'Model' group here
    )

Looking at the pipeline, the defined groups are shown in the group column.

pip
#             step             depends    out keepOut     group  state
#           <char>              <list> <list>  <lgcl>    <char> <char>
# 1:          data                     [NULL]   FALSE      data    New
# 2:     data_prep                data [NULL]   FALSE data_prep    New
# 3:     used_data           data_prep [NULL]    TRUE      Data    New
# 4:     model_fit           data_prep [NULL]   FALSE model_fit    New
# 5: model_summary           model_fit [NULL]    TRUE     Model    New
# 6:    model_plot model_fit,data_prep [NULL]    TRUE     Model    New

As you see, by default, the group is identical to the step name, that is, each step represents the trivial case of a one-sized group. Again, we run the pipeline and collect the output.

pip$run()
# INFO  [2024-12-02 21:18:48.242] Start run of 'my-pipeline' pipeline:
# INFO  [2024-12-02 21:18:48.243] Step 1/6 data
# INFO  [2024-12-02 21:18:48.246] Step 2/6 data_prep
# INFO  [2024-12-02 21:18:48.248] Step 3/6 used_data
# INFO  [2024-12-02 21:18:48.250] Step 4/6 model_fit
# INFO  [2024-12-02 21:18:48.274] Step 5/6 model_summary
# INFO  [2024-12-02 21:18:48.276] Step 6/6 model_plot
# INFO  [2024-12-02 21:18:48.282] Finished execution of steps.
# INFO  [2024-12-02 21:18:48.282] Done.

out <- pip$collect_out()

names(out)
# [1] "Data"  "Model"

As we can see, the output related to the modelling has been grouped into one sublist named Model.

str(out, max.level = 2)
# List of 2
#  $ Data :'data.frame':    153 obs. of  2 variables:
#   ..$ Temp.Celsius: num [1:153] 19.4 22.2 23.3 16.7 13.3 ...
#   ..$ Ozone       : int [1:153] 41 36 12 18 NA 28 23 19 8 NA ...
#  $ Model:List of 2
#   ..$ model_summary:List of 12
#   .. ..- attr(*, "class")= chr "summary.lm"
#   ..$ model_plot   :List of 11
#   .. ..- attr(*, "class")= chr [1:2] "gg" "ggplot"