This is a pretty popular blog post from 2015 that helps illustrate
one way of thinking about concurrency and async I/O:
What Color is Your Function?
Here's a proposal along these lines. I think we should at least consider it.
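Depends on:
- @import("root") in all packages, so that the standard library and third party packages can access it.
Goals:
- Ability to write code that works in an event-driven or blocking context. For example, libraries should be able to use OS features such as the file system and networking without having an opinion about whether to use blocking or event-based APIs.
- In an application that never creates an event loop, there should be no code generated to deal with event loops.
- In an application that never uses blocking I/O, there should be no runtime overhead associated with detecting the event loop, and no code generated at all to do blocking I/O.
- Code can even express concurrency, such as two independent writes, and then wait for them both to be done, and then if this code is used from a blocking I/O application, with --single-threaded, it is as if it were implemented fully in a blocking fashion.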
In summary, writing Zig code should be maximally portable. That means proper
Zig libraries could work in constrained memory environments, multi-threaded environments,
single-threaded environments, blocking I/O applications, evented I/O applications,
inside OS kernels, and userland applications. And not only work correctly in these environments, but work optimally.
Implementation:
- Remove async notation from functions. A function is a coroutine if it has an await or suspend in it. This is still part of the function's prototype; however, it is inferred from the function body rather than written explicitly.
- Standard library functions that perform I/O, such as std.os.File.write, have bodies that look like this:
```zig
pub fn write(file: *File, bytes: []const u8) usize {
    // Note that this is an if with comptime-known condition.
    if (std.event.loop.instance) |event_loop| {
        // var rather than const: the event loop stores .result through the pointer.
        var msg = std.event.fs.Msg{
            .Write = std.event.fs.Msg.Write{
                .handle = file.handle,
                .ptr = bytes.ptr,
                .len = bytes.len,
                .result = undefined,
            },
        };
        suspend {
            event_loop.queueFsWrite(@handle(), &msg);
        }
        return msg.Write.result;
    } else {
        // blocking call
        return std.os.linux.write(file.handle, bytes.ptr, bytes.len);
    }
}
```
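The comptime-known condition is what keeps the unused I/O path out of the generated code. Here is a minimal sketch of that mechanism in plain Zig, without coroutines. IoMode mirrors the enum named in this proposal, while writeSketch and its byte-count stand-in for the real syscall are hypothetical names invented for illustration. Because mode is a comptime parameter, only the selected branch is analyzed, so the @compileError in the other branch never fires for a blocking caller.

```zig
const std = @import("std");

// Mirrors the proposal's IoMode; reduced to two cases for this sketch.
const IoMode = enum { Blocking, Evented };

// Hypothetical stand-in for write(): `mode` is comptime-known, so the
// compiler analyzes only the branch that is actually selected.
fn writeSketch(comptime mode: IoMode, bytes: []const u8) usize {
    if (mode == .Evented) {
        // Never analyzed when mode is .Blocking, so this does not fire.
        @compileError("evented path is not part of this sketch");
    } else {
        // Stand-in for the blocking syscall: report every byte as written.
        return bytes.len;
    }
}
```

A call such as writeSketch(.Blocking, "hello\n") compiles and returns the length of the slice, even though the evented branch could never compile.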
In std/event/loop.zig:
```zig
threadlocal var per_thread_instance: ?*Loop = null;
var global_state: Loop = undefined;
const io_mode = @fieldOrDefault(@import("root"), "io_mode", IoMode.Blocking);
const default_instance: ?*Loop = switch (io_mode) {
    .Blocking => null,
    .Evented => &global_state,
    .Mixed => per_thread_instance,
};
const instance: ?*Loop = @fieldOrDefault(@import("root"), "event_loop", default_instance);
```
In the root source file, pub const io_mode determines whether
the application is 100% blocking, 100% evented, or mixed (per-thread).
If nothing is specified then the application is 100% blocking.
Or an application can take even more control, and set the event loop instance directly.
This would potentially be used for OS kernels, which need an event loop specific to their
own code.
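The "use the root's declaration, else a default" lookup can be sketched with the existing @hasDecl builtin. @fieldOrDefault is this proposal's notation; the helper ioModeOf and the two stand-in root structs below are hypothetical names, and a real implementation would pass @import("root") instead of a test struct.

```zig
const std = @import("std");

pub const IoMode = enum { Blocking, Evented, Mixed };

// Returns Root.io_mode when the root declares it, otherwise .Blocking.
// The condition is comptime-known, so Root.io_mode is only looked up
// when the declaration actually exists.
pub fn ioModeOf(comptime Root: type) IoMode {
    return if (@hasDecl(Root, "io_mode")) Root.io_mode else IoMode.Blocking;
}

// Two stand-ins for root source files (assumptions for this sketch).
const EventedRoot = struct {
    pub const io_mode = IoMode.Evented;
};
const DefaultRoot = struct {};
```

With these definitions, ioModeOf(EventedRoot) yields .Evented, and ioModeOf(DefaultRoot) falls back to .Blocking, matching the "if nothing is specified" rule above.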
When the IO method is Mixed, in the standard library event loop implementation,
worker threads get a thread local variable per_thread_instance set to the event loop
instance pointer. The "main thread" which calls run also sets this thread local
variable to the event loop instance pointer. In this way, sections of the codebase
can be isolated from each other; threads which are not in the thread pool of the
event loop get blocking I/O (or potentially belong to a different event loop) while
threads belonging to a given event loop's thread pool find their owner instance.
Now let's look at some everyday code that wants to call write:
```zig
fn foo() void {
    const rc = file.write("hello\n");
}

// assume this is in root source file
pub const io_mode = .Evented;
```
Because io_mode is Evented, the write function ends up calling suspend
and is therefore a coroutine. It follows that foo is a coroutine too,
since it calls write.
When a coroutine calls a coroutine in this manner, it does a tail-async-await
coroutine call using its own stack.
But what if the code wants to express possible concurrency?
```zig
fn foo() void {
    var future1 = async file1.write("hello1\n");
    var future2 = async file2.write("hello2\n");
    const rc1 = await future1;
    const rc2 = await future2;
}
```
async is a keyword that takes any expression, which could be a block, but in
this case is a function call. The async expression itself returns a coroutine frame
type, which supports await. The inner expression result is stored in the frame,
and is the result when using await. I'll elaborate more on async blocks later.
If an application is compiled with IoMode.Blocking, then the write function is
blocking. When the inner expression of an async is entirely blocking, the result
of the async expression is simply the result of the inner expression.
The types of future1 and future2 remain the same as before for consistency,
but in the generated code they are just the result values, and the await
expressions are no-ops. The function is essentially rewritten as:
```zig
fn foo() void {
    const rc1 = file1.write("hello1\n");
    const rc2 = file2.write("hello2\n");
}
```
Which makes foo blocking as well.
What about a CPU bound task?
```zig
fn areTheyEqual() bool {
    var pi_frame = async blk: {
        std.event.loop.startCpuTask();
        break :blk calculatePi();
    };
    var e_frame = async blk: {
        std.event.loop.startCpuTask();
        break :blk calculateE();
    };
    const pi = await pi_frame;
    const e = await e_frame;
    return pi == e;
}
```
Here, startCpuTask is defined as:
```zig
fn startCpuTask() void {
    if (@import("builtin").is_single_threaded) {
        return;
    } else if (std.event.loop.instance) |event_loop| {
        suspend {
            event_loop.onNextTick(@handle());
        }
    }
}
```
So, if you build this function in multi-threaded mode, with io_mode != IoMode.Blocking,
the async expression suspends in the startCpuTask and then gets resumed by the event
loop on another thread. areTheyEqual becomes a coroutine. So even though
calculatePi and calculateE are blocking functions, they end up executing in different
threads.
If you build this application in --single-threaded mode, startCpuTask ends up being
return;. It is thus not a coroutine. And so the async expressions in
areTheyEqual have only blocking calls, which means they turn into normal expressions,
and after the function is analyzed, it looks like this:
```zig
fn areTheyEqual() bool {
    const pi = calculatePi();
    const e = calculateE();
    return pi == e;
}
```
> Ability to write code that works in an event-driven or blocking context.
> For example, libraries should be able to use OS features such as the
> file system and networking without having an opinion about whether to
> use blocking or event-based APIs.
This is a great goal. Being able to write a library that e.g. talks to redis, without caring if it's calling async or sync functions is great: it means that the library becomes a non-blocking redis library if run from the right place!
> In an application that never creates an event loop, there should be
> no code generated to deal with event loops.
I think the answer here depends: some code will need an event loop. However this can be made transparent: the caller doesn't necessarily need to know. One example I had recently was DNS lookups: you want to do both A and AAAA lookups, and would like to parallelize them:
```zig
fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();
    var t1 = newthread { r.lookupA(host) };
    var t2 = newthread { r.lookupAAAA(host) };
    waitFor(t1, t2);
    return r;
}
```
This example has no need to yield back to the main loop: the event loop can happen directly inside of this function. However in an existing main loop, waiting on the records should be done at the top level.
> In an application that never uses blocking I/O, there should be no
> runtime overhead associated with detecting the event loop, and no code
> generated at all to do blocking I/O.
If possible this would be good. Though maybe not possible?
> Code can even express concurrency, such as two independent writes,
> and then wait for them both to be done, and then if this code is used
> from a blocking I/O application, with --single-threaded, it is as if
> it were implemented fully in a blocking fashion.
I covered this above.
I think you just need the following functions on coroutines at a language level.
- create(allocator: *Allocator, stackSize: usize, continuation: fn(arg: T) void) OutOfMemory!*Coroutine: allocate a new stack and set it up however is required.
  - stack field
  - resumedBy: ?*Coroutine field
  - continuation: fn(arg: T) T gets pushed onto the new stack. It takes a customisable type (which is the type that can be passed to/from resuming/yielding)
  - complete: bool field indicating if the coroutine is finished
- isRunning(co: *Coroutine) bool returns if the passed coroutine is currently running
  - co.resumedBy != null
- running() ?*Coroutine returns the currently running coroutine (saved in thread-local storage)
  - null if not in a coroutine (e.g. at _init)
- resume(co: *Coroutine, arg: T) T
  - assert that co is not currently running
  - co.resumedBy gets set to the current coroutine.
  - [...] co
  - [...] co.stack
- @yield(ret: T) T
  - assert that running().resumedBy != null
  - [...] T (which is what will get run on resumption)
  - [...] co.resumedBy
  - [...] resume return ret
I'm finding it difficult to express what I'm suggesting. Hopefully this sample pseudo-zig-code makes sense:
```zig
const JobList = std.LinkedList(void);
const Job = struct {
    node: JobList.Node, // intrusive linked list
    co: Coroutine,
    waitingOn: []WaitArg,
    readyWaits: ArrayList(WaitArg),
    onComplete: ?fn (*Job) void,
    pub fn init(f: fn(void) void, onComplete: ?fn (*Job) void) Job {
        return Job{
            .node = JobList.Node.init(undefined),
            .co = coroutine.create(self.allocator, @stackRequired(f), startJob), // something here to pass f
            .waitingOn = null,
            .onComplete = onComplete,
        };
    }
};
threadlocal var currentLoop: ?*eventLoop = null;
threadlocal var currentJob: ?*Job = null;
const eventLoop = struct {
    allocator: *Allocator,
    pendingJobs: JobList,
    pub fn empty(self: *eventLoop) bool {
        return self.pendingJobs.len == 0;
    }
    pub fn step(self: *eventLoop) !void {
        var ready: []WaitArg = undefined;
        if (currentLoop != null) {
            // Let parent event loop do the waiting
            ready = wait([]WaitArg{ { .EventLoop = self } });
        } else {
            // There is no event loop above us, do the OS-level polling here
            ready = someOsPoll(self.miscPrivateFields); // e.g. epoll, poll, select... whatever platform API is best
        }
        for (ready) |wait| {
            for (findJobsThatWant(wait)) |job| {
                job.readyWaits.add(wait); // on failure could just ignore, if not using edge-triggered events it will trigger again on next step() call
                try self.pendingJobs.append(job);
            }
        }
        var oldLoop = currentLoop;
        currentLoop = self;
        for (self.pendingJobs) |job| {
            var oldJob = currentJob;
            currentJob = &job;
            job.waitingOn = job.co.resume(job.readyWaits.toSlice());
            currentJob = oldJob;
            job.readyWaits.shrink(0);
            if (job.co.complete) {
                self.pendingJobs.remove(job);
                job.onComplete();
                continue;
            }
            // update miscPrivateFields based on job.waitingOn
        }
        currentLoop = oldLoop;
    }
    pub fn loop(self: *eventLoop) !void {
        while (!self.empty()) {
            try self.step();
        }
    }
    pub fn addJob(self: *eventLoop, job: *Job) void {
        self.pendingJobs.append(job);
    }
    pub fn newJob(self: *eventLoop, f: fn(void) void) !*Job {
        var job = try self.allocator.create(Job.init(f, self.allocator.destroy));
        self.addJob(job);
        return job;
    }
};
const WaitType = enum { Timeout, PollFD, EventLoop }; // + any other OS primitives to wait on
const WaitArg = union(WaitType) {
    Timeout: f64,
    PollFD: struct {
        fd: i32,
        events: i32, // mask of POLLIN|POLLOUT|POLLPRI and possibly other poll() flags
    },
    EventLoop: *eventLoop, // We build in support so that event loops are themselves wait-able. This allows multiple layers of loops to be composable.
};
pub fn wait(arg: []WaitArg) ![]WaitArg {
    if (currentJob.co == coroutine.running()) { // if we are inside of a managed coroutine then let the loop do the work
        return @yield(arg);
    } else {
        // otherwise create a new single-use loop. could have a global one prepared.
        var loop = eventLoop.init(); // stack allocated
        defer loop.destroy();
        var job = Job.init(wait, null); // stack allocated job
        loop.addJob(&job);
        try loop.loop();
        // filter arg based on job.readyWaits
        return arg;
    }
}
```
Then the current std.os.posixWrite could be written as (changed the EAGAIN branch, previously was unreachable):
```zig
pub fn posixWrite(socket: *Socket, bytes: []const u8) !usize {
    const max_bytes_len = 0x7ffff000;
    var index: usize = 0;
    while (index < bytes.len) {
        const amt_to_write = math.min(bytes.len - index, usize(max_bytes_len));
        const rc = posix.write(fd, bytes.ptr + index, amt_to_write);
        const write_err = posix.getErrno(rc);
        switch (write_err) {
            0 => {
                index += rc;
                continue;
            },
            posix.EINTR => continue,
            posix.EAGAIN => {
                // Not writable yet: wait until the descriptor reports POLLOUT, then retry.
                _ = try wait([]WaitArg{ { .PollFD = { .fd = fd, .events = POLLOUT } } });
                continue;
            },
            posix.EINVAL => unreachable,
            posix.EFAULT => unreachable,
            posix.EBADF => unreachable, // always a race condition
            posix.EDESTADDRREQ => unreachable, // connect was never called
            posix.EDQUOT => return PosixWriteError.DiskQuota,
            posix.EFBIG => return PosixWriteError.FileTooBig,
            posix.EIO => return PosixWriteError.InputOutput,
            posix.ENOSPC => return PosixWriteError.NoSpaceLeft,
            posix.EPERM => return PosixWriteError.AccessDenied,
            posix.EPIPE => return PosixWriteError.BrokenPipe,
            else => return unexpectedErrorPosix(write_err),
        }
    }
    return index;
}
```
I don't think this code can accomplish the stated goals:
```zig
fn resolve(host: []u8) []dnsResults {
    var r = dnsResults.init();
    var t1 = newthread { r.lookupA(host) };
    var t2 = newthread { r.lookupAAAA(host) };
    waitFor(t1, t2);
    return r;
}
```
I'd need to understand what newthread and waitFor are doing. It looks like they allocate resources, and have the potential for failure, yet the parameters accept no allocator and the return type indicates no error.
If this code were to be used by a target which had no event loop abilities, this function needs to be purely blocking, which means that it does the lookups serially, and there should be no possibility of OutOfMemory error. Zig code cannot use an event loop without one being set up, in userland. There is no event loop runtime set by the language.
> > In an application that never uses blocking I/O, there should be no
> > runtime overhead associated with detecting the event loop, and no code
> > generated at all to do blocking I/O.
>
> If possible this would be good. Though maybe not possible?
I just outlined how this is possible. Do you see a flaw that would prevent me from implementing it?
It looks like you're advocating for stackful coroutines, while my proposal is built on the premise of stackless coroutines.
I have a strong stance against stackful coroutines. This kind of concurrency is not really better than creating threads, from a performance and scheduling perspective. One may as well use the OS for what it was designed for. But my main issue with it is that it makes parallelism too intentional. We're back to the threading model for concurrency rather than a more powerful abstraction. Stackless coroutines are also known as "continuation passing style". This allows us to have coroutines without heap allocation, or at least with limited heap allocation. In Zig we don't get to allocate memory for free; it comes at the cost of an extra allocator parameter, and a possible OutOfMemory error, neither of which are typically required for blocking calls.
What I have outlined here is a way that will work in Zig, and it even allows expressing "optional concurrency". It's not clear to me what problems you are pointing out in my proposal, or what you are trying to solve with your pseudocode.
I think in order to be convincing here, you'd have to show me a use case of userland code that you would want to write but cannot express with my proposal, and that your counter-proposal does handle.
> I'd need to understand what newthread and waitFor are doing. It looks like they allocate resources, and have the potential for failure, yet the parameters accept no allocator and the return type indicates no error.
> If this code were to be used by a target which had no event loop abilities, this function needs to be purely blocking, which means that it does the lookups serially, and there should be no possibility of OutOfMemory error. Zig code cannot use an event loop without one being set up, in userland. There is no event loop runtime set by the language.
I've rewritten that example to use my earlier imaginary library code:
```zig
fn resolve(host: []u8) ![]dnsResults {
    var r = dnsResults.init();
    // create a stack allocated loop
    var loop = eventLoop.init();
    defer loop.destroy();
    // create stack allocated Jobs
    // need to figure out style for passing a callback + argument(s)
    // for now I'm pretending we have closures with some sort of block syntax.
    var j1 = Job.init({ r.lookupA(host) }, null);
    var j2 = Job.init({ r.lookupAAAA(host) }, null);
    // Add our new jobs to the loop
    loop.addJob(&j1);
    loop.addJob(&j2);
    // run loop until no jobs left.
    // as our loop only has 2 jobs, this is equivalent to waiting for our two jobs
    //
    // it could throw an error if e.g. epoll_wait() syscall fails.
    // I also would propagate any errors in a job to here.
    // i.e. if a Job returns an error, it should be returned from .loop
    // preferably along with a pointer to the Job that errored.
    try loop.loop();
    return r;
}
```
> I have a strong stance against stackful coroutines. This kind of concurrency is not really better than creating threads, from a performance and scheduling perspective.
I disagree. It's been widely shown that OS threads are too expensive to have e.g. one per HTTP request.
> But my main issue with it is that it makes parallelism too intentional.
From the zen of zig: "Communicate intent precisely."
However with my proposal, the user only needs to know about the abstraction when they wish to have things happening in parallel. The rest of the code may remain oblivious.
> What I have outlined here is a way that _will work_ in Zig, and it even allows expressing "optional concurrency". It's not clear to me what problems you are pointing out in my proposal, or what you are trying to solve with your pseudocode.
I'm slowly coming around to your proposal.
I'm wondering how it would work for interop with non-zig code:
- [...] continue function to be exported?
The global io_mode seems a bit weird to me, especially as Zig is often used to create libraries called from other languages.
Could it be a per-function flag?
I think using a per-function flag defeats the mitigation of the red/blue color discrepancy of functions. If I understand correctly, the io_mode would be relevant mostly for exes? Since libraries would either:
- require a specific io_mode, which any consumer must take into account when choosing, or
- support all io_modes, making the library usable by any type of application.
Maybe I'm wrong, but I don't think hiding the asynchronicity of the function is in line with Zig's goals.
Knowing if an operation is async or not is important, because if the event loop is single-threaded, a blocking operation means that nothing else can happen during that operation.
For example, imagine that you're working on an async web application written in Zig, and you're looking at the account creation part.
```zig
fn handle_account_creation(login: []const u8, password: []const u8) !void {
    if (try sql("SELECT count(*) FROM users WHERE login = :?", login) != 0) {
        return error.UserAlreadyExist;
    }
    try sql("INSERT INTO users VALUES (:?, ...)", login, ...);
}
```
If the sql function is async, then this (pseudo-)code has a potential race condition if two account creation requests with the same login happen at the same time.
In that case, the check for whether the user already exists may pass for both requests, and you end up with two users with the same login.
If asynchronicity is implicit, then it's hard to know whether sql will actually suspend: you have to check its implementation, and if it depends on io_mode, then you have to check that value too.
If it's explicit, then the await is a clear indication that some other code may run while the function is suspended.
Thanks for the specific example code.
> If the sql function is async, then this (pseudo-)code has a potential race condition if two account creation requests with the same login happen at the same time.
Isn't this the same problem if the code is blocking? If you do a thread per request, then handle_account_creation could be called from more than one thread, so you have this race condition either way.
From the perspective of the handle_account_creation, the sql operations are equivalent to using a global variable for a temporary variable, since the sql database is global to multiple calls to handle_account_creation. Although technically they would be OK in a single-threaded, blocking environment, functions which use global state as temporary values should generally be avoided, and when used, should be carefully documented under what conditions it is appropriate to call them.
More bluntly, I would reject this as a valid use case because the code is inherently flawed, and thus I don't think it should really be considered a guiding example for this proposal.
Most systems that have to manage distributed external state (SQL, KV-stores etc.) have this problem, whether they use single-threaded blocking calls or full async, since more than one instance may be talking to the data APIs at once. The case where one user sends two creation requests should be handled in the database, e.g. with UNIQUE indices, because that is one of the responsibilities of the RDBMS.
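To make the "let the store enforce uniqueness" point concrete, here is a minimal in-memory sketch. A std.StringHashMap stands in for a users table with a UNIQUE index on login; createAccount and error.UserAlreadyExists are hypothetical names, and a real application would rely on the database constraint instead. The point is that the existence check and the insert are one operation, so there is no gap between them in which another request could run.

```zig
const std = @import("std");

// In-memory stand-in for a users table with a UNIQUE index on login.
// getOrPut checks for the key and reserves it in a single call, so no
// suspension point exists between "does it exist?" and "insert it".
fn createAccount(users: *std.StringHashMap(void), login: []const u8) !void {
    const entry = try users.getOrPut(login);
    if (entry.found_existing) return error.UserAlreadyExists;
}
```

The map only stores the key slice it is given, so this sketch assumes logins outlive the map (string literals do). It is also not thread-safe; it models a single-threaded event loop, where the hazard is suspension between two separate queries.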
An interesting example with regards to this discussion is full async coroutines in Lua that are used in a fully procedural manner.
One thing I am uncertain of with regards to the Lua example is that it is not obvious how, or if, it is possible to run the async functions in parallel. In the example, the two database queries are executed sequentially, whereas I can think of situations where I would like to execute them in parallel, so that they both wait on I/O simultaneously.
Doing a bit of digging in the pgmoon library, it does support parallel execution (from the README):
> This method also supports sending multiple queries at once by separating them with a ;. The number of queries executed is returned as a second return value after the result object. When more than one query is executed then the result object changes slightly. It becomes a array table holding all the individual results:
```lua
local res, num_queries = pg:query([[
  select id, name from users;
  select id, title from posts
]])
```
> One thing I am uncertain of with regards to the Lua example is that it is not obvious how, or if, it is possible to run the async functions in parallel. In the example, the two database queries are executed sequentially, whereas I can think of situations where I would like to execute them in parallel, so that they both wait on I/O simultaneously.
In openresty (which is where pgmoon runs), you create a new stackful coroutine with e.g. ngx.timer.at(0, function() ...... end)
> Doing a bit of digging in the pgmoon library, it does support parallel execution
I think this is a postgres feature rather than a pgmoon one.
Really like the proposal @andrewrk -- am new to Zig but falling in love very quickly :)
Done with the merge of #3033. Opening new issues for the remaining details to be ironed out.