Hi,
While working on the torrent shard, I regularly ran into the need to tell fibers that would otherwise run forever to stop. This can often be solved by other means, such as a running bool, but I came across the following use case where that is not easily doable:
The control fiber is basically doing: fiber_count.times{ candidates.receive }
The issue is that if the result is found, or the list is empty, the control fiber will wait in the loop above forever, which basically leaks it. Another approach would be to have yet another Channel that the control fiber monitors (through select) and that halts the control fiber when a message is received on it.
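The select-based workaround described above could look roughly like the following. This is only a sketch: the names stop and finished, the buffer sizes, and the Int32 payload are assumptions made for illustration, not code from the torrent shard.

```crystal
# Hypothetical setup: three worker fibers would normally feed `candidates`.
fiber_count = 3
candidates = Channel(Int32).new(fiber_count)
stop = Channel(Nil).new(1)       # assumed extra channel used only for halting
finished = Channel(Int32).new    # lets the main fiber wait for the control fiber

# Control fiber: waits for up to `fiber_count` candidates, but leaves the
# loop as soon as something arrives on `stop`.
spawn do
  received = 0
  fiber_count.times do
    select
    when candidate = candidates.receive
      received += 1
    when stop.receive
      break
    end
  end
  finished.send(received)
end

candidates.send(1)
candidates.send(2)
stop.send(nil)                   # ask the control fiber to halt
received = finished.receive
puts "control fiber halted after #{received} candidate(s)"
```

The control fiber no longer leaks, but every blocking call it makes has to be written as a select that also watches stop, which is the boilerplate the requested primitive would avoid.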
However, I'd love to have a primitive which lets me send some kind of message to a Fiber, any Fiber, to please stop itself.
A solution employed by languages like Java is to inject an exception (java.lang.Thread.interrupt()). This is quite nice, as properly implemented Fibers would then release resources through ensure/rescue. It would also kick a sleeping/waiting fiber out of a blocking call like Channel#receive or IO::FileDescriptor#read.
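For fibers blocked in Channel#receive specifically, a similar effect can be approximated today by closing the channel: the blocked receive raises Channel::ClosedError, so rescue/ensure blocks run. This is not the proposed interrupt primitive, only a substitute that works for channel waits; the variable names below are made up for the sketch.

```crystal
candidates = Channel(Int32).new
finished = Channel(Symbol).new
cleaned_up = false

spawn do
  begin
    loop do
      # Raises Channel::ClosedError once the channel is closed and empty.
      candidates.receive
    end
  rescue Channel::ClosedError
    # Treated here as the request to stop.
  ensure
    # Resource cleanup would go here.
    cleaned_up = true
    finished.send(:stopped)
  end
end

Fiber.yield        # let the fiber start and block in `receive`
candidates.close   # kicks the fiber out of the blocking call
result = finished.receive
puts "fiber stopped, cleaned up: #{cleaned_up}"
```

The limitation is that this only reaches a fiber waiting on that particular channel; a fiber blocked in IO::FileDescriptor#read or sleep is not affected.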
A second solution could be a Fiber#close method, which sets a flag that is checked in Fiber#resume, and then runs all ensure blocks.
Ideas?
My first impression would be to assume that the fiber in question is not currently running when we issue the stop request (as is always the case when single-threaded) and simply attempt to stop it from resuming. Then the problem comes down to two things:
Fiber#resume checks before resuming. This should probably raise, and we should expect callers to check if the fiber is closed.
Fiber#close, as we know the fiber will never be resumed.
Golang has a lot of good resources on concurrency patterns that you might be interested in reading. I believe the semantics of goroutines and channels are similar enough to Crystal's to still apply.
Check out the pipelining blog post, which I think would work well for your application. It also talks about cancellation.
You can send a sentinel value for each fiber on a channel and break out from there.
The done method in the example signals the fiber to exit the next time it reads from the channel.
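# Sentinel type sent on the work channel to tell the worker to stop.
enum Control
  Done
end

class Worker(T)
  def initialize
    # Work items and the sentinel share one buffered channel.
    @channel = Channel(T | Control).new(10)
    # Used by the worker fiber to report that it has exited.
    @done = Channel(Control).new
  end

  def add(item : T)
    @channel.send(item)
  end

  def run
    spawn {
      loop do
        item = @channel.receive
        case item
        when T
          puts "I got #{item}"
        when Control
          # Acknowledge the sentinel and leave the loop.
          @done.send(item)
          break
        end
      end
    }
  end

  # Asks the worker to exit once it has read everything queued before this.
  def done
    @channel.send(Control::Done)
  end

  # Blocks until the worker has acknowledged the sentinel.
  def join
    puts "waiting"
    @done.receive
  end
end

w = Worker(Int32).new
w.run
w.add 2
w.add 1
w.add 3
w.done
w.join
puts "Done"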
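The example above stops a single worker. When several fibers read from the same channel, the "one sentinel per fiber" idea could be sketched as follows; Sentinel, jobs, done and worker_count are hypothetical names chosen for this illustration.

```crystal
# Hypothetical sentinel type; each worker consumes exactly one.
enum Sentinel
  Stop
end

worker_count = 3
jobs = Channel(Int32 | Sentinel).new(16)
done = Channel(Int32).new # each worker reports how many jobs it handled

worker_count.times do
  spawn do
    handled = 0
    loop do
      case item = jobs.receive
      when Int32
        handled += 1
      when Sentinel
        break # stop reading, so the remaining sentinels reach other workers
      end
    end
    done.send(handled)
  end
end

6.times { |i| jobs.send(i) }
# One sentinel per worker, queued after all real jobs.
worker_count.times { jobs.send(Sentinel::Stop) }

total = 0
worker_count.times { total += done.receive }
puts "handled #{total} jobs"
```

Because the channel is first-in, first-out and each worker stops reading after its sentinel, all queued jobs are handled before any worker exits, and no sentinel is consumed twice by the same fiber.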