commit eb7eb04ac01b7a7d2466958b35403c6729014a1f
Author: Serene Han <[email protected]>
Date:   Wed Feb 17 18:38:40 2016 -0800

    Buffer writes to DataChannel, remove blocking on openChannel (#12)
---
 client/client_test.go | 44 +++++++++++++++++++++++++
 client/snowflake.go   | 89 ++++++++++++++++++++++++++++-----------------------
 2 files changed, 93 insertions(+), 40 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
new file mode 100644
index 0000000..7b8dad2
--- /dev/null
+++ b/client/client_test.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+       "bytes"
+       . "github.com/smartystreets/goconvey/convey"
+       "testing"
+)
+
+type MockDataChannel struct {
+       destination bytes.Buffer
+}
+
+func (m *MockDataChannel) Send(data []byte) {
+       m.destination.Write(data)
+}
+
+func (*MockDataChannel) Close() error {
+       return nil
+}
+
+func TestConnect(t *testing.T) {
+       Convey("Snowflake", t, func() {
+
+               Convey("WebRTC Connection", func() {
+                       c := new(webRTCConn)
+                       So(c.buffer.Bytes(), ShouldEqual, nil)
+
+                       Convey("SendData buffers when datachannel is nil", 
func() {
+                               c.sendData([]byte("test"))
+                               c.snowflake = nil
+                               So(c.buffer.Bytes(), ShouldResemble, 
[]byte("test"))
+                       })
+
+                       Convey("SendData sends to datachannel when not nil", 
func() {
+                               mock := new(MockDataChannel)
+                               c.snowflake = mock
+                               c.sendData([]byte("test"))
+                               So(c.buffer.Bytes(), ShouldEqual, nil)
+                               So(mock.destination.Bytes(), ShouldResemble, 
[]byte("test"))
+                       })
+               })
+
+       })
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index 7c47fbb..771e90b 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -3,6 +3,7 @@ package main
 
 import (
        "bufio"
+       "bytes"
        "errors"
        "flag"
        "fmt"
@@ -46,16 +47,22 @@ func copyLoop(a, b net.Conn) {
        wg.Wait()
 }
 
+// Interface that matches both webrc.DataChannel and for testing.
+type SnowflakeChannel interface {
+       Send([]byte)
+       Close() error
+}
+
 // Implements net.Conn interface
 type webRTCConn struct {
        pc           *webrtc.PeerConnection
-       dc           *webrtc.DataChannel
+       snowflake    SnowflakeChannel  // Interface holding the WebRTC 
DataChannel.
        broker       *BrokerChannel
-       recvPipe     *io.PipeReader
-       writePipe    *io.PipeWriter
        offerChannel chan *webrtc.SessionDescription
        errorChannel chan error
-       openChannel  chan struct{}
+       recvPipe     *io.PipeReader
+       writePipe    *io.PipeWriter
+       buffer       bytes.Buffer
 }
 
 var webrtcRemote *webRTCConn
@@ -66,9 +73,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
        // log.Printf("webrtc Write %d %+q", len(b), string(b))
-       log.Printf("Write %d bytes --> WebRTC", len(b))
-       // Buffer in case datachannel isn't available.
-       c.dc.Send(b)
+       c.sendData(b)
        return len(b), nil
 }
 
@@ -98,21 +103,29 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 }
 
 // Create a WebRTC DataChannel locally.
-// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
 func (c *webRTCConn) EstablishDataChannel() error {
        dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
+       // Triggers "OnNegotiationNeeded" on the PeerConnection, which will  
prepare
+       // an SDP offer while other goroutines operating on this struct handle 
the
+       // signaling. Eventually fires "OnOpen".
        if err != nil {
                log.Printf("CreateDataChannel: %s", err)
                return err
        }
        dc.OnOpen = func() {
-               log.Println("OnOpen channel")
-               c.openChannel <- struct{}{}
+               log.Println("WebRTC: DataChannel.OnOpen")
+               // Flush the buffer, then enable datachannel.
+               // TODO: Make this more safe
+               dc.Send(c.buffer.Bytes())
+               log.Println("Flushed ", c.buffer.Len(), " bytes")
+               c.buffer.Reset()
+               c.snowflake = dc
        }
        dc.OnClose = func() {
-               log.Println("OnClose channel")
-               // writePipe.Close()
-               close(c.openChannel)
+               // Disable the DataChannel as a write destination.
+               // Future writes will go to the buffer until a new DataChannel 
is available.
+               log.Println("WebRTC: DataChannel.OnClose")
+               c.snowflake = nil
                // TODO: (Issue #12) Should attempt to renegotiate at this 
point.
        }
        dc.OnMessage = func(msg []byte) {
@@ -126,7 +139,6 @@ func (c *webRTCConn) EstablishDataChannel() error {
                        panic("short write")
                }
        }
-       c.dc = dc
        return nil
 }
 
