The following is a standalone implementation of Weave channel that can buffer up to one object.
It works with the default GC but crashes with gc:arc
import std/atomics
const MemBlockSize = 256
type
ChannelSPSCSingle* = object
full{.align: 128.}: Atomic[bool]
itemSize*: uint8
buffer*{.align: 8.}: UncheckedArray[byte]
proc `=`(
dest: var ChannelSPSCSingle,
source: ChannelSPSCSingle
) {.error: "A channel cannot be copied".}
proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} =
## If ChannelSPSCSingle is used intrusive another data structure
## be aware that it should be the last part due to ending by UncheckedArray
## Also due to 128 bytes padding, it automatically takes half
## of the default MemBlockSize
assert itemsize.int in 0 .. int high(uint8)
assert itemSize.int +
sizeof(chan.itemsize) +
sizeof(chan.full) < MemBlockSize
chan.itemSize = uint8 itemsize
chan.full.store(false, moRelaxed)
func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} =
not chan.full.load(moAcquire)
func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
## Try receiving the item buffered in the channel
## Returns true if successful (channel was not empty)
##
## ⚠ Use only in the consumer thread that reads from the channel.
assert (sizeof(T) == chan.itemsize.int) or
# Support dummy object
(sizeof(T) == 0 and chan.itemsize == 1)
let full = chan.full.load(moAcquire)
if not full:
return false
dst = cast[ptr T](chan.buffer.addr)[]
chan.full.store(false, moRelease)
return true
func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
## Try sending an item into the channel
## Reurns true if successful (channel was empty)
##
## ⚠ Use only in the producer thread that writes from the channel.
assert (sizeof(T) == chan.itemsize.int) or
# Support dummy object
(sizeof(T) == 0 and chan.itemsize == 1)
let full = chan.full.load(moAcquire)
if full:
return false
cast[ptr T](chan.buffer.addr)[] = src
chan.full.store(true, moRelease)
return true
# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
template sendLoop[T](chan: var ChannelSPSCSingle,
data: sink T,
body: untyped): untyped =
while not chan.trySend(data):
body
template recvLoop[T](chan: var ChannelSPSCSingle,
data: var T,
body: untyped): untyped =
while not chan.tryRecv(data):
body
type
ThreadArgs = object
ID: WorkerKind
chan: ptr ChannelSPSCSingle
WorkerKind = enum
Sender
Receiver
template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} =
if args.ID == id:
body
proc thread_func(args: ThreadArgs) =
# Worker RECEIVER:
# ---------
# <- chan
# <- chan
# <- chan
#
# Worker SENDER:
# ---------
# chan <- 42
# chan <- 53
# chan <- 64
Worker(Receiver):
var val: int
for j in 0 ..< 10:
args.chan[].recvLoop(val):
# Busy loop, in prod we might want to yield the core/thread timeslice
discard
echo " Receiver got: ", val
doAssert val == 42 + j*11
Worker(Sender):
doAssert args.chan.full.load(moRelaxed) == false
for j in 0 ..< 10:
let val = 42 + j*11
args.chan[].sendLoop(val):
# Busy loop, in prod we might want to yield the core/thread timeslice
discard
echo "Sender sent: ", val
proc main() =
echo "Testing if 2 threads can send data"
echo "-----------------------------------"
var threads: array[2, Thread[ThreadArgs]]
var chan = cast[ptr ChannelSPSCSingle](allocShared(MemBlockSize))
chan[].initialize(itemSize = sizeof(int))
createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan))
createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan))
joinThread(threads[0])
joinThread(threads[1])
freeShared(chan)
echo "-----------------------------------"
echo "Success"
main()
On Nim from 2020-04-20, the issue changed, the previous crash was a segfault on createThread
likely similar to this https://github.com/nim-lang/Nim/issues/13881
Now it is similar to https://github.com/nim-lang/Nim/issues/13935#issuecomment-614249252 but:
nim cpp --gc:arc ...nim --gc:arc --exceptions:setjmp ...
Still hangs on devel (and SIGSEGVs with -d:danger)
Works for me on Windows, here is the output:
Sender sent: 42
Receiver got: 42
Receiver got: 53
Sender sent: 53
Receiver got: 64
Sender sent: 64
Sender sent: 75
Sender sent: 86
Receiver got: 75
Receiver got: 86
Receiver got: 97
Sender sent: 97
Sender sent: 108
Sender sent: 119
Receiver got: 108
Receiver got: 119
Receiver got: 130
Sender sent: 130
Sender sent: 141
Receiver got: 141
Works for me on Linux with https://github.com/nim-lang/Nim/pull/14722 !
Testing if 2 threads can send data
-----------------------------------
Receiver got: 42
Sender sent: 42
Receiver got: 53
Sender sent: 53
Sender sent: 64
Sender sent: 75
Receiver got: 64
Receiver got: 75
Receiver got: 86
Sender sent: 86
Sender sent: 97
Sender sent: 108
Receiver got: 97
Receiver got: 108
Receiver got: 119
Sender sent: 119
Receiver got: 130
Sender sent: 130
Receiver got: 141
Sender sent: 141
-----------------------------------
Success
@Araq You don't have "Success" in your output, on the current devel my code doesn't segfaults but it gets stuck at the same point.
@Yardanico good news!
@mratsim can you retest after https://github.com/nim-lang/Nim/commit/3ba0c30758e1044aba11bb908a7d83de7ea215bc ?
Just for curiosity, valgrind (with -d:useMalloc of course):
~/P/stuff ❯❯❯ valgrind --leak-check=full ./a
==20443== Memcheck, a memory error detector
==20443== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==20443== Using Valgrind-3.16.0.GIT and LibVEX; rerun with -h for copyright info
==20443== Command: ./a
==20443==
Testing if 2 threads can send data
-----------------------------------
Sender sent: 42
Receiver got: 42
Receiver got: 53
Sender sent: 53
Sender sent: 64
Sender sent: 75
Receiver got: 64
Receiver got: 75
Receiver got: 86
Sender sent: 86
Sender sent: 97
Receiver got: 97
Receiver got: 108
Sender sent: 108
Sender sent: 119
Sender sent: 130
Receiver got: 119
Receiver got: 130
Sender sent: 141
Receiver got: 141
-----------------------------------
Success
==20443==
==20443== HEAP SUMMARY:
==20443== in use at exit: 0 bytes in 0 blocks
==20443== total heap usage: 26 allocs, 26 frees, 2,678 bytes allocated
==20443==
==20443== All heap blocks were freed -- no leaks are possible
==20443==
==20443== For lists of detected and suppressed errors, rerun with: -s
==20443== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Yes, it's working now. I think this should be added as a test, we can test that "Success" is printed last (with a timeout as multithreading bugs lead to unresponsive apps from time to time)