commit a71c98c0aef198e7cf7308dee414e3e17a14059b
Author: Serene Han <[email protected]>
Date:   Mon Jun 13 15:11:55 2016 -0700

    able to break out of ConnectLoop, try separate webrtcConfigs as well
---
 client/client_test.go | 11 ++++++-----
 client/interfaces.go  |  3 +++
 client/peers.go       | 23 +++++++++++++++++++----
 client/rendezvous.go  | 11 +++++++----
 client/snowflake.go   | 16 ++++++++++------
 client/webrtc.go      | 49 +++++++++++++++++++++++--------------------------
 6 files changed, 68 insertions(+), 45 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 87fb2cb..41d4870 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -65,8 +65,9 @@ func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
 
 type FakePeers struct{ toRelease *webRTCConn }
 
-func (f FakePeers) Collect() error { return nil }
-func (f FakePeers) Pop() Snowflake { return nil }
+func (f FakePeers) Collect() error          { return nil }
+func (f FakePeers) Pop() Snowflake          { return nil }
+func (f FakePeers) Melted() <-chan struct{} { return nil }
 
 func TestSnowflakeClient(t *testing.T) {
 
@@ -144,6 +145,7 @@ func TestSnowflakeClient(t *testing.T) {
                        }
                        So(p.Count(), ShouldEqual, cnt)
                        p.End()
+                       <-p.Melted()
                        So(p.Count(), ShouldEqual, 0)
                })
 