@@ -153,8 +165,8 @@ func (c *webRTCConn) sendOffer() error {
                        }
                        if nil == answer {
                                log.Printf("BrokerChannel: No answer received.")
+                               // TODO: Should try again here.
                                return
-                               // return errors.New("No answer received.")
                        }
                        answerChannel <- answer
                }()
@@ -165,6 +177,18 @@ func (c *webRTCConn) sendOffer() error {
        return nil
 }
 
+func (c *webRTCConn) sendData(data []byte) {
+       // Buffer the data in case datachannel isn't available yet.
+       if nil == c.snowflake {
+               log.Printf("Buffered %d bytes --> WebRTC", len(data))
+               c.buffer.Write(data)
+               return
+       }
+       log.Printf("Write %d bytes --> WebRTC", len(data))
+       c.snowflake.Send(data)
+}
+
+// Initialize a WebRTC Connection.
 func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
        *webRTCConn, error) {
        pc, err := webrtc.NewPeerConnection(config)
@@ -177,13 +201,14 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
        connection.pc = pc
        connection.offerChannel = make(chan *webrtc.SessionDescription)
        connection.errorChannel = make(chan error)
-       connection.openChannel = make(chan struct{})
+       // Pipes remain the same even when DataChannel gets switched.
+       connection.recvPipe, connection.writePipe = io.Pipe()
 
-       // Triggered by CreateDataChannel.
        pc.OnNegotiationNeeded = func() {
                log.Println("OnNegotiationNeeded")
                go func() {
                        offer, err := pc.CreateOffer()
+                       // TODO: Potentially timeout and retry if ICE isn't 
working.
                        if err != nil {
                                connection.errorChannel <- err
                                return
@@ -208,18 +233,17 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
        // of the data channel, not the remote peer.
        pc.OnDataChannel = func(channel *webrtc.DataChannel) {
                log.Println("OnDataChannel")
-               panic("OnDataChannel")
+               panic("Unexpected OnDataChannel!")
        }
 
-       // Pipes remain the same even when DataChannel gets switched.
-       connection.recvPipe, connection.writePipe = io.Pipe()
-
        connection.EstablishDataChannel()
-       connection.sendOffer()
 
+       // TODO: Make this part of a re-establishment loop.
+       connection.sendOffer()
        log.Printf("waiting for answer...")
        answer, ok := <-answerChannel
        if !ok {
+               // TODO: Don't just fail, try again!
                pc.Close()
                return nil, fmt.Errorf("no answer received")
        }
@@ -230,15 +254,6 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
                return nil, err
        }
 
-       // Wait until data channel is open; otherwise for example sends may get
-       // lost.
-       // TODO: Buffering *should* work though.
-       _, ok = <-connection.openChannel
-       if !ok {
-               pc.Close()
-               return nil, fmt.Errorf("failed to open data channel")
-       }
-
        return connection, nil
 }
 
@@ -247,9 +262,9 @@ func endWebRTC() {
        if nil == webrtcRemote {
                return
        }
-       if nil != webrtcRemote.dc {
+       if nil != webrtcRemote.snowflake {
                log.Printf("WebRTC: closing DataChannel")
-               webrtcRemote.dc.Close()
+               webrtcRemote.snowflake.Close()
        }
        if nil != webrtcRemote.pc {
                log.Printf("WebRTC: closing PeerConnection")
@@ -332,7 +347,7 @@ func readSignalingMessages(f *os.File) {
 
 func main() {
        var err error
-
+       webrtc.SetLoggingVerbosity(1)
        flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
        flag.StringVar(&frontDomain, "front", "", "front domain")
        flag.Parse()
@@ -368,8 +383,6 @@ func main() {
                go readSignalingMessages(signalFile)
        }
 
-       webrtc.SetLoggingVerbosity(1)
-
        ptInfo, err = pt.ClientSetup(nil)
        if err != nil {
                log.Fatal(err)
@@ -417,10 +430,6 @@ func main() {
                ln.Close()
        }
 
-       // if syscall.SIGTERM == sig || syscall.SIGINT == sig {
-       // return
-       // }
-
        // wait for second signal or no more handlers
        sig = nil
        for sig == nil && numHandlers != 0 {



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to