I've spent a few days on this and boiled this down to a minimal example. Can an 
expert with Nim parallelism please tell me where I'm going wrong here?

Output changes from run to run (is stochastic) but I did not intend that and do 
not see how this can be. Here are several runs in a row of a very simple thread 
pool algorithm that adds 1 to each element of a seq every time a job is 
submitted to the pool.
    
    
    1040.0 == 1000.0 (false)
    1020.0 == 1000.0 (false)
    1039.0 == 1000.0 (false)
    1000.0 == 1000.0 (true)  <-- the intended result
    1069.0 == 1000.0 (false)
    1040.0 == 1000.0 (false)
    995.0 == 1000.0 (false)
    
    
    Run

Minimal Example
    
    
    import locks
    from math import sum
    from strformat import `&`
    from std/decls import byaddr
    
    const NTHREADS = 8 # pool size
    const NSUBMISSIONS = 100 # number of jobs to submit to pool
    const NVALUES = 10 # length of reduction data seq
    
    type
      ThreadComm = enum Begin Ready Quit
      ThreadParams = tuple
        threadDataPtr: ptr ThreadData
        resultsPtr: ptr Reduction
      ThreadData = object
        input, output: Channel[ThreadComm]
        handle: Thread[ThreadParams]
      Reduction = object
        values: seq[float]
        lock: Lock
    
    proc singleThread(args: ThreadParams) =
      let threadData {.byaddr.}  = args.threadDataPtr[]
      let results {.byaddr.}  = args.resultsPtr[]
      threadData.output.send(ThreadComm.Ready)
      while true:
        let command = threadData.input.recv() # 'Begin' or 'Quit' received
        if command == ThreadComm.Quit: break
        acquire results.lock
        for i in 0..<results.values.len:
          results.values[i] += 1.0
        release results.lock
        threadData.output.send(ThreadComm.Ready)
    
    proc main =
      var pool = newSeq[ThreadData](len=NTHREADS)
      var reduction = Reduction(values: newSeq[float](len=NVALUES)) # target 
for reduce operation
      for thread in pool.mitems:
        open thread.input
        open thread.output
        thread.handle.createThread( singleThread, param=(addr thread, addr 
reduction) )
      # work submission loop
      var workID: Natural
      while workID < NSUBMISSIONS:
        for thread in pool.mitems:
          let channel = thread.output.tryRecv() # no data if thread not done yet
          if channel.dataAvailable: # assume state == ThreadComm.Ready if 
anything received
            inc workID
            thread.input.send(ThreadComm.Begin)
      # wrap up
      for thread in pool.mitems:
        thread.input.send(ThreadComm.Quit)
      for thread in pool.mitems:
        joinThread thread.handle
        close thread.input
        close thread.output
      # report
      const expected = 1000.0
      let total = sum(reduction.values)
      let TorF = total == expected
      echo &"{total} == {expected} ({TorF})"
    
    when isMainModule:
      main()
    
    
    Run

The goal is to use this minimal thread pool implementation, not a library. Yes, 
I'm aware of the libraries and have used them all. I'd love to understand why 
this doesn't work as I intend because it's rather basic. Please help me move 
along to the facepalm moment :)

Reply via email to