commit 816cff15f425d0cb87a1b996366989aa01833f99
Author: Yawning Angel <[email protected]>
Date:   Sun Jan 20 16:14:28 2019 +0000

    transports/meeklite: Cleanups, bugfixes and improvements
    
     * Properly close the response body on HTTP error.
     * Clean up close signaling.
     * Write() should return faster on closed connections.
---
 transports/meeklite/meek.go | 70 +++++++++++++++++++++------------------------
 1 file changed, 32 insertions(+), 38 deletions(-)

diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index a99556b..39a6d4b 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -39,6 +39,7 @@ import (
        "net"
        "net/http"
        gourl "net/url"
+       "os"
        "runtime"
        "sync"
        "time"
@@ -107,16 +108,14 @@ func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) {
 }
 
 type meekConn struct {
-       sync.Mutex
-
        args      *meekClientArgs
        sessionID string
        transport *http.Transport
 
-       workerRunning   bool
+       closeOnce       sync.Once
        workerWrChan    chan []byte
        workerRdChan    chan []byte
-       workerCloseChan chan bool
+       workerCloseChan chan struct{}
        rdBuf           *bytes.Buffer
 }
 
@@ -154,11 +153,10 @@ func (c *meekConn) Read(p []byte) (n int, err error) {
 
 func (c *meekConn) Write(b []byte) (n int, err error) {
        // Check to see if the connection is actually open.
-       c.Lock()
-       closed := !c.workerRunning
-       c.Unlock()
-       if closed {
+       select {
+       case <-c.workerCloseChan:
                return 0, io.ErrClosedPipe
+       default:
        }
 
        if len(b) == 0 {
@@ -168,9 +166,7 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
        // Copy the data to be written to a new slice, since
        // we return immediately after queuing and the peer can
        // happily reuse `b` before data has been sent.
-       toWrite := len(b)
-       b2 := make([]byte, toWrite)
-       copy(b2, b)
+       b2 := append([]byte{}, b...)
        if ok := c.enqueueWrite(b2); !ok {
                // Technically we did enqueue data, but the worker's
                // got closed out from under us.
@@ -181,18 +177,15 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
 }
 
 func (c *meekConn) Close() error {
-       // Ensure that we do this once and only once.
-       c.Lock()
-       defer c.Unlock()
-       if !c.workerRunning {
-               return nil
-       }
+       err := os.ErrClosed
 
-       // Tear down the worker.
-       c.workerRunning = false
-       c.workerCloseChan <- true
+       c.closeOnce.Do(func() {
+               // Tear down the worker, if it is still running.
+               close(c.workerCloseChan)
+               err = nil
+       })
 
-       return nil
+       return err
 }
 
 func (c *meekConn) LocalAddr() net.Addr {
@@ -216,7 +209,11 @@ func (c *meekConn) SetWriteDeadline(t time.Time) error {
 }
 
 func (c *meekConn) enqueueWrite(b []byte) (ok bool) {
-       defer func() { _ = recover() }()
+       defer func() {
+               if err := recover(); err != nil {
+                       ok = false
+               }
+       }()
        c.workerWrChan <- b
        return true
 }
@@ -249,14 +246,16 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
                if err != nil {
                        return nil, err
                }
-               if resp.StatusCode != http.StatusOK {
-                       err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
-                       time.Sleep(retryDelay)
-               } else {
-                       defer resp.Body.Close()
+
+               if resp.StatusCode == http.StatusOK {
                        recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength))
+                       resp.Body.Close()
                        return
                }
+
+               resp.Body.Close()
+               err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
+               time.Sleep(retryDelay)
        }
        return
 }
@@ -264,8 +263,8 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
 func (c *meekConn) ioWorker() {
        interval := initPollInterval
        var sndBuf, leftBuf []byte
-loop:
 
+loop:
        for {
                sndBuf = nil
                select {
@@ -316,7 +315,7 @@ loop:
                        // Sent data, poll immediately.
                        interval = 0
                } else if interval == 0 {
-                       // Neither sent nor received data, initialize the delay.
+                       // Neither sent nor received data after a poll, re-initialize the delay.
                        interval = initPollInterval
                } else {
                        // Apply a multiplicative backoff.
@@ -334,11 +333,8 @@ loop:
        close(c.workerRdChan)
        close(c.workerWrChan)
 
-       // In case the close was done on an error condition, update the state
-       // variable so that further calls to Write() will fail.
-       c.Lock()
-       defer c.Unlock()
-       c.workerRunning = false
+       // Close the connection (extra calls to Close() are harmless).
+       _ = c.Close()
 }
 
 func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) {
@@ -347,15 +343,13 @@ func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs)
                return nil, err
        }
 
-       tr := &http.Transport{Dial: dialFn}
        conn := &meekConn{
                args:            ca,
                sessionID:       id,
-               transport:       tr,
-               workerRunning:   true,
+               transport:       &http.Transport{Dial: dialFn},
                workerWrChan:    make(chan []byte, maxChanBacklog),
                workerRdChan:    make(chan []byte, maxChanBacklog),
-               workerCloseChan: make(chan bool),
+               workerCloseChan: make(chan struct{}),
        }
 
        // Start the I/O worker.



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to