Repository: qpid-proton
Updated Branches:
  refs/heads/master e769f784e -> 4a9f3b986


PROTON-1953: [go] occasional client/server hang with high volume of messages

The hang was due to a race/deadlock between the read/write goroutines and the engine main goroutine. Simplified:
- start read/write goroutines as needed
- handle read/write completion via Inject(), no special channels


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/486fbaf0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/486fbaf0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/486fbaf0

Branch: refs/heads/master
Commit: 486fbaf034f2d89688bb678914d798a1a1595cc5
Parents: e769f78
Author: Alan Conway <acon...@redhat.com>
Authored: Wed Oct 10 16:09:33 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Oct 11 11:16:37 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/proton/engine.go | 173 ++++++++++-----------------
 1 file changed, 64 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/486fbaf0/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/engine.go 
b/go/src/qpid.apache.org/proton/engine.go
index d28a09f..af26a5f 100644
--- a/go/src/qpid.apache.org/proton/engine.go
+++ b/go/src/qpid.apache.org/proton/engine.go
@@ -103,15 +103,16 @@ type Engine struct {
        err    ErrorHolder
        inject chan func()
 
-       conn       net.Conn
-       connection Connection
-       transport  Transport
-       collector  *C.pn_collector_t
-       handlers   []EventHandler // Handlers for proton events.
-       running    chan struct{}  // This channel will be closed when the 
goroutines are done.
-       closeOnce  sync.Once
-       timer      *time.Timer
-       traceEvent bool
+       conn             net.Conn
+       connection       Connection
+       transport        Transport
+       collector        *C.pn_collector_t
+       handlers         []EventHandler // Handlers for proton events.
+       running          chan struct{}  // This channel will be closed when the 
goroutines are done.
+       closeOnce        sync.Once
+       timer            *time.Timer
+       traceEvent       bool
+       reading, writing bool
 }
 
 const bufferSize = 4096
@@ -255,6 +256,12 @@ func (eng *Engine) Disconnect(err error) {
        <-eng.running
 }
 
