Julia: `wait()` with timeout

Created on 10 Jun 2020  Â·  47Comments  Â·  Source: JuliaLang/julia

It appears that we do not support timeouts in wait(). To motivate the need for this, it's a pretty critical basic functionality for writing resilient servers; blocking forever due to a dropped packet is unacceptable, so servers must instead be written in an explicitly polling style, which is bad for latency and for CPU usage.

Implementation-wise, wait(c) waits upon a condition for a notify() to push it back onto the scheduler's work queue. It seems to me that timeouts can be implemented by starting a second task that sleep()'s for a certain amount of time, then unconditionally notify()'s that condition. The consumer of the wait() call would then need to disambiguate a timeout from an actual event.

Taking a look at the different things that can be wait()'ed upon, most are implemented in terms of Condition objects, so a very simple @async (sleep(timeout); notify(c)) should work, and the _FDWatcher from FileWatching also notifies a condition in the end, therefore I believe a uniform API is possible here.

parallel

Most helpful comment

I disagree that forcing the user to split receive success and receive failure behavior across two separate tasks is elegant.

It's currently intended to force you to think about how to only have one (active) recv call site,

That is precisely what I'm arguing for; that a single recv() can signal both receive success and receive failure. To put my original question into code, it would look something like:

val = recv(sock; timeout=10.0)
if val === nothing
    retransmit(sock)
    # Either return nothing, raise error, reiterate recv call, or something else
end

