Crystal: [RFC] Structured Concurrency

Created on 30 Jul 2018 · 39 comments · Source: crystal-lang/crystal

Crystal has a great concurrency model based on Fibers and Channels, which can be used to pass messages around.

Fibers are conceptually pretty simple. If you spawn one, it splits off from the main control flow and runs concurrently for an indefinite amount of time, depending on what it does and how long that takes. From the perspective of the main control flow, it's essentially fire and forget.

Real-life problems typically call for a more sophisticated way of handling concurrent tasks. Sometimes you need to wait for one, some, or all tasks to finish before continuing in the main scope. Error handling in concurrent tasks is also important, as is the ability to cancel the remaining tasks when others have finished or errored.

Fibers and Channels can be used to implement a model for structured concurrency. Given that this is a pretty common idiom, I'd like to see a generalized implementation in Crystal's stdlib.

What we have

HTTP::Server#listen uses a custom implementation of a wait group, executing a number of tasks simultaneously and waiting for all of them to finish. Other examples are the parallel macro and Crystal::Compiler#codegen_many_units. parallel is the only structured-concurrency feature currently available in the stdlib, but it is only suitable for a fixed number of concurrent tasks known at compile time.

A more generalized approach would help make this concept easily reusable.
It can be implemented based on the existing features that Fiber and Channel provide. The only thing that's missing is a way to deliberately kill fibers and unwind their stacks (see #3561, and a proposed implementation in #6450).

Background

I recommend reading the articles referenced below. They both describe a model of structured concurrency which essentially restricts the execution of concurrent tasks to a specific scope and provides tools to manage them. This contrasts with the model of go (Go) and spawn (Crystal), which just fires off a new fiber without caring about its life cycle. That makes it hard to follow the control flow: what happens where, when, and in which scope.

The main idea of this proposal is to understand that each fiber is limited to the scope it is executed in:

Every time our control splits into multiple concurrent paths, we want to make sure that they join up again.

This ensures that fibers don't get lost doing work they might not even be supposed to do anymore.
I believe this concept can be applied to almost any real-life use case of fibers.
Having a structured flow of control also allows for a proper exception flow. Right now, unhandled exceptions within a fiber are just printed and ignored. When a fiber is scoped to some parent context, an exception can just be propagated there.

Prototype

I have implemented a simple prototype of a concurrency feature (based on Fiber.cancel from #6450). The idea is to have a coordination tool for running fibers, called a Spindle. It is used to spawn fibers and ensures they are all collected. This particular implementation allows running multiple tasks concurrently; if one of them fails, it cancels all the others. This is of course just one example of behaviour; there are many different ways to react.

The code can be found at: https://gist.github.com/straight-shoota/4437971943bae7000f03fabf3d814a2f
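A minimal usage sketch of the prototype (names like concurrent and Spindle#spawn are taken from the gist; Fiber.cancel is only a proposal from #6450, so none of this is in the stdlib today):

```crystal
# Hypothetical API based on the linked gist; fetch_a and fetch_b
# are placeholder tasks.
concurrent do |spindle|
  spindle.spawn ->fetch_a
  spindle.spawn ->fetch_b
end
# The block returns only after both fibers have finished.
# If one of them raises, the spindle cancels the other fiber
# and re-raises the exception in this (parent) scope.
```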

I don't have a concrete proposal for how this should be implemented in terms of stdlib APIs, but the general idea is to provide tools for running tasks concurrently. We could even think about removing unscoped spawn (it can be considered harmful, after all), but that's not necessarily required and can probably be decided later.

References

Some examples of similar libraries:


All 39 comments

I'd rather write a "good" Promise implementation and then have Promise.all/Promise.first etc. As far as I'm concerned, the abstraction of a fiber with a return value, called a promise or future, is better and more composable than wait groups, and can still be used as a wait group pretty easily.

Talking about abstractions based on cancellation should wait until we actually have cancellation.

Promises are more about communication than structuring. You can do a lot of things in a similar way, but it feels like an inferior solution to me.

Running two methods concurrently would look like this with a promise:

promise = future { do_some_stuff }
do_some_other_stuff
promise.get

In my example implementation it looks like this:

concurrent do |spindle|
  spindle.spawn ->do_some_stuff
  spindle.spawn ->do_some_other_stuff
end

It's a little more verbose, but easier to follow the control flow, especially when things get more complicated.

(Verbosity could actually be reduced by using a with spindle yield block, but I think I like the expressiveness better.)

One way or the other: a good promise implementation would need a way to cancel a fiber as well, wouldn't it?

@straight-shoota you could do it like that or you could use promises like this:

Promise.all(
  Promise.new ->func,
  Promise.new ->func2
)

which is both less verbose and more powerful since you can do

value1, value2 = Promise.all(
  Promise.new ->func,
  Promise.new ->func2
)

very easily to get the function's return values.

Changing it to "first fiber wins", aka a race, becomes s/all/first/: just changing one function instead of having to refactor your code to use a different concurrency mechanism (consider the difference between a function taking a spindle as a parameter and one returning a promise; that's a big refactor).

And JS promises have no way to cancel them; we don't need cancellation for them.

These Promise.all and Promise.first are nice, but they only allow a static number of concurrent tasks. That is just like the parallel macro, and not sufficient for a real-world application where tasks can be added dynamically at runtime (like the connection handlers of a network server, for example).
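To illustrate the dynamic case using the Spindle prototype's API from the gist (hypothetical; server and handle_connection are placeholders):

```crystal
# A server scope whose handler fibers are added at runtime.
concurrent do |spindle|
  while client = server.accept?
    spindle.spawn { handle_connection(client) }
  end
end
# Leaving the block waits for (or cancels) every handler still
# running, no matter how many were spawned.
```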

At least I would want to have a way to cancel a promise/fiber (and do it gracefully) in case its result/effect is no longer required. It shouldn't block resources for unnecessary work.

@straight-shoota No, .all and .first can easily take arrays.

How would that work with adding child tasks at runtime? You would have to append them to the array; somehow that could probably be made to work with .all and .first. But it makes a really ugly API, using an array as a registry for tasks.

Ah I see what you want. We could easily add block-based versions of Promise.first and Promise.all which do what you want, but I can't help but feel that adding tasks at runtime would be hard to track. I'd be happy to do that though.

Promises aren't Structured Concurrency. Don't mix concepts, please :)

Structured Concurrency is about controlling the lifetime of nested coroutines (they can't outlive their parent). We can spawn new coroutines at any time (e.g. on Socket#accept) and eventually tell one or many of them to stop at any time. Lifetime is all that's cared about; use channels to pass values.

A promise defers the computation of a fixed set of fibers to return a set of values. Since Promise.all waits for the values, you can't add a fiber to the list at any time (e.g. on Socket#accept), and the current fiber is blocked. Promise.all only happens to fake Structured Concurrency.

Passing a mutable Array to Promise.all would be an ugly hack, and prone to concurrency issues: Can I push a fiber? What if previous fibers have finished already? Do we raise an exception? What if we have a TCP server that responded to all requests and went idle, and then a new request comes in? What if I pop or delete a fiber from the array?

Promises != Structured Concurrency.

I was thinking more of a

Promise.all do |promises|
  promises << ...
end

interface in addition to the other promise interfaces to make it do wait groups as well.

I guess it's probably quite a hack though, and better to separate the two concepts.

@RX14 That doesn't seem much different from

concurrent do |spindle|
  spindle.spawn ...
end

So we pretty much want the same thing in this regard ;)

Honestly I kind of like the idea of having something other than promises, which I sometimes feel like is a bad solution but manages to be used everywhere...

@straight-shoota yeah I like the concept I was just wondering if we could work it into the "promise" concept for simplicity.

I've been writing a Promise library for Crystal, and it's almost complete (core implementation done, with specs):
https://github.com/spider-gazelle/promise

It might not be "Structured Concurrency" in the strictest sense; however, it does simplify coordinating a bunch of async events, and it's quite a popular paradigm. I would love to see it in the standard library, if that's something you would consider.

I think my implementation is pretty neat in any case:

require "promise"

promise = Promise.new(Int32)
result = promise.then { |result| result.not_nil! + 100 }.then { |result| "change #{result} type" }
promise.resolve(10)

# Can also grab the value via a future
result.value # => "change 110 type"

Any .then block can change promise types as it propagates down the chain.
.catch blocks can only propagate exceptions and are used to recover values back to the initiating promise value type.

Thanks to Crystal's type safety, it puts most promise implementations to shame.

I don't think a promise library in crystal would look anything at all like a JS promise library. .then isn't really required at all. Libraries should expose synchronous APIs, and any uses of Promise should really stay internal to the application.

In fact, making promises too much like JS will mean people start using promises the way they do in JS, and Crystal promises absolutely should not be used that way. They should be used pretty sparingly.

Well yeah (exposing synchronous APIs)
Not sure how you escape using the equivalent of .then, though; it's kind of the je ne sais quoi of promises. Without it, all you have is futures.

@stakach ah, my terminology was all messed up. I'd like futures in the stdlib and promises perhaps can be a shard built on that.

I'm still not sure what the conceptual difference between a promise and future would be in crystal though

I like to think of promises and futures as the two ends of a pipe: the promise is where I put something in, and the future is where I wait and listen for the result.

Now, taking this picture: I just described a Channel with a buffer capacity of one that can only be written to once. So perhaps, for us, promises would actually just be redundant with channels.
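To make that picture concrete, here is a minimal sketch (my own naming, not a stdlib or proposed API) of a one-shot future built on a buffered Channel:

```crystal
# A minimal one-shot future built on a Channel with capacity one.
# Sketch only: the ||= memoization is not safe for nilable or Bool
# value types, and resolve must be called exactly once.
class OneShot(T)
  @value : T?

  def initialize
    @channel = Channel(T).new(1) # buffered, so resolve never blocks
  end

  def resolve(value : T)
    @channel.send(value) # the "promise" end: write once
  end

  def get : T
    @value ||= @channel.receive # the "future" end: wait, then cache
  end
end

f = OneShot(Int32).new
spawn { f.resolve(42) }
f.get # => 42
```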

Promises are really complementary to channels: channels for distributing async work, and promises for handling the results.

For instance, I'm working on updating this influxdb library to the latest version of Crystal, as the original maintainer doesn't have the time.

The original version uses spawn to perform an async request (a channel would be ideal here); however, either way you can never know whether the request succeeded, and you don't always want to wait around for the response at this point. That bit of code looks pretty messy:

if sync
  send_write(body).status_code == 204
else
  spawn { send_write(body) }
  true
end

A Promise in this case is the perfect solution.

  • You can ignore the result like the async implementation above
  • You can be synchronous if you want to be, using Promise#value
  • You can use a callback to handle a success or failure condition

For comparison, the promise version of the above would be:

promise = Promise.new(Bool)
channel.send({promise, body})
promise

Promises provide flexibility and are simple to use. I don't think they should be seen as competing with other flow-control solutions; they are just one of many tools.

Well, regardless of the terminology what I want is a fiber which can return a value, and you can wait for it to complete with a value or error

@stakach I don't think a promise is the right tool in your example. The library method #write should just be blocking like any Crystal IO method and directly call #send_write. It should be the caller's task to manage concurrency and execute this method in a separate fiber where appropriate. The library simply doesn't need to care about that.

This would be idiomatic Crystal, with a simplified API: remove the sync argument and the dependency on a promise implementation. It's also consistent with other IO methods; others on the same type (select, query, drop) don't seem to have an option to execute asynchronously either.

@straight-shoota the problem is that running the write concurrently doesn't work on the current version of Crystal, so it has to go through the channel to ensure serial writes.

Basically, because the HTTP client response from InfluxDB is chunked, Crystal yields the current fiber while it waits for IO in the middle of the HTTP response cycle.
Then I can start another request while the client is still processing the previous response, which leads to weird errors. (Not sure if this is an issue with Crystal's HTTP::Client; however, if you take that influxdb library, make the minor changes to run on 0.25.1, and run the tests, it blows up.) This is a common issue with fibers, and the library should definitely deal with it, not the caller.

Yes, that's a shortcoming of HTTP::Client. It currently can't multiplex concurrent requests to the same endpoint over multiple connections. See #6011

Until this is resolved, you would need to use several client instances or guard one with a mutex. But this issue exists whether #write is async or not (it could always be called from a different fiber).
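The mutex option mentioned above could look something like this sketch (GuardedClient is my own name; it assumes Crystal's stdlib Mutex and HTTP::Client, and the /write path and 204 status are taken from the earlier snippet):

```crystal
require "http/client"

# Sketch: serialize all requests through a single client with a Mutex,
# so concurrent callers can't interleave on the same connection.
class GuardedClient
  def initialize(@client : HTTP::Client)
    @mutex = Mutex.new
  end

  # Blocks until the write completed; callers that want concurrency
  # can run this in their own fiber.
  def write(body : String) : Bool
    @mutex.synchronize do
      @client.post("/write", body: body).status_code == 204
    end
  end
end
```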

The problem of being called from multiple different fibers is already solved by using the channel: any fiber can call the code, the channel makes the HTTP::Client requests serially, and the promise returns the result to the calling fiber.

Very little complexity vs locks and/or multiple client instances

@RX14 I deliver you "a fiber which can return a value, and you can wait for it to complete with a value or error"
https://github.com/spider-gazelle/promise#simple-concurrency-control

It's not an alternative to structured concurrency, but I still think it's pretty cool.

@straight-shoota thanks for the advice. I threw a lock around the InfluxDB HTTP client; you were right, that was definitely the way to go.
Also watched the Trio video; it does seem pretty cool. Looking forward to Spindles landing!

Not sure how you escape from using the equivalent of .then

We had the same debates back in the Boost mailing lists:

[sticker image]

Indeed. Go doesn't have promises either; I'm sure there's a good reason (might be the lack of generics, but I'm not sure). I remember @waj was always against promises. The whole point of non-blocking IO and spawn is to avoid callback hell. But a wait group is something good.

He glosses over the problem, and the metaphors between one domain and the other are forced to match his worldview.

This article is nice.

Just two notes:

GC vs systems language

GC'ed languages and systems languages (i.e. languages where the sole runtime is the execution stack itself and you have to care about pointers and lifetimes) behave differently in such situations.

Thread-start functions (or fiber-start functions) in GC'ed languages will be closures rather than plain functions, and if some value is captured from the parent scope, those values won't be destroyed when the parent dies. So we don't usually worry about child threads/fibers outliving the parent thread/fiber (just like what happens in Go). If you need deterministic destruction of objects, just be explicit about it. pthread_cancel and pthread_join will do in C. Extending the use case to the world beyond C, you'll probably follow Java's approach and change the cancellation API to throw exceptions and trigger stack unwinding on the target thread/fiber. If you have C++, there will also be RAII scope guards properly handling cancellation requests (and I bet RAII is already outside of Crystal's scope).

Don't complicate the state machine

This author is _too_ fond of abstractions. There are two strategies here: graceful vs. ungraceful shutdown. An interruption/cancellation API is about graceful shutdown. Don't get fancy with timeouts and such. Ask the thread/fiber to gracefully shut down and give it its time.

Want an ungraceful shutdown that leaks resources and possibly leaves live locks and broken invariants in data structures? As a rule of thumb this is bad design, and you don't want it to become an idiom culturally seeded in your language. Two approaches here would be:

  • Spawn a real process (os.exit() won't clean up any resources) with a greater level of isolation, and kill the process when it fails to satisfy an associated watchdog.
  • A kill() that unschedules the fiber forever as soon as the scheduler has a chance. Marked with a BIG BOLD "don't use this if you don't know what you're doing" in the documentation. No fancy timeout handling, just a simple API. If you want to give the fiber time to finish, it's cancel() + timedjoin() (the only API specifically designed for dirty shutdown here is kill(); all the others are fundamental vocabulary that emerged elsewhere).
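In Crystal terms, the graceful-then-forceful sequence described above might look like this (purely hypothetical sketch; Fiber has none of cancel, timed_join, or kill today):

```crystal
# Hypothetical two-phase shutdown.
fiber.cancel                       # graceful: raise inside the fiber, unwinding its stack
unless fiber.timed_join(5.seconds) # give it time to run its cleanup
  fiber.kill                       # dirty: unschedule forever; may leak resources
end
```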

@vinipsmaker the fiber cancel method that @straight-shoota has implemented raises a CancelledException, which unwinds the stack (including ensure blocks, which would clean up locks). So it uses existing language features to implement nurseries in a really nice, clean way.

Is this impacted (e.g. made easier, harder, or superseded) in any way by the proposed MT support? Even if it's orthogonal, as suggested here, it would be nice to have eyes on the MT proposal with compatibility (or non-incompatibility) with this proposal in mind.

@chocolateboy Structured concurrency does not depend on multithreading, but it can easily integrate with it. A concurrency context allows you to configure very specifically whether tasks can be executed in parallel, how many threads to use, error handling, etc.

I suppose only a limited version of this would be possible after 1.0, as an unlimited ability to spawn without lifetime restrictions isn't really compatible with the concept.

@yxhuvud Not sure I understand.
I'm convinced that when a tool for structured concurrency is available, there should really be no reason to use a free-floating spawn anywhere. The best-case scenario would be to avoid that completely. But you should still be able to have both structured and unstructured fibers.
I don't see a reason why unrestricted spawns wouldn't work with a concept of structured concurrency. Technically, the structure would be built upon the unstructured primitives anyway.
Removing spawn as a top-level method would be possible once there's a different tool for spawning fibers in context. But it's not strictly necessary.

The way I see it, spawn coupled with Channel and select are the low-level concurrency primitives in Crystal. In fact, they are the same in Go. I see them as similar to Erlang/Elixir's receive and ! (send). People don't usually use them directly; they use abstractions on top of them (like GenServer), but from time to time they are useful... especially for building more abstractions.

So we should keep those in Crystal, and build good abstractions on top of them. For instance, actors would be nice.

"I don't see a reason why unrestricted spawns wouldn't work with a concept of structured concurrency."

Because it defeats the point. Users need to be forced to think about the lifetime of the things they spawn, and it should be possible to see, by visual inspection of the code, which fibers can be running at any given point in the code. Or, to quote the blog post referred to above with the (quite telling) title notes-on-structured-concurrency-or-go-statement-considered-harmful:

The good news, though, is that these problems can all be solved: Dijkstra showed us how! We need to:

-   Find a replacement for go statements that has similar power, but follows the "black box rule",
-   Build that new construct into our concurrency framework as a primitive, and don't include any form of go statement.

The whole point of that post is a denouncement of the Go model of unbounded spawns!

And yes, fibers would obviously still be implemented using low-level constructs. But they would probably be used about as often as Thread.new is by end-user applications (i.e. never). The easiest way to create them (spawn) should not be the one with the worst long-term maintainability properties. Hence my comment about a limited implementation (similar to how C still has goto).

Okay, I see what you mean. There's no technical reason, but it would make sense to advocate for the higher-level interface. I agree with that. And frankly, assuming it's used in every API, there would be no reason for direct access to the low-level interface.

I just feel that this can only be achieved as a second step. The first step would be to design and implement a high-level interface for structured concurrency. Then wait for it to be adopted. After that, the (hopefully) now-unused features can be removed or reduced.