@@ -168,7 +170,6 @@ func TestSnowflakeClient(t *testing.T) {
                        Convey("Can construct a WebRTCConn", func() {
                                s := NewWebRTCConnection(nil, nil)
                                So(s, ShouldNotBeNil)
-                               So(s.index, ShouldEqual, 0)
                                So(s.offerChannel, ShouldNotBeNil)
                                So(s.answerChannel, ShouldNotBeNil)
                                s.Close()
@@ -176,13 +177,13 @@ func TestSnowflakeClient(t *testing.T) {
 
                        Convey("Write buffers when datachannel is nil", func() {
                                c.Write([]byte("test"))
-                               c.snowflake = nil
+                               c.transport = nil
                                So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
                        })
 
                        Convey("Write sends to datachannel when not nil", func() {
                                mock := new(MockDataChannel)
-                               c.snowflake = mock
+                               c.transport = mock
                                mock.done = make(chan bool, 1)
                                c.Write([]byte("test"))
                                <-mock.done
diff --git a/client/interfaces.go b/client/interfaces.go
index 4fb0dcf..502eefb 100644
--- a/client/interfaces.go
+++ b/client/interfaces.go
@@ -38,6 +38,9 @@ type SnowflakeCollector interface {
 
        // Remove and return the most available Snowflake from the collection.
        Pop() Snowflake
+
+       // Signal when the collector has stopped collecting.
+       Melted() <-chan struct{}
 }
 
 // Interface to adapt to goptlib's SocksConn struct.
diff --git a/client/peers.go b/client/peers.go
index eb435fd..74b804f 100644
--- a/client/peers.go
+++ b/client/peers.go
@@ -25,6 +25,8 @@ type Peers struct {
        snowflakeChan chan Snowflake
        activePeers   *list.List
        capacity      int
+
+       melt chan struct{}
 }
 
 // Construct a fresh container of remote peers.
@@ -33,17 +35,19 @@ func NewPeers(max int) *Peers {
        // Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
        p.snowflakeChan = make(chan Snowflake, max)
        p.activePeers = list.New()
+       p.melt = make(chan struct{}, 1)
        return p
 }
 
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Collect() error {
-
        cnt := p.Count()
+       s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity)
        if cnt >= p.capacity {
                s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
                return errors.New(s)
        }
+       log.Println("WebRTC: Collecting a new Snowflake.", s)
        // Engage the Snowflake Catching interface, which must be available.
        if nil == p.Tongue {
                return errors.New("Missing Tongue to catch Snowflakes with.")
@@ -60,7 +64,6 @@ func (p *Peers) Collect() error {
 
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Pop() Snowflake {
-
        // Blocks until an available snowflake appears.
        snowflake, ok := <-p.snowflakeChan
        if !ok {
@@ -71,6 +74,11 @@ func (p *Peers) Pop() Snowflake {
        return snowflake
 }
 
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Melted() <-chan struct{} {
+       return p.melt
+}
+
 // Returns total available Snowflakes (including the active one)
 // The count only reduces when connections themselves close, rather than when
 // they are popped.
@@ -93,9 +101,16 @@ func (p *Peers) purgeClosedPeers() {
 
 // Close all Peers contained here.
 func (p *Peers) End() {
-       log.Printf("WebRTC: Ending all peer connections.")
-       for e := p.activePeers.Front(); e != nil; e = e.Next() {
+       close(p.snowflakeChan)
+       p.melt <- struct{}{}
+       cnt := p.Count()
+       for e := p.activePeers.Front(); e != nil; {
+               log.Println(e, e.Value)
+               next := e.Next()
                conn := e.Value.(*webRTCConn)
                conn.Close()
+               p.activePeers.Remove(e)
+               e = next
        }
+       log.Println("WebRTC: melted all", cnt, "snowflakes.")
 }
diff --git a/client/rendezvous.go b/client/rendezvous.go
index 6c7aa44..2bcce17 100644
--- a/client/rendezvous.go
+++ b/client/rendezvous.go
@@ -115,15 +115,17 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
 // Implements the |Tongue| interface to catch snowflakes, using BrokerChannel.
 type WebRTCDialer struct {
        *BrokerChannel
-       webrtcConfig *webrtc.Configuration
+       // webrtcConfig *webrtc.Configuration
+       iceServers IceServerList
 }
 
 func NewWebRTCDialer(
        broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer {
-       config := webrtc.NewConfiguration(iceServers...)
+
        return &WebRTCDialer{
                BrokerChannel: broker,
-               webrtcConfig:  config,
+               iceServers:    iceServers,
+               // webrtcConfig:  config,
        }
 }
 
@@ -134,7 +136,8 @@ func (w WebRTCDialer) Catch() (Snowflake, error) {
        }
        // TODO: [#3] Fetch ICE server information from Broker.
        // TODO: [#18] Consider TURN servers here too.
-       connection := NewWebRTCConnection(w.webrtcConfig, w.BrokerChannel)
+       config := webrtc.NewConfiguration(w.iceServers...)
+       connection := NewWebRTCConnection(config, w.BrokerChannel)
        err := connection.Connect()
        return connection, err
 }
diff --git a/client/snowflake.go b/client/snowflake.go
index 9f5cdc7..d122494 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -30,17 +30,19 @@ var handlerChan = make(chan int)
 // transfer to the Tor SOCKS handler when needed.
 func ConnectLoop(snowflakes SnowflakeCollector) {
        for {
+               // Check if ending is necessary.
                err := snowflakes.Collect()
                if nil != err {
                        log.Println("WebRTC:", err,
                                " Retrying in", ReconnectTimeout, "seconds...")
-                       // Failed collections get a timeout.
-                       <-time.After(time.Second * ReconnectTimeout)
+               }
+               select {
+               case <-time.After(time.Second * ReconnectTimeout):
                        continue
+               case <-snowflakes.Melted():
+                       log.Println("ConnectLoop: stopped.")
+                       return
                }
-               // Successful collection gets rate limited to once per second.
-               log.Println("WebRTC: Connected to new Snowflake.")
-               <-time.After(time.Second)
        }
 }
 
@@ -50,7 +52,7 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error
        log.Println("Started SOCKS listener.")
        for {
                conn, err := ln.AcceptSocks()
-               log.Println("SOCKS accepted ", conn.Req)
+               log.Println("SOCKS accepted: ", conn.Req)
                if err != nil {
                        if e, ok := err.(net.Error); ok && e.Temporary() {
                                continue
@@ -72,6 +74,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
                handlerChan <- -1
        }()
        // Obtain an available WebRTC remote. May block.
+       log.Println("handler: awaiting Snowflake...")
        snowflake := snowflakes.Pop()
        if nil == snowflake {
                socks.Reject()
@@ -85,6 +88,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
        }
 
        // Begin exchanging data.
+       // BUG(serene): There's a leak here when multiplexed.
        go copyLoop(socks, snowflake)
 
        // When WebRTC resets, close the SOCKS connection, which induces new handler.
diff --git a/client/webrtc.go b/client/webrtc.go
index c7a46ea..6cd5da6 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -15,19 +15,17 @@ import (
 type webRTCConn struct {
        config    *webrtc.Configuration
        pc        *webrtc.PeerConnection
-       snowflake SnowflakeDataChannel // Holds the WebRTC DataChannel.
+       transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
        broker    *BrokerChannel
 
        offerChannel  chan *webrtc.SessionDescription
        answerChannel chan *webrtc.SessionDescription
        errorChannel  chan error
-       endChannel    chan struct{}
        recvPipe      *io.PipeReader
        writePipe     *io.PipeWriter
        buffer        bytes.Buffer
        reset         chan struct{}
 
-       index  int
        closed bool
 
        BytesLogger
@@ -43,11 +41,11 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 // As part of |io.ReadWriter|
 func (c *webRTCConn) Write(b []byte) (int, error) {
        c.BytesLogger.AddOutbound(len(b))
-       if nil == c.snowflake {
+       if nil == c.transport {
                log.Printf("Buffered %d bytes --> WebRTC", len(b))
                c.buffer.Write(b)
        } else {
-               c.snowflake.Send(b)
+               c.transport.Send(b)
        }
        return len(b), nil
 }
@@ -57,15 +55,6 @@ func (c *webRTCConn) Close() error {
        var err error = nil
        log.Printf("WebRTC: Closing")
        c.cleanup()
-       if nil != c.offerChannel {
-               close(c.offerChannel)
-       }
-       if nil != c.answerChannel {
-               close(c.answerChannel)
-       }
-       if nil != c.errorChannel {
-               close(c.errorChannel)
-       }
        // Mark for deletion.
        c.closed = true
        return err
@@ -106,7 +95,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
 
 // As part of |Connector| interface.
 func (c *webRTCConn) Connect() error {
-       log.Printf("Establishing WebRTC connection #%d...", c.index)
        // TODO: When go-webrtc is more stable, it's possible that a new
        // PeerConnection won't need to be re-prepared each time.
        err := c.preparePeerConnection()
@@ -174,7 +162,7 @@ func (c *webRTCConn) preparePeerConnection() error {
 
 // Create a WebRTC DataChannel locally.
 func (c *webRTCConn) establishDataChannel() error {
-       if c.snowflake != nil {
+       if c.transport != nil {
                panic("Unexpected datachannel already exists!")
        }
        dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
@@ -187,9 +175,8 @@ func (c *webRTCConn) establishDataChannel() error {
        }
        dc.OnOpen = func() {
                log.Println("WebRTC: DataChannel.OnOpen")
-               if nil != c.snowflake {
-                       log.Println("PeerConnection snowflake already exists.")
-                       panic("PeerConnection snowflake already exists.")
+               if nil != c.transport {
+                       panic("WebRTC: transport already exists.")
                }
                // Flush buffered outgoing SOCKS data if necessary.
                if c.buffer.Len() > 0 {
@@ -198,11 +185,11 @@ func (c *webRTCConn) establishDataChannel() error {
                        c.buffer.Reset()
                }
                // Then enable the datachannel.
-               c.snowflake = dc
+               c.transport = dc
        }
        dc.OnClose = func() {
                // Future writes will go to the buffer until a new DataChannel is available.
-               if nil == c.snowflake {
+               if nil == c.transport {
                        // Closed locally, as part of a reset.
                        log.Println("WebRTC: DataChannel.OnClose [locally]")
                        return
@@ -210,7 +197,7 @@ func (c *webRTCConn) establishDataChannel() error {
                // Closed remotely, need to reset everything.
                // Disable the DataChannel as a write destination.
                log.Println("WebRTC: DataChannel.OnClose [remotely]")
-               c.snowflake = nil
+               c.transport = nil
                c.Reset()
        }
        dc.OnMessage = func(msg []byte) {
@@ -284,13 +271,23 @@ func (c *webRTCConn) exchangeSDP() error {
        return nil
 }
 
+// Close all channels and transports
 func (c *webRTCConn) cleanup() {
-       if nil != c.snowflake {
+       if nil != c.offerChannel {
+               close(c.offerChannel)
+       }
+       if nil != c.answerChannel {
+               close(c.answerChannel)
+       }
+       if nil != c.errorChannel {
+               close(c.errorChannel)
+       }
+       if nil != c.transport {
                log.Printf("WebRTC: closing DataChannel")
-               dataChannel := c.snowflake
-               // Setting snowflake to nil *before* Close indicates to OnClose that it
+               dataChannel := c.transport
+               // Setting dc to nil *before* Close indicates to OnClose that it
                // was locally triggered.
-               c.snowflake = nil
+               c.transport = nil
                dataChannel.Close()
        }
        if nil != c.pc {

_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to