commit 760dee8a0f3efc1a71780bfde277ae7a5a7a6d9b
Author: Serene Han <[email protected]>
Date:   Wed Feb 17 17:39:09 2016 -0800

    prepare snowflake client for buffered datachannel writes, separate out 
dialWebRTC (#12)
---
 client/snowflake.go | 196 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 115 insertions(+), 81 deletions(-)

diff --git a/client/snowflake.go b/client/snowflake.go
index ba1561e..7c47fbb 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,7 +28,7 @@ var frontDomain string
 // ends, -1 is written.
 var handlerChan = make(chan int)
 
-var signalChan = make(chan *webrtc.SessionDescription)
+var answerChannel = make(chan *webrtc.SessionDescription)
 
 func copyLoop(a, b net.Conn) {
        var wg sync.WaitGroup
@@ -46,10 +46,16 @@ func copyLoop(a, b net.Conn) {
        wg.Wait()
 }
 
+// Implements net.Conn interface
 type webRTCConn struct {
-       pc       *webrtc.PeerConnection
-       dc       *webrtc.DataChannel
-       recvPipe *io.PipeReader
+       pc           *webrtc.PeerConnection
+       dc           *webrtc.DataChannel
+       broker       *BrokerChannel
+       recvPipe     *io.PipeReader
+       writePipe    *io.PipeWriter
+       offerChannel chan *webrtc.SessionDescription
+       errorChannel chan error
+       openChannel  chan struct{}
 }
 
 var webrtcRemote *webRTCConn
@@ -61,6 +67,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 func (c *webRTCConn) Write(b []byte) (int, error) {
        // log.Printf("webrtc Write %d %+q", len(b), string(b))
        log.Printf("Write %d bytes --> WebRTC", len(b))
+       // Buffer in case datachannel isn't available.
        c.dc.Send(b)
        return len(b), nil
 }
@@ -90,18 +97,87 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
        return fmt.Errorf("SetWriteDeadline not implemented")
 }
 
-func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
-       *webRTCConn, error) {
+// Create a WebRTC DataChannel locally.
+// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
+func (c *webRTCConn) EstablishDataChannel() error {
+       dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
+       if err != nil {
+               log.Printf("CreateDataChannel: %s", err)
+               return err
+       }
+       dc.OnOpen = func() {
+               log.Println("OnOpen channel")
+               c.openChannel <- struct{}{}
+       }
+       dc.OnClose = func() {
+               log.Println("OnClose channel")
+               // writePipe.Close()
+               close(c.openChannel)
+               // TODO: (Issue #12) Should attempt to renegotiate at this 
point.
+       }
+       dc.OnMessage = func(msg []byte) {
+               log.Printf("OnMessage <--- %d bytes", len(msg))
+               n, err := c.writePipe.Write(msg)
+               if err != nil {
+                       // TODO: Maybe shouldn't actually close.
+                       c.writePipe.CloseWithError(err)
+               }
+               if n != len(msg) {
+                       panic("short write")
+               }
+       }
+       c.dc = dc
+       return nil
+}
 
-       offerChan := make(chan *webrtc.SessionDescription)
-       errChan := make(chan error)
-       openChan := make(chan struct{})
+// Block until an offer is available, then send it to either
+// the Broker or signal pipe.
+func (c *webRTCConn) sendOffer() error {
+       select {
+       case offer := <-c.offerChannel:
+               if "" == brokerURL {
+                       log.Printf("Please Copy & Paste the following to the 
peer:")
+                       log.Printf("----------------")
+                       fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
+                       log.Printf("----------------")
+                       return nil
+               }
+               // Use Broker...
+               go func() {
+                       log.Println("Sending offer via BrokerChannel...\nTarget 
URL: ", brokerURL,
+                               "\nFront URL:  ", frontDomain)
+                       answer, err := 
c.broker.Negotiate(c.pc.LocalDescription())
+                       if nil != err {
+                               log.Printf("BrokerChannel signaling error: %s", 
err)
+                               return
+                       }
+                       if nil == answer {
+                               log.Printf("BrokerChannel: No answer received.")
+                               return
+                               // return errors.New("No answer received.")
+                       }
+                       answerChannel <- answer
+               }()
+       case err := <-c.errorChannel:
+               c.pc.Close()
+               return err
+       }
+       return nil
+}
 
+func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
+       *webRTCConn, error) {
        pc, err := webrtc.NewPeerConnection(config)
        if err != nil {
                log.Printf("NewPeerConnection: %s", err)
                return nil, err
        }
+       connection := new(webRTCConn)
+       connection.broker = broker
+       connection.pc = pc
+       connection.offerChannel = make(chan *webrtc.SessionDescription)
+       connection.errorChannel = make(chan error)
+       connection.openChannel = make(chan struct{})
 
        // Triggered by CreateDataChannel.
        pc.OnNegotiationNeeded = func() {
@@ -109,12 +185,12 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
                go func() {
                        offer, err := pc.CreateOffer()
                        if err != nil {
-                               errChan <- err
+                               connection.errorChannel <- err
                                return
                        }
                        err = pc.SetLocalDescription(offer)
                        if err != nil {
-                               errChan <- err
+                               connection.errorChannel <- err
                                return
                        }
                }()
@@ -126,7 +202,7 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
        // TODO: This may soon be deprecated, consider 
OnIceGatheringStateChange.
        pc.OnIceComplete = func() {
                log.Printf("OnIceComplete")
-               offerChan <- pc.LocalDescription()
+               connection.offerChannel <- pc.LocalDescription()
        }
        // This callback is not expected, as the Client initiates the creation
        // of the data channel, not the remote peer.
@@ -135,62 +211,14 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
                panic("OnDataChannel")
        }
 
-       pr, pw := io.Pipe()
+       // Pipes remain the same even when DataChannel gets switched.
+       connection.recvPipe, connection.writePipe = io.Pipe()
 
-       dc, err := pc.CreateDataChannel("test", webrtc.Init{})
-       if err != nil {
-               log.Printf("CreateDataChannel: %s", err)
-               return nil, err
-       }
-       dc.OnOpen = func() {
-               log.Println("OnOpen channel")
-               openChan <- struct{}{}
-       }
-       dc.OnClose = func() {
-               log.Println("OnClose channel")
-               pw.Close()
-               close(openChan)
-               // TODO: (Issue #12) Should attempt to renegotiate at this 
point.
-       }
-       dc.OnMessage = func(msg []byte) {
-               log.Printf("OnMessage <--- %d bytes", len(msg))
-               n, err := pw.Write(msg)
-               if err != nil {
-                       pw.CloseWithError(err)
-               }
-               if n != len(msg) {
-                       panic("short write")
-               }
-       }
-
-       select {
-       case err := <-errChan:
-               pc.Close()
-               return nil, err
-       case offer := <-offerChan:
-               log.Printf("----------------")
-               fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
-               log.Printf("----------------")
-               go func() {
-                       if "" != brokerURL {
-                               log.Println("Sending offer via 
BrokerChannel...\nTarget URL: ", brokerURL,
-                                       "\nFront URL:  ", frontDomain)
-                               answer, err := 
broker.Negotiate(pc.LocalDescription())
-                               if nil != err {
-                                       log.Printf("BrokerChannel signaling 
error: %s", err)
-                               }
-                               if nil == answer {
-                                       log.Printf("BrokerChannel: No answer 
received.")
-                               } else {
-                                       signalChan <- answer
-                               }
-                       }
-               }()
-       }
-
-       log.Printf("waiting for answer")
-       answer, ok := <-signalChan
+       connection.EstablishDataChannel()
+       connection.sendOffer()
 
+       log.Printf("waiting for answer...")
+       answer, ok := <-answerChannel
        if !ok {
                pc.Close()
                return nil, fmt.Errorf("no answer received")
@@ -205,13 +233,13 @@ func dialWebRTC(config *webrtc.Configuration, broker 
*BrokerChannel) (
        // Wait until data channel is open; otherwise for example sends may get
        // lost.
        // TODO: Buffering *should* work though.
-       _, ok = <-openChan
+       _, ok = <-connection.openChannel
        if !ok {
                pc.Close()
                return nil, fmt.Errorf("failed to open data channel")
        }
 
-       return &webRTCConn{pc: pc, dc: dc, recvPipe: pr}, nil
+       return connection, nil
 }
 
 func endWebRTC() {
@@ -229,6 +257,7 @@ func endWebRTC() {
        }
 }
 
+// Establish a WebRTC channel for SOCKS connections.
 func handler(conn *pt.SocksConn) error {
        handlerChan <- 1
        defer func() {
@@ -259,7 +288,6 @@ func handler(conn *pt.SocksConn) error {
        }
 
        copyLoop(conn, remote)
-
        return nil
 }
 
@@ -293,10 +321,10 @@ func readSignalingMessages(f *os.File) {
                        log.Printf("ignoring invalid signal message %+q", msg)
                        continue
                }
-               signalChan <- sdp
+               answerChannel <- sdp
        }
-       log.Printf("close signalChan")
-       close(signalChan)
+       log.Printf("close answerChannel")
+       close(answerChannel)
        if err := s.Err(); err != nil {
                log.Printf("signal FIFO: %s", err)
        }
@@ -308,19 +336,25 @@ func main() {
        flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
        flag.StringVar(&frontDomain, "front", "", "front domain")
        flag.Parse()
-
        logFile, err = os.OpenFile("snowflake.log", 
os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
        if err != nil {
                log.Fatal(err)
        }
        defer logFile.Close()
        log.SetOutput(logFile)
-       log.Println("starting")
-
-       if "" == brokerURL {
+       log.Println("\nStarting Snowflake Client...")
+
+       // Expect user to copy-paste if
+       // TODO: Maybe just get rid of copy-paste entirely.
+       if "" != brokerURL {
+               log.Println("Rendezvous using Broker at: ", brokerURL)
+               if "" != frontDomain {
+                       log.Println("Domain fronting using:", frontDomain)
+               }
+       } else {
                log.Println("No HTTP signaling detected. Waiting for a 
\"signal\" pipe...")
                // This FIFO receives signaling messages.
-               err = syscall.Mkfifo("signal", 0600)
+               err := syscall.Mkfifo("signal", 0600)
                if err != nil {
                        if err.(syscall.Errno) != syscall.EEXIST {
                                log.Fatal(err)
@@ -363,6 +397,7 @@ func main() {
                }
        }
        pt.CmethodsDone()
+       defer endWebRTC()
 
        var numHandlers int = 0
        var sig os.Signal
@@ -382,10 +417,9 @@ func main() {
                ln.Close()
        }
 
-       if syscall.SIGTERM == sig || syscall.SIGINT == sig {
-               endWebRTC()
-               return
-       }
+       // if syscall.SIGTERM == sig || syscall.SIGINT == sig {
+       // return
+       // }
 
        // wait for second signal or no more handlers
        sig = nil



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to