commit c3ada1b54521927809b2ae67b45684dd412dc612
Author: Serene Han <[email protected]>
Date:   Fri Feb 19 16:17:17 2016 -0800

    Use a channel to safely synchronize datachannel writes, (#12)
    clean up ice candidate log message.
    still need to debug the copy loop break.
---
 client/client_test.go | 11 +++++++++--
 client/snowflake.go   | 54 ++++++++++++++++++++++++++++++++++++---------------
 2 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 6ee36d9..85c2144 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -9,10 +9,12 @@ import (
 
 type MockDataChannel struct {
        destination bytes.Buffer
+       done        chan bool
 }
 
 func (m *MockDataChannel) Send(data []byte) {
        m.destination.Write(data)
+       m.done <- true
 }
 
 func (*MockDataChannel) Close() error {
@@ -24,6 +26,7 @@ func TestConnect(t *testing.T) {
 
                Convey("WebRTC Connection", func() {
                        c := new(webRTCConn)
+
                        c.BytesInfo = &BytesInfo{
                                inboundChan: make(chan int), outboundChan: 
make(chan int),
                                inbound: 0, outbound: 0, inEvents: 0, 
outEvents: 0,
@@ -31,15 +34,19 @@ func TestConnect(t *testing.T) {
                        So(c.buffer.Bytes(), ShouldEqual, nil)
 
                        Convey("SendData buffers when datachannel is nil", 
func() {
-                               c.sendData([]byte("test"))
+                               c.SendData([]byte("test"))
                                c.snowflake = nil
                                So(c.buffer.Bytes(), ShouldResemble, 
[]byte("test"))
                        })
 
                        Convey("SendData sends to datachannel when not nil", 
func() {
                                mock := new(MockDataChannel)
+                               mock.done = make(chan bool)
+                               go c.SendLoop()
+                               c.writeChannel = make(chan []byte)
                                c.snowflake = mock
-                               c.sendData([]byte("test"))
+                               c.SendData([]byte("test"))
+                               <-mock.done
                                So(c.buffer.Bytes(), ShouldEqual, nil)
                                So(mock.destination.Bytes(), ShouldResemble, 
[]byte("test"))
                        })
diff --git a/client/snowflake.go b/client/snowflake.go
index eba68eb..d01ec5e 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -36,12 +36,15 @@ const (
 func copyLoop(a, b net.Conn) {
        var wg sync.WaitGroup
        wg.Add(2)
+       // TODO fix the copy loop.
        go func() {
                io.Copy(b, a)
+               log.Println("copy loop b-a break")
                wg.Done()
        }()
        go func() {
                io.Copy(a, b)
+               log.Println("copy loop a-b break")
                wg.Done()
        }()
        wg.Wait()
@@ -63,6 +66,7 @@ type webRTCConn struct {
        offerChannel  chan *webrtc.SessionDescription
        answerChannel chan *webrtc.SessionDescription
        errorChannel  chan error
+       writeChannel  chan []byte
        recvPipe      *io.PipeReader
        writePipe     *io.PipeWriter
        buffer        bytes.Buffer
@@ -77,7 +81,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
-       c.sendData(b)
+       c.SendData(b)
        return len(b), nil
 }
 
@@ -133,9 +137,9 @@ func (c *webRTCConn) PreparePeerConnection() {
                        }
                }()
        }
+       // Allow candidates to accumulate until OnIceComplete.
        pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
-               log.Printf("WebRTC: OnIceCandidate %s", candidate.Serialize())
-               // Allow candidates to accumulate until OnIceComplete.
+               log.Printf(candidate.Candidate)
        }
        // TODO: This may soon be deprecated, consider 
OnIceGatheringStateChange.
        pc.OnIceComplete = func() {
@@ -169,10 +173,11 @@ func (c *webRTCConn) EstablishDataChannel() error {
                // }
                // Flush the buffer, then enable datachannel.
                // TODO: Make this more safe
-               dc.Send(c.buffer.Bytes())
-               log.Println("Flushed", c.buffer.Len(), "bytes")
-               c.buffer.Reset()
+               // dc.Send(c.buffer.Bytes())
+               // log.Println("Flushed", c.buffer.Len(), "bytes")
+               // c.buffer.Reset()
                c.snowflake = dc
+               c.SendData(nil)
        }
        dc.OnClose = func() {
                // Disable the DataChannel as a write destination.
@@ -180,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
                log.Println("WebRTC: DataChannel.OnClose")
                if nil != c.snowflake {
                        c.snowflake = nil
-                       // Only reset if this OnClose triggered
+                       // Only reset if this OnClose was triggered remotely.
                        c.Reset()
                }
        }
@@ -247,21 +252,32 @@ func (c *webRTCConn) ReceiveAnswer() {
        }()
 }
 
-func (c *webRTCConn) sendData(data []byte) {
-       c.BytesInfo.AddOutbound(len(data))
+func (c *webRTCConn) SendData(data []byte) {
        // Buffer the data in case datachannel isn't available yet.
        if nil == c.snowflake {
                log.Printf("Buffered %d bytes --> WebRTC", len(data))
                c.buffer.Write(data)
                return
        }
-       // Otherwise, flush buffer if necessary.
-       for c.buffer.Len() > 0 {
-               c.snowflake.Send(c.buffer.Bytes())
-               log.Println("Flushed", c.buffer.Len(), "bytes")
-               c.buffer.Reset()
+       go func() {
+               c.writeChannel <- data
+       }()
+}
+
+// Expected in own goroutine.
+func (c *webRTCConn) SendLoop() {
+       log.Println("send loop")
+       for data := range c.writeChannel {
+               // Flush buffer if necessary.
+               for c.buffer.Len() > 0 {
+                       c.snowflake.Send(c.buffer.Bytes())
+                       log.Println("Flushed", c.buffer.Len(), "bytes")
+                       c.buffer.Reset()
+               }
+
+               c.BytesInfo.AddOutbound(len(data))
+               c.snowflake.Send(data)
        }
-       c.snowflake.Send(data)
 }
 
 // WebRTC re-establishment loop. Expected in own goroutine.
@@ -296,6 +312,7 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
        connection.broker = broker
        connection.offerChannel = make(chan *webrtc.SessionDescription)
        connection.answerChannel = make(chan *webrtc.SessionDescription)
+       connection.writeChannel = make(chan []byte)
        connection.errorChannel = make(chan error)
        connection.reset = make(chan struct{})
        connection.BytesInfo = &BytesInfo{
@@ -308,6 +325,7 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
        connection.recvPipe, connection.writePipe = io.Pipe()
 
        go connection.ConnectLoop()
+       go connection.SendLoop()
        return connection, nil
 }
 
@@ -317,8 +335,10 @@ func endWebRTC() {
                return
        }
        if nil != webrtcRemote.snowflake {
+               s := webrtcRemote.snowflake
+               webrtcRemote.snowflake = nil
                log.Printf("WebRTC: closing DataChannel")
-               webrtcRemote.snowflake.Close()
+               s.Close()
        }
        if nil != webrtcRemote.pc {
                log.Printf("WebRTC: closing PeerConnection")
@@ -333,6 +353,7 @@ func handler(conn *pt.SocksConn) error {
                handlerChan <- -1
        }()
        defer conn.Close()
+       log.Println("handler", conn)
 
        // TODO: [#3] Fetch ICE server information from Broker.
        // TODO: [#18] Consider TURN servers here too.
@@ -357,6 +378,7 @@ func handler(conn *pt.SocksConn) error {
        }
 
        copyLoop(conn, remote)
+       log.Println("----END---")
        return nil
 }
 



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to