When I start small, passing pointer of channel seems working fine. But when I 
grouping logics into proc, I got segmentation fault, illegal storage access or 
channel only work on one direction, not the other. I cannot reason about the 
behavior, even after consulting the manual... so I search for help here.

In short, I want to ask

  1. Is it a correct If I pass the WorkChannel object (like below) to thread,
  2. Is it safe to declare channels in local varible in proc, and take the addr 
and pass to a thread (like newWorkerChannel)?
  3. Is threads, channels library stable?



nim version is 0.20.2 on ubunutu
    
    
    import os, asyncdispatch
    
    type
      WorkerChannel*[R,T] = object
        rx: ptr Channel[R]
        tx: ptr Channel[T]
      Worker*[R,T] = proc(args: WorkerChannel[R,T]) {.thread,nimcall.}
    
    proc newWorkerChannel[R,T]( maxTxItems: int = 0, maxRxItems: int = 0): 
WorkerChannel[R,T] =
      var tx: Channel[T]
      var rx: Channel[R]
      tx.open(maxTxItems)
      rx.open(maxRxItems)
      result.tx = tx.addr
      result.rx = rx.addr
    
    proc twist*[R,T](ch: WorkerChannel[R,T]) : WorkerChannel[T,R] =
      WorkerChannel[T,R](tx: ch.rx, rx: ch.tx)
    
    proc send*[R,T](ch: WorkerChannel[R,T], x: T, retyInterval: int = 100): 
Future[void] {.async.} =
      while not ch.tx[].trySend x:
        await sleepAsync(retyInterval)
    
    proc sendSync*[R,T](ch: WorkerChannel[R,T], x: T) = ch.tx[].send x
    
    proc recv*[R,T](ch: WorkerChannel[R,T], maxDelay: int = 100): Future[R] 
{.async.} =
      while true:
        let (avail, msg)= ch.rx[].tryRecv
        if avail: return msg
        await sleepAsync(maxDelay)
    
    proc recvSync*[R,T](ch: WorkerChannel[R,T]): R = ch.rx[].recv
    
    proc newWorker*[R,T](taskFunc: Worker[R,T], maxTxItems: int = 0, 
maxRxItems: int = 0): WorkerChannel[T,R] =
      ## Worker recieve txType and send rxType
      var th : Thread[WorkerChannel[R, T]]
      var ch = newWorkerChannel[R, T](maxTxItems=maxTxItems, 
maxRxItems=maxRxItems)
      createThread(th, taskFunc, ch)
      ch.twist()
    
    # ---------------------------------------------------------
    # sync
    
    proc taskSync(ch: WorkerChannel[int,string]) {.thread.} =
      # echo ch.recvSync
      for i in 1 .. 10:
        ch.sendSync $i
    
    proc worker() =
      # var ch = newWorker(taskSync)
      var th : Thread[WorkerChannel[int, string]]
      var chan = newWorkerChannel[int, string]()
      createThread(th, taskSync, chan)
      var ch = chan.twist()
      
      ## this line lead to Illegal storage access
      # ch.sendSync 10
      while true:
        echo ch.recvSync
    
    worker()
    
    # ---------------------------------------------------------
    # async
    
    proc taskAsync(ch: WorkerChannel[int,string]) {.async.} =
      var i = 0
      while true:
        echo "sending ", i, " on thread=", getThreadId()
        await ch.send($i)
        inc i
        await sleepAsync(500)
    
    proc task(ch: WorkerChannel[int,string]) {.thread.} =
      waitFor taskAsync(ch)
    
    proc main() {.async.} =
      # var chan = newAsyncWorker[int, string](task)
      var chan = newWorker(task)
      while true:
        var i = await chan.recv
        echo "recevied ", i
        await sleepAsync(100)
    
    ## If I try to run task on main thread, this works.
    # waitFor taskAsync(newWorkerChannel[int, string]()) # work
    # task(newWorkerChannel[int, string]()) # work
    
    ## If I run on threads, some time I got Illegal storage access, some time I 
got exit after two recv
    # asyncCheck main()
    # try: runForever()
    # except: discard
    
    
    
    
    Run

Reply via email to