commit 43cdc20e7e7f136c96814bf752ef1fbc9b6fec33
Author: Yawning Angel <[email protected]>
Date:   Fri Oct 30 09:45:26 2015 +0000

    meek-lite: combine small writes at request dispatch time.
    
    This dramatically improves bulk upload performance, from totally shit
    to just shit.
---
 transports/meeklite/meek.go |   70 ++++++++++++++++++++++++++-----------------
 1 file changed, 42 insertions(+), 28 deletions(-)

diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index 5842704..8957ceb 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
                return 0, io.ErrClosedPipe
        }
 
-       if len(b) > 0 {
-               // Copy the data to be written to a new slice, since
-               // we return immediately after queuing and the peer can
-               // happily reuse `b` before data has been sent.
-               toWrite := len(b)
-               b2 := make([]byte, toWrite)
-               copy(b2, b)
-               offset := 0
-               for toWrite > 0 {
-                       // Chunk up the writes to keep them under the maximum
-                       // payload length.
-                       sz := toWrite
-                       if sz > maxPayloadLength {
-                               sz = maxPayloadLength
-                       }
+       if len(b) == 0 {
+               return 0, nil
+       }
 
-                       // Enqueue a properly sized subslice of our copy.
-                       if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok {
-                               // Technically we did enqueue data, but the 
worker's
-                               // got closed out from under us.
-                               return 0, io.ErrClosedPipe
-                       }
-                       toWrite -= sz
-                       offset += sz
-                       runtime.Gosched()
-               }
+       // Copy the data to be written to a new slice, since
+       // we return immediately after queuing and the peer can
+       // happily reuse `b` before data has been sent.
+       toWrite := len(b)
+       b2 := make([]byte, toWrite)
+       copy(b2, b)
+       if ok := c.enqueueWrite(b2); !ok {
+               // Technically we did enqueue data, but the worker's
+               // got closed out from under us.
+               return 0, io.ErrClosedPipe
        }
+       runtime.Gosched()
        return len(b), nil
 }
 
@@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf 
[]byte, err error) {
 
 func (c *meekConn) ioWorker() {
        interval := initPollInterval
+       var sndBuf, leftBuf []byte
 loop:
+
        for {
-               var sndBuf []byte
+               sndBuf = nil
                select {
                case <-time.After(interval):
                        // If the poll interval has elapsed, issue a request.
@@ -281,19 +272,42 @@ loop:
                        break loop
                }
 
+               // Combine short writes as long as data is available to be
+               // sent immediately and it will not put us over the max
+               // payload limit.  Any excess data is stored and dispatched
+               // as the next request).
+               sndBuf = append(leftBuf, sndBuf...)
+               wrSz := len(sndBuf)
+               for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
+                       b := <-c.workerWrChan
+                       sndBuf = append(sndBuf, b...)
+                       wrSz = len(sndBuf)
+               }
+               if wrSz > maxPayloadLength {
+                       wrSz = maxPayloadLength
+               }
+
                // Issue a request.
-               rdBuf, err := c.roundTrip(sndBuf)
+               rdBuf, err := c.roundTrip(sndBuf[:wrSz])
                if err != nil {
                        // Welp, something went horrifically wrong.
                        break loop
                }
+
+               // Stash the remaining payload if any.
+               leftBuf = sndBuf[wrSz:] // Store the remaining data
+               if len(leftBuf) == 0 {
+                       leftBuf = nil
+               }
+
+               // Determine the next poll interval.
                if len(rdBuf) > 0 {
                        // Received data, enqueue the read.
                        c.workerRdChan <- rdBuf
 
                        // And poll immediately.
                        interval = 0
-               } else if sndBuf != nil {
+               } else if wrSz > 0 {
                        // Sent data, poll immediately.
                        interval = 0
                } else if interval == 0 {



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to