Nim: Simple channels with --gc:arc

Created on 9 Apr 2020  ·  8 Comments  ·  Source: nim-lang/Nim

The following is a standalone implementation of a Weave channel that can buffer at most one object.
It works with the default GC but crashes with --gc:arc.

import std/atomics

const MemBlockSize = 256

type
  ChannelSPSCSingle* = object
    ## Channel that buffers at most one item in a raw byte buffer
    ## placed directly after the header fields.
    ## `trySend` is documented for the producer thread only and
    ## `tryRecv` for the consumer thread only.

    # True while `buffer` holds an item: set by `trySend`, cleared by
    # `tryRecv`.
    # NOTE(review): the 128-byte alignment is presumably meant to
    # avoid false sharing with neighbouring data -- confirm.
    full{.align: 128.}: Atomic[bool]
    # Size in bytes of the item type, as stored by `initialize`.
    itemSize*: uint8
    # Untyped item storage; `trySend`/`tryRecv` cast its address to
    # `ptr T`. As an UncheckedArray it has to remain the last field.
    buffer*{.align: 8.}: UncheckedArray[byte]

# Forbid copying a channel: any code that would copy a
# ChannelSPSCSingle fails to compile with the message below.
# Current Nim names the copy hook `=copy` (overriding `=` is
# deprecated); compilers older than 1.4 only know the `=` spelling.
when (NimMajor, NimMinor) >= (1, 4):
  proc `=copy`(
      dest: var ChannelSPSCSingle,
      source: ChannelSPSCSingle
    ) {.error: "A channel cannot be copied".}
else:
  proc `=`(
      dest: var ChannelSPSCSingle,
      source: ChannelSPSCSingle
    ) {.error: "A channel cannot be copied".}

proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} =
  ## Prepare `chan` for items of `itemsize` bytes and mark it empty.
  ##
  ## If ChannelSPSCSingle is embedded in another data structure it must
  ## be the last field, because it ends with an UncheckedArray.
  ##
  ## The item is stored starting at `chan.buffer`, so the room left for
  ## it in a `MemBlockSize` block is whatever follows the buffer's
  ## offset (header fields plus any alignment padding).
  ## NOTE(review): this assumes `chan` sits at the start of a block of
  ## `MemBlockSize` bytes, as in the sanity check below -- confirm for
  ## other callers.
  assert itemsize.int in 0 .. int high(uint8)
  # Bound by the real buffer offset: summing the field sizes ignores
  # the padding inserted before `buffer` and would accept items that
  # run past the end of the block.
  assert offsetOf(ChannelSPSCSingle, buffer) + itemsize.int <= MemBlockSize

  chan.itemSize = uint8 itemsize
  # Relaxed is enough here: nothing has been published yet.
  chan.full.store(false, moRelaxed)

func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} =
  ## True when the `full` flag, read with acquire ordering, is unset.
  let occupied = chan.full.load(moAcquire)
  result = not occupied

func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
  ## Attempt to take the buffered item out of the channel.
  ##
  ## Returns `true` and copies the item into `dst` when the channel was
  ## full; returns `false` and leaves `dst` untouched when it was empty.
  ##
  ## ⚠ Use only in the consumer thread that reads from the channel.
  assert (sizeof(T) == chan.itemsize.int) or
          # Support dummy object
          (sizeof(T) == 0 and chan.itemsize == 1)

  # The acquire load is the counterpart of the release store in
  # `trySend`.
  result = chan.full.load(moAcquire)
  if result:
    let slot = cast[ptr T](chan.buffer.addr)
    dst = slot[]
    # Clear the flag only after the item has been copied out.
    chan.full.store(false, moRelease)

func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
  ## Attempt to place `src` into the channel's single slot.
  ##
  ## Returns `true` when the channel was empty and the item was stored;
  ## returns `false` when a previous item is still buffered.
  ##
  ## ⚠ Use only in the producer thread that writes to the channel.
  assert (sizeof(T) == chan.itemsize.int) or
          # Support dummy object
          (sizeof(T) == 0 and chan.itemsize == 1)

  # The acquire load is the counterpart of the release store in
  # `tryRecv`.
  result = not chan.full.load(moAcquire)
  if result:
    cast[ptr T](chan.buffer.addr)[] = src
    # Raise the flag only after the item has been written.
    chan.full.store(true, moRelease)

# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:

  # Stop compilation with a clear message when thread support is off;
  # the checks below rely on createThread/joinThread.
  when not compileOption("threads"):
    {.error: "This requires --threads:on compilation flag".}

  # Keep calling `trySend` until it succeeds, running `body` after
  # each failed attempt.
  template sendLoop[T](chan: var ChannelSPSCSingle,
                       data: sink T,
                       body: untyped): untyped =
    while not trySend(chan, data):
      body

  # Keep calling `tryRecv` until it succeeds, running `body` after
  # each failed attempt. On success `data` holds the received item.
  template recvLoop[T](chan: var ChannelSPSCSingle,
                       data: var T,
                       body: untyped): untyped =
    while not tryRecv(chan, data):
      body

  type
    # Which end of the channel a spawned thread drives.
    WorkerKind = enum
      Sender
      Receiver

    # Argument bundle passed to each thread: its role and the
    # channel it shares with the other thread.
    ThreadArgs = object
      ID: WorkerKind
      chan: ptr ChannelSPSCSingle

  # Run `body` only when the current thread's role equals `id`.
  # The template is dirty on purpose: `args` is resolved in the scope
  # where the template is expanded.
  template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} =
    if id == args.ID:
      body

  proc thread_func(args: ThreadArgs) =
    # Thread entry point. Depending on `args.ID` it either receives
    # or sends ten integers through `args.chan`.
    #
    # Worker RECEIVER:
    # ---------
    # <- chan   (expects 42, 53, ..., 141, in that order)
    #
    # Worker SENDER:
    # ---------
    # chan <- 42
    # chan <- 53
    # ...
    # chan <- 141
    Worker(Receiver):
      var val: int
      for j in 0 ..< 10:
        args.chan[].recvLoop(val):
          # Busy loop, in prod we might want to yield the core/thread timeslice
          discard
        echo "                  Receiver got: ", val
        # The j-th item must equal what the sender computes for j.
        doAssert val == 42 + j*11

    Worker(Sender):
      # `initialize` cleared the flag and only `trySend` sets it, so
      # the channel must still be empty before the first send.
      doAssert args.chan.full.load(moRelaxed) == false
      for j in 0 ..< 10:
        let val = 42 + j*11
        args.chan[].sendLoop(val):
          # Busy loop, in prod we might want to yield the core/thread timeslice
          discard
        echo "Sender sent: ", val

  proc main() =
    # Spawn one receiver and one sender that share a single channel,
    # wait for both to finish, then release the channel.
    echo "Testing if 2 threads can send data"
    echo "-----------------------------------"

    # One shared block holds the channel header and its item buffer.
    let chan = cast[ptr ChannelSPSCSingle](allocShared(MemBlockSize))
    initialize(chan[], itemSize = sizeof(int))

    var threads: array[2, Thread[ThreadArgs]]
    createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan))
    createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan))
    joinThreads(threads)

    # Both threads have been joined, so the block can be released.
    freeShared(chan)

    echo "-----------------------------------"
    echo "Success"

  main()
Labels: ARC/ORC, GC, Threads

All 8 comments

With the Nim build from 2020-04-20 the issue changed: the previous crash was a segfault in createThread,
likely similar to https://github.com/nim-lang/Nim/issues/13881

Now it is similar to https://github.com/nim-lang/Nim/issues/13935#issuecomment-614249252 but:

  • neither nim cpp --gc:arc ...
  • nor nim --gc:arc --exceptions:setjmp ...
    work

image

Still hangs on devel (and SIGSEGVs with -d:danger)

Works for me on Windows, here is the output:

Sender sent: 42
                  Receiver got: 42
                  Receiver got: 53
Sender sent: 53
                  Receiver got: 64
Sender sent: 64
Sender sent: 75
Sender sent: 86
                  Receiver got: 75
                  Receiver got: 86
                  Receiver got: 97
Sender sent: 97
Sender sent: 108
Sender sent: 119
                  Receiver got: 108
                  Receiver got: 119
                  Receiver got: 130
Sender sent: 130
Sender sent: 141
                  Receiver got: 141

Works for me on Linux with https://github.com/nim-lang/Nim/pull/14722 !

Testing if 2 threads can send data
-----------------------------------
                  Receiver got: 42
Sender sent: 42
                  Receiver got: 53
Sender sent: 53
Sender sent: 64
Sender sent: 75
                  Receiver got: 64
                  Receiver got: 75
                  Receiver got: 86
Sender sent: 86
Sender sent: 97
Sender sent: 108
                  Receiver got: 97
                  Receiver got: 108
                  Receiver got: 119
Sender sent: 119
                  Receiver got: 130
Sender sent: 130
                  Receiver got: 141
Sender sent: 141
-----------------------------------
Success

@Araq You don't have "Success" in your output. On the current devel my code doesn't segfault, but it gets stuck at the same point.
@Yardanico good news!

Just out of curiosity, here is the valgrind run (with -d:useMalloc, of course):

~/P/stuff ❯❯❯ valgrind --leak-check=full ./a
==20443== Memcheck, a memory error detector
==20443== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==20443== Using Valgrind-3.16.0.GIT and LibVEX; rerun with -h for copyright info
==20443== Command: ./a
==20443== 
Testing if 2 threads can send data
-----------------------------------
Sender sent: 42
                  Receiver got: 42
                  Receiver got: 53
Sender sent: 53
Sender sent: 64
Sender sent: 75
                  Receiver got: 64
                  Receiver got: 75
                  Receiver got: 86
Sender sent: 86
Sender sent: 97
                  Receiver got: 97
                  Receiver got: 108
Sender sent: 108
Sender sent: 119
Sender sent: 130
                  Receiver got: 119
                  Receiver got: 130
Sender sent: 141
                  Receiver got: 141
-----------------------------------
Success
==20443== 
==20443== HEAP SUMMARY:
==20443==     in use at exit: 0 bytes in 0 blocks
==20443==   total heap usage: 26 allocs, 26 frees, 2,678 bytes allocated
==20443== 
==20443== All heap blocks were freed -- no leaks are possible
==20443== 
==20443== For lists of detected and suppressed errors, rerun with: -s
==20443== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Yes, it's working now. I think this should be added as a test: we can check that "Success" is printed last (with a timeout, since multithreading bugs occasionally leave the app unresponsive).

Was this page helpful?
0 / 5 - 0 ratings