Drake: Dynamic branching

Created on 18 Jan 2019 · 61 Comments · Source: ropensci/drake

We want to declare targets and modify the dependency graph while make() is running. Sometimes, we do not know what the targets should be until we see the values of previous targets. The following plan sketches the idea.

library(dplyr)
library(drake)
drake_plan(
  summaries = mtcars %>%
    group_by(cyl) %>%
    summarize(mean_mpg = mean(mpg)),
  individual_summary = target(
    filter(summaries, cyl == cyl_value),
    transform = cross(cyl_value = summaries$cyl)
  )
)

Issues:

  1. How will outdated() work now? Do we have to read the targets back into memory to check if the downstream stuff is up to date?
  2. This is the biggest implementation challenge drake has faced. Hopefully the work will migrate to the workers package.
advanced priority api

All 61 comments

One implementation idea that seems as though it could be "simple" to implement (says the guy who hasn't implemented anything in drake yet): Could there be a "plan in a plan" concept?

My thought is:

  • Generate outer_plan and inner_plan
  • Make a step in outer_plan that is: if (selection_criterion) make(inner_plan)
  • From an outdated() standpoint, you could add detection logic for "the inner_plan never needed to run", and as long as selection_criterion is up to date, it could have a status "not required to run". Or even simpler, if selection_criterion and inner_plan don't change, then I think outdated would show up correctly for outer_plan without modification-- the steps of inner_plan would just never show up.

A similar idea was proposed in #304. I would actually prefer to avoid nested plans because of the complexity and pre-planning they would require.

Fair enough. If I think of something else, I'll post it.

I want to point out the way Snakemake currently handles this, as a possible inspiration:

  • targets which will trigger a rebuild of the plan are declared differently (in Snakemake, as "checkpoint" rather than "rule")
  • dependencies which are conditional on the output of a checkpoint are marked by a call to "checkpoints.<name>.get()"

The second part is quite idiosyncratic to Python, so I wouldn't suggest it be implemented in the same way, but it seems easier to make the user explicitly mark the cases where dynamic branching needs to happen, than to try to detect it from the structure of their dependencies.

Using your example:

library(dplyr)
library(drake)
drake_plan(
  summaries = mtcars %>%
    group_by(cyl) %>%
    summarize(mean_mpg = mean(mpg)),
  individual_summary = target(
    filter(summaries, cyl == cyl_value),
    transform = cross(cyl_value = dynamic(summaries$cyl))
  )
)

One clear difference is that, using an R-based framework rather than a file-based framework, the output of summaries is still only one object. In a file-based framework, it might have been an unknown number of files.

As I wrote that example, I realized that it's actually much more similar to how Snakemake used to do it. In the end, they decided against that way, so maybe it would be good to know why and learn from that. Perhaps it's not relevant in drake's framework though.

Discussion from #233 carries over to this thread.

Users really want this flexibility, and often just assume drake already supports it, but I am beginning to question this dream scenario. If we try to implement dynamic branching deeply in drake's internals, we would need to rip half the package apart and double the complexity. Even if we offload scheduling to workers, we would still be in trouble. We would need to update config$graph, config$queue (the priority queue), and config$layout all mid-make(). drake is simply not designed for this.

The more I think about it, the more wisdom I see in @krlmlr's thinking behind #304. Possible compromise: a new split() transformation. @kendonB's #833 certainly seems to address this use case, not to mention #77.

Update: we now have split() in the dev version: https://ropenscilabs.github.io/drake-manual/plans.html#split. Should cover many use cases that would have otherwise required dynamic branching.

A more expedient approach

After talking with @dgkf at SDSS last week, I am no longer as reluctant as in https://github.com/ropensci/drake/issues/685#issuecomment-493838200. We can avoid a mess if we give dynamic branching its own DSL that works in tandem with the existing transformation DSL. This new dynamic DSL is just the transformation DSL invoked at runtime.

Proposal

library(drake)
plan <- drake_plan(
    vector_of_settings = target(
        f(x),
        transform = map(x = c(1, 2))
    ),
    analysis = target(
        g(x, y),
        transform = map(x),
        dynamic = map(y = vector_of_settings)
    )
)

print(plan)
#> # A tibble: 4 x 3
#>   target               command dynamic   
#>   <chr>                <expr>  <list>    
#> 1 vector_of_settings_1 f(1)    <lgl [1]> 
#> 2 vector_of_settings_2 f(2)    <lgl [1]> 
#> 3 analysis_1           g(1, y) <language>
#> 4 analysis_2           g(2, y) <language>

print(plan$dynamic)
#> [[1]]
#> [1] NA
#> 
#> [[2]]
#> [1] NA
#> 
#> [[3]]
#> map(y = vector_of_settings_1)
#> 
#> [[4]]
#> map(y = vector_of_settings_2)

drake_plan_source(plan)
#> drake_plan(
#>   vector_of_settings_1 = f(1),
#>   vector_of_settings_2 = f(2),
#>   analysis_1 = target(
#>     command = g(1, y),
#>     dynamic = map(y = vector_of_settings_1)
#>   ),
#>   analysis_2 = target(
#>     command = g(2, y),
#>     dynamic = map(y = vector_of_settings_2)
#>   )
#> )

Created on 2019-06-03 by the reprex package (v0.3.0)

When we create new targets, we probably do not need to register them in config$layout or the priority queue. Suppose make() is running, it just built/checked vector_of_settings_1, and we are about to build analysis_1. Suppose vector_of_settings_1 evaluated to c("a", "b"). Then, we could

  1. Create new "target" names analysis_1_a and analysis_1_b.
  2. Create new commands for each.
  3. Submit those new commands to the scheduler as consecutive jobs, skipping up-to-date targets.

    • Need to check analysis_1_a and analysis_1_b individually, based on previously-cached metadata.

  4. Store a value and metadata list for each target.
  5. Store a value for target analysis_1 that

    1. Is human-readable, and

    2. Reacts to changes in analysis_1_a and analysis_1_b.

cache <- drake_cache() # successor of get_cache()
cache$get_hash("analysis_1_a")
#> [1] "1d5108bacae437a0"
cache$get_hash("analysis_1_b")
#> [1] "17b1fbe1609400b9"
readd(analysis_1)
#>         target             hash
#> 1 analysis_1_a 1d5108bacae437a0
#> 2 analysis_1_b 17b1fbe1609400b9

Remarks

  • Can anyone think of a better name than dynamic for this new field?
  • We need to detect dependencies in the dynamic column (or whatever we name it) just like command and triggers.
  • dynamic should print as an <expr> column in the tibble just like command.

Thanks

This idea, along with the original DSL, was inspired by @krlmlr in #233 and #304.

Hmm... what about targets downstream of analysis_1_a and analysis_1_b?

Easy, actually: just give a special attribute (maybe an S3 class) to the analysis_1 value (the data frame of hashes). That way, when we load analysis_1 as a dependency of a dynamic transformation (say, dynamic = map(analysis_1)) we will know to map over analysis_1_a and analysis_1_b instead.
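A minimal sketch of that idea in base R, assuming the bookkeeping value is a data frame of sub-target names and hashes. The function names new_dynamic_index() and branches_of() are hypothetical and exist only for illustration; the hashes are copied from the example above rather than computed.

```r
# Hypothetical constructor: the bookkeeping value stored for a dynamic parent.
new_dynamic_index <- function(parent, branches, hashes) {
  stopifnot(length(branches) == length(hashes))
  out <- data.frame(
    target = paste(parent, branches, sep = "_"),
    hash = hashes,
    stringsAsFactors = FALSE
  )
  class(out) <- c("drake_dynamic", class(out))  # the "special attribute"
  out
}

# Hypothetical helper: decide what a downstream dynamic map() iterates over.
branches_of <- function(value) {
  if (inherits(value, "drake_dynamic")) {
    value$target          # map over the sub-targets, not the rows of hashes
  } else {
    seq_len(NROW(value))  # ordinary value: map over its elements
  }
}

analysis_1 <- new_dynamic_index(
  "analysis_1",
  branches = c("a", "b"),
  hashes = c("1d5108bacae437a0", "17b1fbe1609400b9")
)
mapped <- branches_of(analysis_1)
plain <- branches_of(c(10, 20, 30))
```

Dispatching on the class keeps the downstream code path the same for static and dynamic dependencies; only the lookup of what to iterate over changes.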

We also need to think about how the new target names and splits are constructed. If vector_of_settings_1 is a list of large objects, we need to make up sensible names. Hashes? could be slow. We should also make an effort to handle group_by() data frames.

Come to think of it, we probably need a trace (drake_plan(trace = TRUE)) in those special data frames so that combine(.by) still works.

We also need to think about how the new target names and splits are constructed. If vector_of_settings_1 is a list of large objects, we need to make up sensible names. Hashes? could be slow. We should also make an effort to handle group_by() data frames.

The useful cases seem to be:

  • For a grouped data frame or indexed data table, use the grouping variables/index columns.
  • For a named vector/list, use the names, unique-ifying as needed.
  • For a non-named vector of character/integer/logical/factor values, use the values as names. Maybe also for numeric? complex? raw?
  • For other objects, default integer indices.
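The cases above could be sketched roughly as follows. This is only an illustration in base R: branch_names() is a hypothetical helper, and the grouped data frame case is left out because it would need dplyr.

```r
# Hypothetical helper: choose a suffix for each branch of a dynamic dependency.
branch_names <- function(x) {
  # Named vector or list: use the names, made unique as needed.
  if (!is.data.frame(x) && !is.null(names(x))) {
    return(make.unique(names(x), sep = "_"))
  }
  # Unnamed character/integer/logical/factor values: use the values themselves.
  if (is.character(x) || is.integer(x) || is.logical(x) || is.factor(x)) {
    return(make.unique(as.character(x), sep = "_"))
  }
  # Everything else: fall back to integer indices.
  as.character(seq_len(NROW(x)))
}

named <- branch_names(c(a = 1, b = 2, a = 3))
values <- branch_names(c("x", "y"))
fallback <- branch_names(list(1:3, letters))
```

Doubles fall through to the index case here, which sidesteps the open question about numeric, complex, and raw values.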

If users want the hash behavior, then they can use (and perhaps drake can provide?) a function which names a vector/list according to the hashes of its elements. e.g.

name_by_hash <- function(x, ...) {
  n <- vapply(x, digest::digest, "", ...)
  names(x) <- n
  x
}

Alternatively, always default to integer indices, and if the user wants something smarter, they can specify it with .id = (and allow tidy evaluation, so that something like .id = !!vapply(vector_of_settings, digest::digest, "") would work).

Hi, I just wanted to describe another use case that would greatly benefit from dynamic branching. In my case, I have a very large data frame somewhat like this:

> d <- tibble(year = rep(2010:2015, each = 5), x = 1:30)
# A tibble: 30 x 2
    year     x
   <int> <int>
 1  2010     1
 2  2010     2
 3  2010     3
 4  2010     4
 5  2010     5
 6  2011     6
 7  2011     7
 8  2011     8
 9  2011     9
10  2011    10
# ... with 20 more rows

I'd like to be able to split d by year, and then create a new column f(x) := x + 100 in those batches (where f is actually an expensive function). Since I'm not often adding new rows to the data frame for very old years, I want to benefit from cached results for the 2010, 2011, ... splits and only recalculate for 2019, which is the only split that has changed/new data.

It would be a game-changer to be able to use drake like this, since more often than not I can think of a splitting scheme that would effectively partition the data into stale and up-to-date splits.
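To make the desired behavior concrete, here is a rough base R sketch that does not use drake at all. It assumes a plain list as the cache and a hypothetical helper update_batches(); f() is a cheap stand-in for the expensive function.

```r
# Toy data shaped like the example above.
d <- data.frame(year = rep(2010:2015, each = 5), x = 1:30)

# Stand-in for the expensive function f.
f <- function(batch) {
  batch$fx <- batch$x + 100
  batch
}

# Hypothetical helper: recompute only the yearly batches whose input changed.
update_batches <- function(data, cache = list()) {
  batches <- lapply(split(data, data$year), function(b) {
    rownames(b) <- NULL  # make batches comparable across runs
    b
  })
  rebuilt <- character(0)
  for (key in names(batches)) {
    old <- cache[[key]]
    if (is.null(old) || !identical(old$input, batches[[key]])) {
      cache[[key]] <- list(input = batches[[key]], output = f(batches[[key]]))
      rebuilt <- c(rebuilt, key)
    }
  }
  list(cache = cache, rebuilt = rebuilt)
}

first <- update_batches(d)                          # builds every year
d2 <- rbind(d, data.frame(year = 2015L, x = 31L))   # new row in the latest year
second <- update_batches(d2, first$cache)           # should rebuild 2015 only
```

Dynamic branching would let drake do this bookkeeping itself, with one sub-target per year.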

@dpmccabe, I see what you mean. I just encountered a very similar situation for a project at work.

I am realizing that https://github.com/ropensci/drake/issues/685#issuecomment-498382301 has serious problems:

  1. It would be extremely difficult to make the current DSL behave exactly the same for dynamic branching, and the inevitable subtle differences would add a lot of confusion.
  2. To work around (1), we would need to make an entire new DSL just for dynamic branching, and that adds a lot of additional complexity for users.
  3. Either way, https://github.com/ropensci/drake/issues/685#issuecomment-498382301 could add a lot of complexity to the code base.

An alternative is @brendanf's suggestion of checkpointing (https://github.com/ropensci/drake/issues/685#issuecomment-466927091). For drake, this essentially means turning a plan into a bunch of subplans. Take this plan as an example:

drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2),
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = vector_of_settings_1)
  ),
  analysis_2 = target(
    command = g(2, y),
    transform = map(y = vector_of_settings_2)
  )
)

It is already natural for users to think about it as two separate plans:

drake_plan(
  vector_of_settings_1 = f(1),
  vector_of_settings_2 = f(2)
)

drake_plan(
  analysis_1 = target(
    command = g(1, y),
    transform = map(y = vector_of_settings_1)
  ),
  analysis_2 = target(
    command = g(2, y),
    transform = map(y = vector_of_settings_2)
  )
)

Maybe make() could do something similar: split a monolithic plan into chunks as appropriate, and then transform/make() those chunks in topological order.

...but then r_make() might have issues, depending on the implementation.

But I still think it is worth trying https://github.com/ropensci/drake/issues/685#issuecomment-498382301 in a branch. That approach to dynamic branching might allow drake to scale better with large numbers of targets.

Changing my mind back

I no longer believe https://github.com/ropensci/drake/issues/685#issuecomment-498382301 needs to complicate the internals or slow us down. With careful refactoring, we can

  1. Drop config$layout.
  2. Set transform = FALSE as default in drake_plan().
  3. Transform commands and analyze them for dependencies as make() is running.

Benefits

  • config$layout is a "God object". It is an awkwardly large, deeply nested list that takes up a ton of memory. The layout stores all the dependency information of all the targets and commands in advance. For dynamic branching, we will only need little bits of it at a time, and only at the moment we are about to build a target. If we just create those little bits as needed, I expect memory usage to lessen, and the code should be simpler.
  • We can even use the approach above to more clearly separate the static analysis of imported functions vs commands for targets.
  • Yes, all this requires analyzing target commands twice. But since untransformed plans are tiny, the net performance hit is negligible, and we will actually shave off a ton of preprocessing overhead.

Drawbacks

  • Dynamic branching creates atomicity. To see this, let's take a collection A = (a_1, ..., a_n) of targets and another collection B = (b_1, ..., b_n), where b_i depends on a_i (and only a_i) for i = 1, ..., n. If A and B are dynamic, then we cannot start any b_j (j = 1, ..., n) until we finish a_i for all i. For the current non-dynamic branching, we can start each b_i as soon as a_i is finished.
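A small worked example of the cost of that barrier, using made-up finish times for four a_i targets. The numbers are hypothetical and only illustrate the arithmetic.

```r
# Hypothetical finish times (in minutes) for a_1, ..., a_4.
finish_a <- c(a_1 = 3, a_2 = 1, a_3 = 7, a_4 = 2)

# Static branching: b_i may start as soon as its own a_i finishes.
start_b_static <- setNames(finish_a, sub("a", "b", names(finish_a)))

# Dynamic branching as described above: every b_j waits for all of A.
start_b_dynamic <- setNames(
  rep(max(finish_a), length(finish_a)),
  names(start_b_static)
)

# Extra waiting time per branch caused by the barrier.
extra_wait <- start_b_dynamic - start_b_static
```

Only the branch behind the slowest a_i loses nothing; every other b_j idles until the slowest a_i is done.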

A remark on implementation

Yes, I know this issue has been on the table for a year and a half and so far I have not implemented a thing. But now that we have a path to simplify drake instead of complicating it, I am more likely to get started. But first I should finish #820, which is time-consuming in its own right.

Are you still talking about

split a monolithic plan into chunks as appropriate, and then transform/make() those chunks in topological order.

or are you now thinking of transforming each rule individually? If there is still a notion of "chunks", where the divisions between the chunks are the "checkpoints", then we could have something like:

drake_plan(
  clusters = checkpoint(
    make_clusters() # returns a list
  ),
  A = target(
    do_A(x),
    transform = map(x = clusters)
  ),
  B = target(
    do_B(A),
    transform = map(A)
  )
)

All A and B targets would obviously have to wait for clusters before being transformed and running. But once the checkpoint is passed, then all transformations up to the next checkpoint (in this case there is none) could be applied at the same time. b_2 would be able to start as soon as a_2 is finished, regardless of whether a_1 is still running.

An alternate syntax would be:

drake_plan(
  clusters = target(
    make_clusters(), # returns a list
    unpack = TRUE # tell drake that we want targets clusters_1, clusters_2, clusters_3...
  ),
  A = target(
    do_A(clusters),
    transform = map(clusters)
  ),
  B = target(
    do_B(A),
    transform = map(A)
  )
)

I was thinking about the non-checkpoint route because we avoid creating extra API functionality.

drake_plan(
  data = make_clusters(),
  clusters = target(
    data,
    transform = split(slices = length(data))
  ),
  A = target(
    do_A(x),
    transform = map(x = clusters)
  ),
  B = target(
    do_B(A),
    transform = map(A)
  )
)

And I think that's what we should go with first. But I do like your checkpoint() API idea because it avoids duplicating the data.

Also, I keep forgetting about the inevitable different behavior of the current DSL vs dynamic branching. In the first rollout at least, I think we really do need target(..., dynamic = ...), and we should caution users to not put transform downstream of dynamic.

drake_plan(
  data = make_clusters(),
  clusters = target(
    data,
    dynamic = split(slices = length(data))
  ),
  A = target(
    do_A(x),
    dynamic = map(x = clusters)
  ),
  B = target(
    do_B(A),
    dynamic = map(A)
  )
)

Back to https://github.com/ropensci/drake/issues/685#issuecomment-511386400: maybe split() should just behave differently for dynamic branching. Maybe it should take no arguments and make a decision based on what the return value looks like. If clusters is a list, the following could save each element as a target. Vectors, and anything with a length(), could behave the same way.

drake_plan(
  clusters = target(
    get_list_of_datasets(),
    dynamic = split()
  ),
  A = target(
    do_A(x),
    dynamic = map(x = clusters)
  ),
  B = target(
    do_B(A),
    dynamic = map(A)
  )
)

In the case of data frames, we could listen to groupings specified with dplyr::group_by().

drake_plan(
  clusters = target(
    get_data_frame() %>%
      group_by(var1, var2),
    dynamic = split()
  ),
  A = target(
    do_A(x),
    dynamic = map(x = clusters)
  ),
  B = target(
    do_B(A),
    dynamic = map(A)
  )
)

Glad I took care of #820 before implementation. Should also do #822.

For the second time, I am questioning the need for a new "dynamic" argument of target(). Currently, we ignore grouping variables we do not already have.

library(drake)
drake_plan(
  datasets = get_datasets(),
  analysis = target(
    analyze_one_dataset(datasets),
    transform = map(datasets)
  )
)
#> Warning: A grouping or splitting variable for target 'analysis' is missing
#> or undefined. Transformation skipped and target deleted.
#> # A tibble: 1 x 2
#>   target   command     
#>   <chr>    <expr>      
#> 1 datasets get_datasets

Created on 2019-08-12 by the reprex package (v0.3.0)

Instead, transform_plan() could append a new "dynamic" column of parsed "transform" objects, which we could finish interpreting during make(). Those transform objects could have grouping variables to evaluate at runtime.

Otherwise, I like https://github.com/ropensci/drake/issues/685#issuecomment-498382301. A few more remarks:

  • When it comes to dynamic branching, we should distinguish between targets literally written into the plan (e.g. analysis_1) and the results of dynamic branching (e.g. analysis_1_a and analysis_1_b). For now, I will call the former untransformed dynamic targets and the latter transformed dynamic targets.
  • Transformed dynamic targets should be treated as normally as possible. Things to keep in mind:

    • Dynamic dependencies should factor into the dependency hash. These should be different (and, in fact, disjoint) for analysis_1_a vs analysis_1_b.

    • To save time, we should avoid transforming commands. Instead, we can simply control the evaluation environment to make sure dynamic dependencies get plugged in properly.

  • Untransformed dynamic targets should be saved as data frames (like in https://github.com/ropensci/drake/issues/685#issuecomment-498382301) with a special "drake_dynamic" S3 class.
  • All targets downstream of dynamic targets need to handle dynamic dependencies with transforms. We can enforce this by checking the "dynamic" column of the plan against the S3 classes of in-memory dependencies (i.e. whether they have the "drake_dynamic" S3 class).
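The point about controlling the evaluation environment instead of transforming commands can be illustrated in base R. This is a sketch under assumptions, not drake's internals: build_branch() is a hypothetical helper, and g() is a toy stand-in for the user's function.

```r
# Toy stand-ins for the user's function and an upstream target value.
g <- function(x, y) paste(x, y)
vector_of_settings_1 <- c("a", "b")

# The command stays untouched; y is still a free symbol.
command <- quote(g(1, y))

# Hypothetical helper: evaluate a command with sub-dependencies bound
# in a fresh child environment rather than substituted into the code.
build_branch <- function(command, bindings, enclos = parent.frame()) {
  env <- list2env(bindings, parent = enclos)
  eval(command, envir = env)
}

# One evaluation per branch, each seeing a different value of y.
values <- vapply(
  vector_of_settings_1,
  function(v) build_branch(command, list(y = v)),
  character(1)
)
```

Because the expression itself never changes, there is nothing to deparse or re-analyze per branch; only the bindings differ.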

Prep work:

  • [ ] Save the length() and dim() of each target in the metadata. That way, we can plan for dynamic targets without loading them again into memory.
  • [ ] Create a new dynamic column when we find targets mentioned in transformations.

Do you want to store vctrs::vec_size() instead of the length? That might work better for data frames and POSIXlt objects, for instance.

Hmm... maybe emulate it so we can avoid depending on vctrs? What are the relevant cases to consider? It might be nice to use dim()[1] for array-like objects and length() for everything else. Are there situations where vctrs::vec_size() is safer? When does it handle POSIXlt objects differently?

x <- as.POSIXlt(Sys.time(), "EST5EDT")  
y <- c(x, x)
library(vctrs)
vec_size(x)
#> [1] 1
length(x)
#> [1] 1
vec_size(y)
#> [1] 2
length(y)
#> [1] 2

Created on 2019-09-14 by the reprex package (v0.3.0)

{vctrs} conceptually treats data frames as a vector of rows, which I think would be idiomatic here. It's becoming a "free dependency" just like {rlang}, and imports only {rlang} which we already have.

As a stop-gap we can use NROW() which matches the behavior of vec_size() in many cases.

Indeed, I now see in the help file:

vec_size() is equivalent to NROW() but has a name that is easier to pronounce, and throws an error when passed non-vector inputs.

I like the child-proofing.

library(R6)
library(vctrs)
Accumulator <- R6Class("Accumulator", list(
  sum = 0,
  add = function(x = 1) {
    self$sum <- self$sum + x 
    invisible(self)
  })
)
x <- Accumulator$new()
NROW(x)
#> [1] 4
vec_size(x)
#> `x` must be a vector, not a `Accumulator/R6` object

Created on 2019-09-14 by the reprex package (v0.3.0)
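If the goal is to get the same child-proofing without depending on vctrs, a base R emulation might look like the sketch below. target_size() is a hypothetical name, and the vector check is deliberately crude (atomic vectors and lists only), so it is an approximation of vec_size() rather than a replacement.

```r
# Hypothetical helper: NROW() with a guard against non-vector inputs.
target_size <- function(x) {
  if (!(is.atomic(x) || is.list(x))) {
    stop("`x` must be a vector-like object", call. = FALSE)
  }
  NROW(x)
}

size_df <- target_size(mtcars)  # number of rows, not number of columns
size_vec <- target_size(1:5)

# Environments (and therefore R6 objects) are rejected instead of measured.
res <- tryCatch(target_size(new.env()), error = function(e) e)
```

For data frames this counts rows, whereas length() would count columns.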

I am starting to question https://github.com/ropensci/drake/issues/685#issuecomment-498382301 for different reasons:

  1. Step 5 requires us to define an extra target for bookkeeping. If analysis_1_a and analysis_1_b are dynamic targets, the proposal also requires us to define a dummy analysis_1 target, which is a data frame of hashes. That's messy.
  2. For HPC, the proposal makes it hard to do anything other than staged parallelism. If we're going to implement something as difficult as dynamic branching, I would like to do better.

We should declare dynamic targets that can exist autonomously, and we should push them to the priority queue.

Another challenge is to make a dynamic target depend on part of a dependency, not the whole thing. Example:

library(drake)
plan <- drake_plan(
    y = target(
        f(x),
        transform = map(x = c(1, 2))
    ),
    z = target(
        g(y),
        transform = map(y)
    )
)

z_1 depends on y[[1]], z_2 depends on y[[2]], and neither z_1 nor z_2 depends on all of y. However, I think we need the entirety of y in memory in order to check if z_1 and z_2 are up to date, and I think we need to check all the z_*'s at the same time. Then, we can push the outdated z_*'s to the priority queue and tell drake to just build them without checking them a second time.

Re https://github.com/ropensci/drake/issues/685#issuecomment-534540987, maybe we do need a bookkeeper target (like analysis_1) in order to implement combine(). However, we should be able to do map(), split(), and cross() (and eventually group_map()) without this dummy target.

Also, I think we should store the dummy target in a different storr namespace to separate it from the true targets.

Re #685 (comment), maybe we do need a bookkeeper target

And not just for dynamic targets, but also for dynamic dependencies as well: the things that get split up to make dynamic targets.

I reflected on dynamic branching (and #1027 and #1028) and came up with a clearer plan.

Goals of dynamic branching

  1. Define new targets while make() is running.
  2. Enable Ω(1) processing of large collections of up-to-date targets. I think we can make outdated() nearly instantaneous in many common use cases.
  3. Reduce the number of files in the cache for large workflows.

Sketch

drake_plan(
  w = seq_len(1e6),
  x = target(f(w), transform = map(w)),
  y = target(x, transform = map(x)),
  z = target(y, transform = combine(y))
)

  • Define x_1, ..., x_1000000 when it comes time to build x. y and z are similar.
  • Create only 4 metadata files: one for each of w, x, y, and z. This way, outdated() will be practically instantaneous when everything is up to date (#1027).
  • Create separate data files for x_1, ..., x_1000000, etc. (Each could be large.)

Definitions

  • Static target: a target built without any dynamic branching. Example: w.
  • Dynamic target: a row in drake_plan() with dynamic branching selected. Examples: x, y, and z.
  • Dynamic sub-target: a single branch of a dynamic target defined and built during make(). Examples: x_1, ..., x_1000000 and y_1, ..., y_1000000.
  • Dynamic dependency: a static or dynamic target on which a dynamic target depends. Examples: w, x, and y.
  • Dynamic sub-dependency: An element or dynamic sub-target of a dynamic dependency. Examples: w[1], ..., w[1e6], x_1, ..., x_1000000, and y_1, ..., y_1000000.

Note: a static target can be a dynamic dependency of a dynamic target downstream.

Target status

When we check whether a dynamic target is up to date, sometimes we can just check the target as a whole (fast). Other times, we need to look at its dynamic sub-targets too (slow). In the flowchart below, the yellow/orange steps should be time-savers and the blue ones should be time-consuming.

[Figure: flowchart for deciding whether a dynamic target is up to date]

outdated() should always assume the worst when it comes to (3). That is what will make it so fast for up-to-date targets.

Building dynamic targets

When it comes time to build x:

  1. Declare the x_i's using the NROW() or vec_size() available in the metadata of the dynamic dependencies.
  2. Insert the x_i's as direct dependencies of x in the graph. Each x_i is a leaf node, with no dependencies of its own. We need them to be in the graph so we can decrease the key of x in the priority queue as the x_i's finish.
  3. Insert the x_i's at the head of the priority queue.

    • Give each x_i an ndeps key of 0.

    • Increase the ndeps key of x from 0 to the number of x_i's.

    • Remember to call $sort() afterwards because we're not using conventional priority queue algorithms. (It's never the bottleneck anyway.)

  4. Continue building targets as usual. Because of the way we updated the priority queue, we should reach all the x_i's before x itself. In the meantime, completely different targets can begin while the x_i's are in progress (if there are workers available).
  5. When we reach an x_i, store the target, but not the metadata.
  6. We should reach x again only after all the x_i's are checked and built. When that happens, the value of x should be a composite of the names and hashes of the x_i's. Store it as a regular target, metadata and all.
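The queue bookkeeping in steps 2 to 6 can be sketched with a named integer vector of ndeps keys. This is a serial toy model, not drake's priority queue: register_subtargets() and run_queue() are hypothetical helpers, and re-sorting with order() stands in for the $sort() call mentioned above.

```r
# Hypothetical helper: add n sub-targets of `parent` to a queue of ndeps keys.
register_subtargets <- function(queue, parent, n) {
  subs <- paste(parent, seq_len(n), sep = "_")
  queue[parent] <- queue[parent] + as.integer(n)  # parent now waits on its branches
  queue <- c(setNames(integer(n), subs), queue)   # each branch has no dependencies
  queue[order(queue)]                             # re-sort by ndeps, smallest first
}

# Serial stand-in for the scheduler: always build the head of the queue.
run_queue <- function(queue, deps) {
  built <- character(0)
  while (length(queue) > 0) {
    stopifnot(queue[[1]] == 0L)  # the head must be ready to build
    target <- names(queue)[1]
    queue <- queue[-1]
    built <- c(built, target)
    # Decrease the key of every queued target that depends on `target`.
    waiting <- names(deps)[vapply(deps, function(d) target %in% d, logical(1))]
    waiting <- intersect(waiting, names(queue))
    queue[waiting] <- queue[waiting] - 1L
    queue <- queue[order(queue)]
  }
  built
}

queue <- c(x = 0L, y = 1L)  # y depends on the dynamic target x
queue <- register_subtargets(queue, "x", 3)
deps <- list(x = c("x_1", "x_2", "x_3"), y = "x")
build_order <- run_queue(queue, deps)
```

In this model x is reached a second time only after x_1, x_2, and x_3 have been popped, which matches step 6.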

(2) and (3) become much simpler if there is no parallel computing. All we need to do is augment the character vector over which we are looping.

API and prep

The API is the same as the current DSL. However, we need to process things differently. Let's go with the dynamic column from https://github.com/ropensci/drake/issues/685#issuecomment-498382301. We should also start saving NROW() or vec_size() for all targets, not just the ones that are likely to be dynamic dependencies.

Aftermath and remarks

  • Several API functions need to adapt. For example, loadd(x) should probably load all the x_i's. We might also want to mark dynamic targets with their own shape in vis_drake_graph(). For the sake of performance, it is probably best not to show dynamic sub-targets. And without metadata, we won't have build times on dynamic sub-targets, which could throw off predict_runtime() etc.
  • I have only wrapped my head around dynamic map()'s, and I will implement that piece first. combine(.by = ...) could be tricky. Let's cross that bridge when we get there.
  • Can we take this opportunity to load parts of a target instead of the entire thing? Maybe. With NROW()/vec_size() metadata, we can certainly define dynamic sub-targets without loading anything big. And we can load dynamic sub-targets as individual dependencies (e.g. for the y_i's and z_j's). But we will still need to load static targets all at once.

For predict_runtime() and predict_workers(), I think it would be reasonable to take the overall recorded time of x and divide it evenly into predicted runtimes for x_1, ..., x_1000000.
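As a quick illustration of that even split, assuming a recorded total for x and a known number of sub-targets. subtarget_runtimes() is a hypothetical helper, not part of drake.

```r
# Hypothetical helper: spread a parent's recorded build time over its branches.
subtarget_runtimes <- function(parent, total_seconds, n) {
  setNames(
    rep(total_seconds / n, n),
    paste(parent, seq_len(n), sep = "_")
  )
}

# Example: x took 120 seconds in total and has 4 sub-targets.
times <- subtarget_runtimes("x", total_seconds = 120, n = 4)
```

The per-branch estimates sum back to the recorded total, so aggregate predictions stay consistent even though individual branches may be over- or under-estimated.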

Unfortunately, I think we need a separate new dynamic argument to target(). Otherwise, the static transforms tend to turn targets into grouping variables when they should really be part of dynamic branching.

Last week, I wrote enough of the API and documentation to get started. Implementation roadmap.

Prep

  • [x] Allow imports to be dynamic dependencies. (Affects static code analysis; easy.)
  • [x] Add a flag in the metadata to tell whether a target is dynamic or static (easy).
  • [x] Keep track of target NROW()s in a hash table.
  • [x] Compute the names and indices of sub-targets. Requires handling each transform individually (map vs cross vs combine).

Primary action items

  • [x] Register dynamic sub-targets for local computing.

    • [x] layout

    • [x] graph

  • [x] Load dependencies of dynamic targets into memory while respecting existing memory strategies.

    • [x] map()

    • [x] cross()

    • [x] combine()

  • [x] Build targets in loop parallelism (parallelism = "loop").

    • [x] map()

    • [x] cross()

    • [x] combine()

  • [x] Build dynamic sub-targets, substituting in the correct sub-dependencies as appropriate. For the sake of speed, we should use environments for this, not metaprogramming or symbol substitution. We will need to handle dynamic dependencies differently depending on the type: we may have a list, an array-like object, or a dynamic target (tricky).
  • [x] Store the sub-targets and the overarching dynamic target as appropriate.
  • [x] Use dependency hashes instead of numeric indices for sub-target names. This may even make it easier to check if a sub-target is up to date. We should continually remove the keys of sub-targets we no longer use. Otherwise, we will end up with too many keys and no way to garbage-collect the data.
  • [x] Forbid dynamic grouping variables in the condition and change triggers.
  • [x] Allow modification of some key config elements by reference.

    • [x] The layout. Simple enough to store it as an environment instead of a list.

    • [x] The graph. Right here, we can put the new subgraph in a new orphan environment instead of config$graph. Of course then we need to update everything downstream in drake's internals. :mountain:

  • [x] Clean up and simplify R/backend_loop.R. Use local_build() if possible.
  • [x] Basic triggers for dynamic parents and sub-targets. See the flowchart in https://github.com/ropensci/drake/issues/685#issuecomment-541460119. For dynamic targets, be sure to check the subtargets. Do they all exist?
  • [x] Make drake_meta_() faster for sub-targets.
  • [x] Test dynamic triggers on each transform. Be sure to include dynamic targets downstream. After each make(), verify that stuff stays up to date when left unchanged. Test ideas:

    • [x] Leave everything up to date and nothing should rebuild.

    • [x] Change a static dependency and watch everything rebuild.

    • [x] Change parts of a dynamic dependency and watch some stuff rebuild.

    • [x] Change both static and dynamic dependencies and watch everything rebuild.

    • [x] Insert an element in the middle of a dynamic grouping variable. Watch singletons update downstream.

  • [x] Special tests for combine():

    • [x] Change .by to a completely different vector with the same groupings as before. Watch everything rebuild.

    • [x] Change .by to a different vector with a different length, with some groups the same. Watch some stuff rebuild.

    • [x] Change the vector .by, but leave all the groupings alone. None of the sub-targets should rebuild.

  • [x] Ensure that invalidating a sub-target invalidates the parent.
  • [x] Data recovery. Handle each dynamic parent target as a whole. Get the old metadata, verify the existence of the old value, the old sub-targets, and the old sub-target metadata, and restore things if possible.
  • [x] Build dynamic targets on HPC.

    • [x] Register sub-targets with the priority queue.

    • [x] Plug into future backend.

    • [x] Quick test with future.

    • [x] Plug into clustermq backend.

    • [x] Quick test with clustermq.

    • [x] Connect as many dynamic branching tests as possible to the hpc testing infrastructure.

    • [x] Test that sub-targets actually run on HPC.
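To illustrate the point above about using environments instead of symbol substitution when building sub-targets, here is a small base R sketch. The function build_subtarget() is hypothetical; drake's real internals are more involved.

```r
# Sketch only: evaluate one command per sub-target by binding the grouping
# variable to a different slice, without rewriting the expression itself.
command <- quote(x * 10)   # the command of a dynamic target
x_whole <- c(1, 2, 3)      # the whole dynamic dependency x

build_subtarget <- function(command, name, value) {
  env <- new.env(parent = baseenv())  # fresh environment per sub-target
  assign(name, value, envir = env)    # x now refers to a single slice
  eval(command, envir = env)
}

values <- lapply(seq_along(x_whole), function(i) {
  build_subtarget(command, "x", x_whole[[i]])
})
str(values)
```

The command stays the same object for every sub-target. Only the binding of x changes, which avoids repeated metaprogramming.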

Wrap-up

  • [x] Ensure specialized data formats apply to sub-targets but not to parent dynamic targets.
  • [x] Insert examples in the help files (make(), drake_plan(), target(), and R/package.R.)
  • [x] Indicate dynamic targets with a special shape in drake_graph_info(). Make sure it displays properly in vis_drake_graph(), sankey_drake_graph(), and drake_ggraph().
  • [x] Implement id_chr() for both static targets and dynamic sub-targets. Analogous to .id_chr in static branching.
  • [x] Update predict_runtime() and predict_workers().
  • [x] Test dynamic sub-targets and random numbers:

    • [x] Different seeds for different sub-targets

    • [x] 1:1 correspondence between the parent target seed and random sub-target values.

  • [x] loadd() and readd():

    • [x] Make them load lists of subtarget values of dynamic targets.

    • [x] Add an index vector argument to select sub-targets.

    • [x] Load the hash vector in drake_build() instead of loadd(deps = TRUE).

  • [ ] Benchmark static vs dynamic branching in this large workflow.

    • [ ] Initial drake_config().

    • [ ] Initial make().

    • [ ] make() with everything already up to date.

For splitting, we probably need separate split_list() and split_array() transforms.

Scratch that.

For splitting, we can probably make use of a .by argument to make it like group_map(). For both split() and combine(), .by should be a single object (import or target) and it alone determines the number of dynamic sub-targets. Simple and sensible.

Also, split() and combine() should work on one non-.by symbol at a time. Only map() and cross() can handle multiple symbols. So the signatures should look like this:

map(...)
cross(...)
split(target, .by)
combine(target, .by)

Utility functions:

  • read_subtarget(target, index): get a dynamic subtarget with a given index.
  • get_by(...): take multiple objects and make an index vector for .by. Should be used in commands for auxiliary targets.

Another simplifying assumption for all transforms: for vector-like and list-like objects, map/reduce over the length. For array-like objects, map/reduce over the first dimension (e.g. rows for data frames).
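A sketch of that slicing rule in base R. The helpers n_slices() and get_slice() are hypothetical names used only for illustration, and the sketch only covers objects with at most two dimensions.

```r
# Hypothetical helpers (not drake functions) for the slicing rule.
n_slices <- function(x) NROW(x)

get_slice <- function(x, i) {
  if (is.null(dim(x))) {
    x[i]                   # vectors and lists: slice along the length
  } else if (length(dim(x)) == 2L) {
    x[i, , drop = FALSE]   # data frames and matrices: slice rows
  } else {
    stop("this sketch only handles objects with at most two dimensions")
  }
}

print(n_slices(letters[1:3]))      # number of elements
print(n_slices(mtcars))            # number of rows
print(get_slice(letters[1:3], 2))  # second element
print(get_slice(mtcars, 1:2))      # first two rows, still a data frame
```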

A couple notes:

  1. For the non-HPC case, let's do the simple thing and spawn dynamic targets inside something like local_build(). We may even be able to leverage that technique for the HPC backends.
  2. Data recovery for dynamic sub-targets is going to require some work. Fortunately, a dynamic target is just a vector of hashes, and we can just match up those hashes to the sub-target names.

As I attempt an implementation, I am finding that because I am trying to avoid saving metadata lists, I have to reinvent a lot of internal machinery. Maybe it's better to save that metadata for dynamic sub-targets. The internal overhaul may not be as catastrophic, and we still gain efficiency because we do not need to actually check the metadata as often.

Yeah, we will need metadata for things like seeds and warnings. But we will still see performance gains in other ways.

It is coming time to work on split(), and I am rethinking it. It should look like split(..., .by) and probably take multiple variables for ... and .by.

On second thought, let's hold off on split(). map() might already have everything we need.

Thoughts on dynamic triggering:

  • For sub-targets, hold on to the metadata of the parent. Put it in the layout.
  • Use different S3 methods for handling triggers: static vs dynamic parent vs dynamic sub-target.
  • condition and change triggers should apply to sub-targets, not parents.

On second thought, let's leave the condition and change triggers as they are. Let's prevent people from using dynamic grouping variables inside condition and change. E.g. this should not be allowed:

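drake_plan(
  x = seq_len(4),
  # Not allowed: x is a dynamic grouping variable inside the condition trigger.
  y = target(x, trigger = trigger(condition = x > 2), dynamic = map(x)),
  # Not allowed: x is a dynamic grouping variable inside the change trigger.
  z = target(x, trigger = trigger(change = x), dynamic = map(x))
)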

To avoid duplicating code over various HPC backends, let's have backend_loop() use the priority queue. With that direction, I will likely work on HPC before triggers.

Registering dynamic sub-targets requires us to modify config objects, specifically the layout, graph, and priority queue. Because of the way the internals are currently structured, it would be best to modify these objects by reference. We already do this with the priority queue, and it is straightforward enough to use an environment instead of a list for the layout. But we may have to wrap the graph in an environment of its own. Added some action items.
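The difference between a list and an environment matters here because of R's copy semantics. The sketch below uses hypothetical function names and a made-up layout structure, not drake's actual config objects.

```r
# Lists are copied on modification, so changes made inside a function are
# lost unless the list is returned and reassigned.
register_in_list <- function(layout, name) {
  layout[[name]] <- list(subtarget = TRUE)
  invisible(NULL)
}

# Environments are modified in place, so the caller sees the change.
register_in_env <- function(layout, name) {
  assign(name, list(subtarget = TRUE), envir = layout)
  invisible(NULL)
}

layout_list <- list()
register_in_list(layout_list, "y_0b3474bd")
print(length(layout_list))  # the caller's list is unchanged

layout_env <- new.env(parent = emptyenv())
register_in_env(layout_env, "y_0b3474bd")
print(exists("y_0b3474bd", envir = layout_env, inherits = FALSE))
```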

Unfortunately, dynamic branching is currently slower than static branching when it comes to actually building targets.

library(drake)

plan_dynamic <- drake_plan(
  x = seq_len(1e4),
  y = target(x, dynamic = map(x))
)

plan_static <- drake_plan(
  z = target(w, transform = map(w = !!seq_len(1e4)))
)

cache_dynamic <- storr::storr_rds(tempfile())
cache_static <- storr::storr_rds(tempfile())

system.time(
  config_dynamic <- drake_config(
    plan_dynamic,
    cache = cache_dynamic,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   0.026   0.003   0.030

system.time(
  config_static <- drake_config(
    plan_static,
    cache = cache_static,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   1.904   0.004   1.910

system.time(
  suppressWarnings( # different issue
    make(config = config_dynamic)
  )
)
#>    user  system elapsed 
#>  78.014   3.630  81.767

system.time(
  suppressWarnings(
    make(config = config_static)
  )
)
#>    user  system elapsed 
#>  32.712   3.195  36.049

Created on 2019-11-02 by the reprex package (v0.3.0)

The good news is that make() is much faster to initialize. Because we have smaller plans, drake_config() runs super quickly. And for subsequent make()s, it is faster to check if everything is up to date.

library(drake)
library(profile)
library(jointprof)

plan_dynamic <- drake_plan(
  x = seq_len(1e4),
  y = target(x, dynamic = map(x))
)

plan_static <- drake_plan(
  z = target(w, transform = map(w = !!seq_len(1e4)))
)

cache_dynamic <- storr::storr_rds(tempfile())
cache_static <- storr::storr_rds(tempfile())

system.time(
  config_dynamic <- drake_config(
    plan_dynamic,
    cache = cache_dynamic,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   0.027   0.003   0.032

system.time(
  config_static <- drake_config(
    plan_static,
    cache = cache_static,
    verbose = 0L
  )
)
#>    user  system elapsed 
#>   3.525   0.004   3.530

Rprof(filename = "dynamic.rprof")
suppressWarnings(
  system.time(make(config = config_dynamic), gcFirst = FALSE)
)
#>    user  system elapsed 
#>  99.096   3.656 102.928
Rprof(NULL)
data <- read_rprof("dynamic.rprof")
write_pprof(data, "dynamic.pprof")

Rprof(filename = "static.rprof")
suppressWarnings(
  system.time(make(config = config_static), gcFirst = FALSE)
)
#>    user  system elapsed 
#>  52.112   3.708  55.916
Rprof(NULL)
data <- read_rprof("static.rprof")
write_pprof(data, "static.pprof")

suppressWarnings(
  system.time(make(config = config_dynamic), gcFirst = FALSE)
)
#>    user  system elapsed 
#>   3.239   0.164   3.418

suppressWarnings(
  system.time(make(config = config_static), gcFirst = FALSE)
)
#>    user  system elapsed 
#>  13.847   0.472  14.347

file.copy("dynamic.pprof", "~/Downloads")
#> [1] TRUE
file.copy("static.pprof", "~/Downloads")
#> [1] TRUE

Created on 2019-11-02 by the reprex package (v0.3.0)

I used those pprof files at the bottom to generate the flame graphs below. The one on the left is from static branching, and the one on the right is from dynamic branching.

[Screenshot: flame graphs of make() for static branching (left) and dynamic branching (right)]

It looks like the main hangup is loading sub-target dependencies and registering sub-targets. Not too surprising. Speeding this up is going to be another slow-going long-term project. If you have more examples that demonstrate slowness, please post them. It took a long time to get static branching as fast as it is now, and I expect the same for dynamic branching.

Corrections to https://github.com/ropensci/drake/issues/685#issuecomment-541460119

The implementation in #1042 is different from https://github.com/ropensci/drake/issues/685#issuecomment-541460119. In particular, the flowchart in https://user-images.githubusercontent.com/1580860/66722470-27ede180-eddc-11e9-97ea-930c5a93d287.png no longer matches the implementation.

Procedure for sub-targets

The procedure for sub-targets is actually simpler than I had originally planned.

  1. Check the static triggers of the dynamic target.
  2. If any static trigger fires, build all the sub-targets.
  3. If the static triggers do not fire, check all the sub-targets individually. It is not enough to check the dynamic dependencies as a whole because some of the sub-targets could have been deleted since the last make().
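The three steps above can be sketched as a single decision function. This is a hypothetical simplification, not drake's code: the cache is represented by a plain character vector of existing keys.

```r
# Hypothetical sketch of the decision, not drake's actual implementation.
# `cached` stands in for the set of keys currently in the cache.
subtargets_to_build <- function(static_trigger_fired, subtargets, cached) {
  if (static_trigger_fired) {
    return(subtargets)         # steps 1 and 2: rebuild every sub-target
  }
  setdiff(subtargets, cached)  # step 3: check each sub-target individually
}

subtargets <- c("y_0b3474bd", "y_b2a5c9b8", "y_71f311ad")
cached <- c("y_0b3474bd", "y_71f311ad")  # one sub-target was deleted

print(subtargets_to_build(FALSE, subtargets, cached))
print(subtargets_to_build(TRUE, subtargets, cached))
```

Checking sub-targets one by one is what catches the deleted sub-target in the first call, even though no static trigger fired.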

Procedure for dynamic targets as a whole

Each dynamic target has its own value alongside the values of the sub-targets. We recompute this value if

  1. Any sub-target changed, or
  2. Any dynamic dependency changed as a whole.

Why (2)? Because in some situations, we already have all the sub-targets, but we use fewer of them.

library(drake)
plan <- drake_plan(
  x = seq_len(3),
  y = target(x, dynamic = map(x))
)
make(plan)
#> target x
#> subtarget y_0b3474bd
#> subtarget y_b2a5c9b8
#> subtarget y_71f311ad

# readd() and loadd() understand dynamic targets.
readd(y)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 2
#> 
#> [[3]]
#> [1] 3

# But a dynamic target is really just a vector of hashes.
cache <- drake_cache()
cache$get("y")
#> [1] "3908fe5069df3c28" "16b3cb68bd4872ed" "1a3b3c0d06147d80"
#> attr(,"class")
#> [1] "drake_dynamic"

# What if we shorten y?
plan <- drake_plan(
  x = seq_len(2),
  y = target(x, dynamic = map(x))
)

# y needs to change, but we leave the sub-targets alone.
make(plan)
#> target x

# readd() and loadd() understand dynamic targets.
readd(y)
#> [[1]]
#> [1] 1
#> 
#> [[2]]
#> [1] 2

# But a dynamic target is really just a vector of hashes.
cache$get("y")
#> [1] "3908fe5069df3c28" "16b3cb68bd4872ed"
#> attr(,"class")
#> [1] "drake_dynamic"

Created on 2019-11-02 by the reprex package (v0.3.0)

Why the cryptic sub-target names?

The sub-target names are ugly (e.g. y_71f311ad) but incredibly useful.

  1. The suffixes of sub-targets are hashes of dynamic sub-dependencies. In other words, the act of computing the name is the same as checking if it is already up to date! All we need to do is check that the name exists in the cache! (After static triggers, of course.)
  2. The prefixes of static DSL get long and cumbersome too easily. A hash solves this problem because it has a fixed length by design, and it remains valid for all kinds of dynamic dependencies.
  3. A natural alternative is to index the sub-targets numerically, e.g. y_1, y_2, etc. (In fact, that is what I originally proposed in https://github.com/ropensci/drake/issues/685#issuecomment-541460119.) But if we did that, we would invalidate y_2 every time we insert an element in the middle of x. With hashes, we do not have this problem: the sub-targets of y can be in any order and still remain valid.

library(drake)
plan <- drake_plan(
  x = c("a", "b"),
  y = target(x, dynamic = map(x))
)

make(plan)
#> In drake, consider r_make() instead of make(). r_make() runs make() in a fresh R session for enhanced robustness and reproducibility.
#> target x
#> subtarget y_89ca58a1
#> subtarget y_38e75e51

plan <- drake_plan(
  x = c("a", "inserted_element", "b"),
  y = target(x, dynamic = map(x))
)

# Only one sub-target needs to build.
make(plan)
#> target x
#> subtarget y_06d53fef

# Permute x.
plan <- drake_plan(
  x = c("inserted_element", "b", "a"),
  y = target(x, dynamic = map(x))
)

# All sub-targets are still up to date!
make(plan)
#> target x

Created on 2019-11-02 by the reprex package (v0.3.0)
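The naming idea can be reproduced outside drake with a few lines of base R. This is only a sketch: the MD5 of the serialized value is a stand-in for whatever hash drake actually uses, and hash_value() and subtarget_names() are hypothetical names.

```r
# Stand-in hash: MD5 of the serialized value, computed via a temporary file.
# drake uses its own hashing; this is only for illustration.
hash_value <- function(value) {
  path <- tempfile()
  on.exit(unlink(path))
  writeBin(serialize(value, connection = NULL), path)
  unname(tools::md5sum(path))
}

# Name each sub-target after the hash of the slice it depends on.
subtarget_names <- function(target, x) {
  hashes <- vapply(
    seq_along(x),
    function(i) hash_value(x[[i]]),
    character(1)
  )
  paste0(target, "_", substr(hashes, 1, 8))
}

old <- subtarget_names("y", c("a", "b"))
new <- subtarget_names("y", c("a", "inserted_element", "b"))
permuted <- subtarget_names("y", c("inserted_element", "b", "a"))

print(setdiff(new, old))        # the only name that is not already cached
print(setequal(permuted, new))  # permuting x produces the same set of names
```

Because a name depends only on the value of its slice, checking whether a sub-target is up to date reduces to checking whether its name already exists.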

Implemented in #1042.

Also noteworthy: mapping over rows: https://github.com/ropensci/drake/pull/1042#issuecomment-549096614

One source of overhead I overlooked: computing the hashes of sub-values that go into the names of sub-targets. Unavoidable, but not terrible.

Dynamic parent targets are already vectors of hashes, so we can avoid this overhead if the dynamic dependency is itself dynamic: 5a07f675b1d0b648d6d61b6fa4cba2465c7bc941. Otherwise, we need to compute the hashes of all the sub-values.
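A sketch of that shortcut, under the assumption that a dynamic parent is a character vector with class "drake_dynamic" as in the cache output shown earlier. The helpers hash_value() and sub_hashes() are hypothetical, and the MD5 hash is only a stand-in.

```r
# Stand-in hash for illustration only (drake uses its own hashing).
hash_value <- function(value) {
  path <- tempfile()
  on.exit(unlink(path))
  writeBin(serialize(value, connection = NULL), path)
  unname(tools::md5sum(path))
}

# Hypothetical helper: reuse stored hashes when the dependency is itself
# dynamic, and hash every sub-value otherwise.
sub_hashes <- function(dep) {
  if (inherits(dep, "drake_dynamic")) {
    return(as.character(unclass(dep)))  # no hashing needed
  }
  vapply(seq_along(dep), function(i) hash_value(dep[[i]]), character(1))
}

dynamic_dep <- structure(
  c("3908fe5069df3c28", "16b3cb68bd4872ed"),
  class = "drake_dynamic"
)
print(sub_hashes(dynamic_dep))   # returned as stored
print(sub_hashes(c("a", "b")))   # computed one element at a time
```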

Update: dynamic branching just got a huge speed boost in #1089 thanks to help from @billdenney and @eddelbuettel. With improvements both in development drake and development digest, dynamic branching is now about 33% faster than static branching overall. Benchmarking workflow: https://github.com/wlandau/drake-examples/blob/master/overhead/dynamic.R vs https://github.com/wlandau/drake-examples/blob/master/overhead/static.R.