+// For debugging purposes: like Transport.Log() but takes a format string
+// and works even if the transport has been freed.
+func (eng *Engine) log(format string, args ...interface{}) {
+       fmt.Fprintf(os.Stderr, "[%p]: %v", eng.transport, fmt.Sprintf(format, 
args...))
+}
+
 // Let proton run timed activity and set up the next tick
 func (eng *Engine) tick() {
        now := time.Now()
@@ -281,16 +288,50 @@ func (eng *Engine) dispatch() bool {
        return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != 
nil
 }
 
-func (eng *Engine) writeBuffer() []byte {
-       size := eng.Transport().Pending() // Evaluate before Head(), may change 
buffer.
-       start := eng.Transport().Head()
-       return cByteSlice(start, size)
+func (eng *Engine) write() {
+       if !eng.writing {
+               size := eng.Transport().Pending() // Evaluate before Head(), 
may change buffer.
+               start := eng.Transport().Head()
+               if size > 0 {
+                       eng.writing = true
+                       go func() { // Blocking Write() in separate goroutine
+                               n, err := eng.conn.Write(cByteSlice(start, 
size))
+                               eng.Inject(func() { // Inject results of Write 
back to engine goroutine
+                                       eng.writing = false
+                                       if n > 0 {
+                                               eng.transport.Pop(uint(n))
+                                       }
+                                       if err != nil {
+                                               
eng.Transport().Condition().SetError(err)
+                                               eng.Transport().CloseHead()
+                                       }
+                               })
+                       }()
+               }
+       }
 }
 
-func (eng *Engine) readBuffer() []byte {
-       size := eng.Transport().Capacity()
-       start := eng.Transport().Tail()
-       return cByteSlice(start, size)
+func (eng *Engine) read() {
+       if !eng.reading {
+               size := eng.Transport().Capacity()
+               start := eng.Transport().Tail()
+               if size > 0 {
+                       eng.reading = true
+                       go func() { // Blocking Read in separate goroutine
+                               n, err := eng.conn.Read(cByteSlice(start, size))
+                               eng.Inject(func() {
+                                       eng.reading = false
+                                       if n > 0 {
+                                               eng.Transport().Process(uint(n))
+                                       }
+                                       if err != nil {
+                                               
eng.Transport().Condition().SetError(err)
+                                               eng.Transport().CloseTail()
+                                       }
+                               })
+                       }()
+               }
+       }
 }
 
 func (eng *Engine) free() {
@@ -317,106 +358,20 @@ func (eng *Engine) Run() error {
        defer eng.free()
        eng.transport.Bind(eng.connection)
        eng.tick() // Start ticking if needed
-
-       // Channels for read and write buffers going in and out of the 
read/write goroutines.
-       // The channels are unbuffered: we want to exchange buffers in sequence.
-       readsIn, writesIn := make(chan []byte), make(chan []byte)
-       readsOut, writesOut := make(chan []byte), make(chan []byte)
-
-       wait := sync.WaitGroup{}
-       wait.Add(2) // Read and write goroutines
-
-       go func() { // Read goroutine
-               defer wait.Done()
-               for {
-                       rbuf, ok := <-readsIn
-                       if !ok {
-                               return
-                       }
-                       n, err := eng.conn.Read(rbuf)
-                       if n > 0 {
-                               readsOut <- rbuf[:n]
-                       } else if err != nil {
-                               _ = eng.Inject(func() {
-                                       
eng.Transport().Condition().SetError(err)
-                                       eng.Transport().CloseTail()
-                               })
-                               return
-                       }
-               }
-       }()
-
-       go func() { // Write goroutine
-               defer wait.Done()
-               for {
-                       wbuf, ok := <-writesIn
-                       if !ok {
-                               return
-                       }
-                       n, err := eng.conn.Write(wbuf)
-                       if n > 0 {
-                               writesOut <- wbuf[:n]
-                       } else if err != nil {
-                               _ = eng.Inject(func() {
-                                       
eng.Transport().Condition().SetError(err)
-                                       eng.Transport().CloseHead()
-                               })
-                               return
-                       }
-               }
-       }()
-
        for eng.dispatch() {
-               readBuf := eng.readBuffer()
-               writeBuf := eng.writeBuffer()
-               // Note that getting the buffers can generate events (eg. SASL 
events) that
-               // might close the transport. Check if we are already finished 
before
-               // blocking for IO.
-               if !eng.dispatch() {
-                       break
-               }
-
-               // sendReads/sendWrites are nil (not sendable in select) unless 
we have a
-               // buffer to read/write
-               var sendReads, sendWrites chan []byte
-               if readBuf != nil {
-                       sendReads = readsIn
-               }
-               if writeBuf != nil {
-                       sendWrites = writesIn
-               }
-
-               // Send buffers to the read/write goroutines if we have them.
-               // Get buffers from the read/write goroutines and process them
-               // Check for injected functions
+               // Initiate read/write if needed
+               eng.read()
+               eng.write()
                select {
-
-               case sendReads <- readBuf:
-
-               case sendWrites <- writeBuf:
-
-               case buf := <-readsOut:
-                       eng.transport.Process(uint(len(buf)))
-
-               case buf := <-writesOut:
-                       eng.transport.Pop(uint(len(buf)))
-
-               case f, ok := <-eng.inject: // Function injected from another 
goroutine
-                       if ok {
-                               f()
-                       }
-
+               case f := <-eng.inject: // User or IO action
+                       f()
                case <-eng.timer.C:
                        eng.tick()
                }
        }
-
        eng.err.Set(EndpointError(eng.Connection()))
        eng.err.Set(eng.Transport().Condition().Error())
-       close(readsIn)
-       close(writesIn)
        close(eng.running)   // Signal goroutines have exited and Error is set, 
disable Inject()
-       _ = eng.conn.Close() // Close conn, force read/write goroutines to exit 
(they will Inject)
-       wait.Wait()          // Wait for goroutines
+       _ = eng.conn.Close() // Close conn, force read goroutine to exit 
(Inject will fail)
        return eng.err.Get()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to