(I'm also open to recv() throwing a TimeoutError or something, if we want to try/catch here instead)

(every recv call site always needs to handle all the cases from all recv call sites—there's no way to avoid that)

That's exactly what I want; for the receive handling logic to be in once place not split across different places. I believe you're saying that the fundamental pattern should be:

was_successful = false
@async begin
    sleep(timeout)
    if !was_successful
        handle_error()
    end
    close(sock)
end

recv(sock)
was_successful = true

This design makes debugging more difficult, errors do not propagate in a linear manner (because some will be hidden in tasks that we do not wait() upon) and requires that I close() my socket for any issue that induces a timeout.

I do not think it is reasonable to require that users must close() then re-connect() or re-bind() their sockets if something times out. In many applications, (such as low-latency audio transport) it would be typical for a recv() to timeout many times a second, as you want to react very quickly to a packet missing a deadline.

All 47 comments

Maybe something like

function wait_timeout(c, timeout)
    ch = Channel(2)

    @async begin
        res = wait(c)
        push!(ch, Some(res))
    end

    @async begin
        sleep(timeout)
        push!(ch, nothing)
    end

    res = take!(ch)

    return res
end

That way if the return value is nothing, you know the operation timed-out.

The two implementations suggested here seem to leak tasks. It might be possible to avoid this by using Timer since it's close-able.

In general, it'd be nice if we can take concurrency API seriously #33248 and avoid unstructured concurrency piling up. Timeout is one of the biggest success story of structured concurrency, after all: Timeouts and cancellation for humans — njs blog

The two implementations suggested here seem to leak tasks.

What does that mean?

It means that tasks spawn by the function are still running after the function call ends. This violates the _black box rule_; i.e., all the relevant computations end at the end of the function (see the Julep and https://github.com/JuliaLang/julia/issues/33248#issuecomment-531139728). This is bad because the errors occur in the leaked tasks are silently ignored and there is no backpressure for flooding the task scheduler.

Can't that be solved by wrapping the entire function in a @sync block?

Don't we need to cancel the other task for that? That's why the black box rule and task cancellation are the two must-have properties of the structured concurrency.

(But it might be possible to solve this particular issue by just using Timer without implementing full-blown structured concurrency.)

Ah, you're right of course. I got confused and thought we were notifying c automatically here, but we're not.

@JeffBezanson is flooding the task scheduler with tasks all sitting and waiting on things an issue for the runtime? For instance, if I am timing out a lot, I could imagine adding a new task to the scheduler every couple of seconds, and for a long-running instance (like a StorageServer) this could add up to thousands of tasks.

notifying c

Yeah, notifying c acts as the cancellation in one direction. We can close the Timer for cancellation in the other direction.

Even without closing the timer, the timer will eventually elapse, and the task will go away. It may throw an error, but that's fine, because we're not catching them. ;)

Cleaner to close the timer, for sure.

What happens if you wait on the same thing twice and the first call didn't timeout? If you don't clean up the timer in the first one, isn't there a chance that the first timeout calls notify during the second wait?

(Edit: edited twice; first I thought it was stupid but then realized that it might happen for re-waitable objects like Channel)

Concretely:

c :: Channel
wait_timeout(c, 0.1) === nothing && return
x = take!(c)
wait_timeout(c, 0.1) === nothing && return  # the timer from the first `wait_timeout` can fire here
y = take!(c)

The timer within the first wait_timeout() is signaling on a separate Channel from the one in the second wait_timeout()? I don't see how the second wait_timeout() can be effected by the first.

It's taking two things from the same channel c. I thought your plan was to notify one of the cond_ objects in the channel (maybe c.cond_wait)?

Oh, I understand now. Your concern is that if wait_timeout(c) does a notify(c.cond_wait) internally when its timeout has elapsed (in order to stop the task from its wait()) it will falsely notify someone else who is also waiting on it. That's a good concern.

Perhaps instead we can instead throwto() in order to interrupt the other Task?

using Test

function wait_timeout(c, timeout)
    ch = Channel{Bool}(2)

    happy_task = @async begin
        wait(c)
        push!(ch, true)
    end

    timeout_task = @async begin
        sleep(timeout)
        push!(ch, false)

        # Stop the other task from waiting anymore
        Base.throwto(happy_task, InterruptException())
    end

    return take!(ch)
end

# First, test no wait, wait
@testset "no wait, wait" begin
    c = Channel(10)
    put!(c, 1)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 1
    @test wait_timeout(c, 1.0) == false
end

# Next, test wait, no wait
@testset "wait, no wait" begin
    c = Channel(10)
    @test wait_timeout(c, 1.0) == false
    put!(c, 1)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 1
end

# Next, test no wait, no wait, after sleeping for more than timeout:
@testset "no wait, no wait" begin
    c = Channel(10)
    put!(c, 1)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 1

    sleep(2.0)
    put!(c, 2)
    @test wait_timeout(c, 1.0) == true
    @test take!(c) == 2
end

# Finally, test wait, wait
@testset "wait, wait" begin
    c = Channel(10)
    @test wait_timeout(c, 1.0) == false
    @test wait_timeout(c, 1.0) == false
end

I guess the question is whether we are absolutely positive that happy_task won't leave some shared global state (either c or ch, I guess?) in some corrupted state if that exception is thrown? I don't know how throwto is implemented, but it looks somewhat evil and risky to me :)

Is it so bad to continue to just wait in the happy task? The downside are that the task and the channel live longer than strictly necessary, I guess?

IIUC it's unsafe to call schedule(task, err, error=true) and throwto(task, err) with the task that is already scheduled. That's how I understand the comments from @vtjnash in https://discourse.julialang.org/t/stop-terminate-a-sub-task-started-with-async/32193/6

Reading the code briefly, my impression is that these functions assume the ownership of the thread-local queue. So, I guess it's unsafe to run them outside the scheduler code. But wait can use some internal assumptions so maybe there is a way out?

OK, so here is what I meant by Timer-based implementation (now I changed the return value to Bool):

function wait_timeout(c::Channel, timeout::Real)
    cancel = Atomic{Bool}(false)
    isready(c) && return true
    @sync begin
        timer = Timer(timeout)
        try
            @async begin
                try
                    wait(timer)
                catch
                    return
                end
                cancel[] = true
                lock(c.cond_wait) do
                    notify(c.cond_wait)
                end
            end

            lock(c)
            try
                while !isready(c)
                    check_channel_state(c)
                    wait(c.cond_wait)
                    cancel[] && return false
                end
            finally
                unlock(c)
            end
        finally
            close(timer)
        end
    end
    return true
end

I don't _think_ it leaks a task (or more precisely a leaked task would be cleaned up soon enough; it'd be nice if @sync is exception-safe so that we don't have this quirk).

It passes @staticfloat's tests (thanks for sharing those, BTW!).

Alright, this all inspired me to read up on the various links that @tkf posted.

All the structured concurrency stuff is interesting, but in terms of a short term solution that can be added incrementally to packages, I liked .Net's cancellation framework best, by far. We also need something like that for the language server rather sooner than later, and that is the same framework that the VS Code team is using across VS Code. Sooo, I created https://github.com/davidanthoff/CancellationTokens.jl :) For now it is not thread safe, so can only be used with single threaded tasks, but the .Net original is thread safe and the source MIT licensed, so we just need to copy their design over. But the public API of the package should be more or less done. Feedback and help welcome!

read up on the various links that @tkf posted.

Mission accomplished! :laughing:

https://github.com/davidanthoff/CancellationTokens.jl

So my initial response was "why not just use Atomic{Bool}" but I guess you have the state machine to ensure that the cancel call ends when all the "receiver" sides acknowledged the request?

There is also support for waiting on a cancel token, which I don't think would work with a pure atomic, right? And the .Net implementation supports callback handlers, which I haven't added, but I think also need more than just an atomic.

Ah, I see. That makes sense.

My implementation wait_timeout(c::Channel, timeout::Real) above https://github.com/JuliaLang/julia/issues/36217#issuecomment-643719590 looks like easily generalizable. In particular, I think we can implement waitfirst(waitables...) quite easily.

I wonder if it makes sense to define internal interface something like

wait′(waitable, cancel) -> success::Bool
_notify(waitable)

where cancel is a Ref{Bool}-like object that defines thread-safe cancel[]. For normal wait, we can use

struct NoCancel end
Base.getindex(::NoCancel) = false

It's pretty straightforward to implement this for Channel and Task:

using Base.Threads: Atomic
using Base: check_channel_state

function _wait(t::Task, cancel)
    if !istaskdone(t)
        lock(t.donenotify)
        try
            while !istaskdone(t)
                wait(t.donenotify)
                cancel[] && return false   # added this line
            end
        finally
            unlock(t.donenotify)
        end
    end
    return true   # returning `true` instead
end

function _notify(t::Task)
    lock(t.donenotify) do
        notify(t.donenotify)
    end
end

function wait′(t::Task, cancel = NoCancel())
    t === current_task() && error("deadlock detected: cannot wait on current task")
    ok = _wait(t, cancel)
    if istaskfailed(t)
        throw(TaskFailedException(t))
    end
    return ok
end

function _notify(c::Channel)
    lock(c.cond_wait) do
        notify(c.cond_wait)
    end
end

function wait′(c::Channel, cancel = NoCancel())
    isready(c) && return true   # returning `true` instead
    lock(c)
    try
        while !isready(c)
            check_channel_state(c)
            wait(c.cond_wait)
            cancel[] && return false   # added this line
        end
    finally
        unlock(c)
    end
    return true   # returning `true` instead
end

With this implementation, we can now define

function waitfirst(waitables...)
    winner = Ref{Any}()
    cancel = Atomic{Bool}(false)
    @sync for w in waitables
        @async begin
            if wait′($w, $cancel)
                $cancel[] = true
                foreach(_notify, $waitables)
                $winner[] = $w
            end
        end
    end
    return winner[]
end

I think it's possible to define wait′ and _notify for Timer. Once it's implemented, it's quite easy to derive wait_timeout from waitfirst:

function wait_timeout(w, timeout::Real)
    t = Timer()
    try
       return waitfirst(w, t) === w
    finally
       close(T)
    end
end

I don't think a function like one of these should be implemented by notifying the waited-for object. Others might be waiting for the same objects, and they would get spurious wakeups.

What are the other implementation strategies?

Maybe we can define waitfirst(waitables...) if we have waitfirst(::Condition, ::Condition) as a primitive? I'm guessing that the cancellable wait can then use "shared" (e.g., c.cond_wait for Channel) and "unshared" Conditions. If we notify the unshared condition for cancellation, we can avoid the spurious wakeups?

That seems rather complex, like you're trying to put a round peg (old multiplexing event systems) into a square hole (our Task-based ownership design). Our objects are intended to essentially already have both capabilities through the existing Task system, which is why (unlike Go and C#) we have been trying not to additionally bolt things like CancellationTokens and waitmultiple on afterwards (note that the implementation of CancellationTokens appears to itself mostly just be a thin wrapper around Timer and Future! 🙂). Here's how I intended the implementation of this to work:

function wait_until(c, timeout::Real) # `c` is any object that is both wait-able and cancel-able (e.g. any IO or a Channel, etc.)
    timer = Timer(timeout) do t
        isready(c) || close(c)
    end
    try
        return wait(c)
    finally
        close(timer)
    end
end

Short, sweet, simple, no minor resource leaks—and essentially how the timeouts are implemented for the various resources in the FileWatching module.

So essentially you close something that you can wait on to cancel it?

I don't see, though, how that would make something like a cancellation token framework entirely unnecessary, given that you can only really close a thing, not an operation?

Because you can also only wait on a thing, and not an operation. The expectation thus is that the only reason you want to be able to cancel something is that there's already a resource attached. Otherwise, why are you trying to waste compute cycles on a pure operation with no output?

Don't you need close(::Task) for wait_until to work in full generality? For example, wait_until(c::Channel, timeout) shouldn't close c, right? (Discussion above: https://github.com/JuliaLang/julia/issues/36217#issuecomment-643685832). I can imagine wait_until would work if I do

function wait_until(c::Channel, timeout::Real)
    w = @async wait(c)
    timer = Timer(timeout) do t
        isready(c) || close(w)
    end
    try
        return wait(w)
    finally
        close(timer)
    end
end

I'd be very happy if we have close(::Task) :slightly_smiling_face:

Tasks are not resources—they are owners—ergo they should not have close (cf. past discussion at #6283)

OK, I guess discussion for it can happen elsewhere.

Anyway, I'm still interested in your implementation strategy for wait_until(c::Channel, timeout) that does not close c.

I'd suggest that there's never a need to interrupt a wait to perform work in Julia, since the Task system already does that. Thus the only reason to interrupt work seems to be to terminate a resource, thereby causing these concepts to combine seamlessly!

I agree close-ing resource is a very powerful construct. That's why I'm suggesting to use close(::Timer) from the beginning of this thread.

Anyway, what about

your implementation strategy for wait_until(c::Channel, timeout) that does not close c

?

Intentionally not directly possible

So no other way out than waitfirst(::Condition, ::Condition)?

This is obviously a very low-level primitive operation. But something like this might be required so that people can explore building more modern concurrency systems on top of it.

I agree close on a resource is a good way to handle this when applicable, but I don't find it 100% satisfying --- what if some clients are willing to wait longer than others for the same event? I.e. a timeout seems to be more a property of a specific request, such that one request for the same resource might time out when another doesn't. You could get around that by wrapping the resource in a proxy object of some kind though, I suppose?

Yes, the idea also is that this also discourages race-y code designs like the one you describe, with multiple tasks operating on the same resources simultaneously.

What about when I don't want to close() the resource? E.g. what if I have a protocol over UDP that requires me to request a retransmit if a packet goes missing? I need a wait to express "wait for 10 seconds, and if nothing happens, perform some internal logic and then request another packet". I don't want to close the socket because then I might miss a future packet coming in, and I don't want to leave the recv() hanging on there forever, because that means I need to duplicate my receiver handling logic (for successful handling in one place, and unsuccessful handling in another).

That design sounds awfully full of race conditions, which is why I think that ought to be difficult in Julia. It's currently intended to force you to think about how to only have one (active) recv call site, while yours sounds like you'd need the same logic in triplicate (every recv call site always needs to handle all the cases from all recv call sites—there's no way to avoid that). It also misses the simple elegance of Julia's actual design. Instead, we're intentionally forcing you to model your send and receive state machines separately:

waiting = true # this is our finite-state-machine
on_recv(msg) = (waiting = false) # trivial state update
t = Timer(0, 10) do t
    waiting && on_send(socket)
end
while waiting 
    on_recv(recv(socket))
end
close(t) # recv successful (future `on_recv` should ignore any duplicates)
close(socket) # don't care about possible in-flight packets anymore, discard them

(initial version of this used @sync and @async for some parts of this, but that's not needed here since the FSM is so simple)

I disagree that forcing the user to split receive success and receive failure behavior across two separate tasks is elegant.

It's currently intended to force you to think about how to only have one (active) recv call site,

That is precisely what I'm arguing for; that a single recv() can signal both receive success and receive failure. To put my original question into code, it would look something like:

val = recv(sock; timeout=10.0)
if val === nothing
    retransmit(sock)
    # Either return nothing, raise error, reiterate recv call, or something else
end

(I'm also open to recv() throwing a TimeoutError or something, if we want to try/catch here instead)

(every recv call site always needs to handle all the cases from all recv call sites—there's no way to avoid that)

That's exactly what I want; for the receive handling logic to be in once place not split across different places. I believe you're saying that the fundamental pattern should be:

was_successful = false
@async begin
    sleep(timeout)
    if !was_successful
        handle_error()
    end
    close(sock)
end

recv(sock)
was_successful = true

This design makes debugging more difficult, errors do not propagate in a linear manner (because some will be hidden in tasks that we do not wait() upon) and requires that I close() my socket for any issue that induces a timeout.

I do not think it is reasonable to require that users must close() then re-connect() or re-bind() their sockets if something times out. In many applications, (such as low-latency audio transport) it would be typical for a recv() to timeout many times a second, as you want to react very quickly to a packet missing a deadline.

State machines are hard to write, use, and debug. Otherwise, we don't have things like for loop and coroutine in many languages. I don't think forcing users to write state machines make _user code_ simple.

That code isn't the pattern that I posted for your use case: using close(sock) after might be the end state after many failures, and works well for typical cases since it's a terminal case for the FSM (so you don't need an external channel to synchronize it). Those two code examples look like they'd end up having many interleaved state machines going on (this is UDP, so remember that events can happen in any order with any number of repetitions)—which would then need additional external mechanisms to synchronize. That's why the code I gave has one Task handling all on_recv message and one handling on_send messages together in a pair, until success, then moves onto the next action (in my example, just a boring close call, but could be a new pair of recv/send operations).

State machines are hard to write, use, and debug.

I'm going to need a reference for that. I tried google, but the titles of the results I got back for "why is a state machine bad" seemed to follow the general pattern "why I thought they were {bad | hard to write | maintain | complicated | use | debug | antiquated}, but came to realize they led to far better code"

Let's step away from the example of retransmitting on failure and think of the more general case; I want to recv() from a socket, or more generally, wait() upon a resource, get notified by a timeout and end the recv()/wait(), and not destroy the resource (via close()) in the event of a timeout. I assert that that basic functionality is fundamental to writing robust distributed processes. If we cannot express that in Julia, I argue that our event model is fundamentally broken. As weak evidence, I put forth the fact that every major distributed API integrates timeouts into its methods, including the C socket API, the python threading/multiprocessing API and basically any other API you want to look at.

That seems like poor evidence, given that we use a platform abstraction layer (libuv) to improve upon the timeout capabilities from those APIs. As we talked about offline, there are good design patterns from only using the few primitives we have already.

State machines are hard to write, use, and debug.

I'm going to need a reference for that.

These are two examples of "structured programming" approach vs state machine:

  • yield in many languages (e.g., Python) vs Julia's iterate
  • async/await vs callback hell

As a more specific example in concurrent programming, Deadlocks in non-hierarchical CSP - Roman Elizarov - Medium is a nice post explaining how tricky communicating finite state machines can be. There are a few links to relevant discussions on select for building state machines in https://github.com/python-trio/trio/issues/242#issuecomment-353760723.

State machines are hard to write, use, and debug.

I'm going to need a reference for that.

Arguably, the entire point of both closures and coroutines is to express what is ultimately just a state machine in a more convenient, less error prone way. After going to that much language design trouble to not force users to explicitly write out state machines, it seems like a bit of weak sauce to say that here it's suddenly just fine.

Thanks for the links. The conclusion in the trio link particularly seems be exactly the same as mine. That first post (Medium) includes a CFSM—it's not an attempt to avoid them, only to demonstrate problems with getting them wrong. And the second one (trio) shows ways to simplify most FSM into linear form (by breaking them up into their independent parts), but hypothesizes that they can't be simplified if any cleanup, timeout, or retry actions are present aside from close. Both links talk about how using timeouts or select to base your program is a recipe for headaches, while using coroutines (as we do) to build your FSM is less error prone as it forces you to consider which parts are independent and isolate them.

I also appreciated this conclusion to the trio posts:

Well, no. The correct behaviour is such case [of reaching a limit such as time] is shutting the client down [by calling close] and possibly logging the event.

I had also asked about this on Discourse, and I agree that there should be a way to timeout on a wait/read. I would love to be able to use julia to run my instruments, but this has proven impossible without this feature.

In my particular case, I want to work with a SCPI instrument over TCP. This involves sending text commands and reading back the response. If I make a typo in the command, I would want the read to time out so that I can try again.

I find the given solution of needing to close the socket (to abort the read) and recreate everything over again to be pretty poor.

As an example (this could be interactive or not, but I might not be able to send an interrupt):

tcp = connect(host,port)

println(tcp, ":MEASURE:VOLT?")
voltage = readline(tcp) # all good here

println(tcp, ":MEASURE:VILT?") # oh no!
voltage2 = readline(tcp) # blocks forever

Note that I tried the following:

function read_timeout(tcp::TCPSocket, timeout=10)
    # read asynchronously
    ch = Channel{String}(c -> put!(c,readline(tcp)), 0)
    # set timeout timer
    tm = Timer(t -> isready(ch) || close(ch), timeout)
    # wait until value is read or timeout
    out = nothing
    try
        out = take!(ch)
        close(ch)
    catch
        @info "Command timed out!"
    finally
        close(tm)
    end
    return out
end

But it stops working when a timeout does happen, I can't seem to read from the socket anymore, even though it is still open.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

tkoolen picture tkoolen  Â·  3Comments

ararslan picture ararslan  Â·  3Comments

i-apellaniz picture i-apellaniz  Â·  3Comments

dpsanders picture dpsanders  Â·  3Comments

Keno picture Keno  Â·  3Comments