commit ac8669b38f8b2a363b5f46036ea8e5de5a69c121
Author: David Fifield <da...@bamsoftware.com>
Date:   Mon Jan 18 21:13:08 2016 -0800

    Refactor signal receiving in server.
    
    There's one FIFO reader goroutine instead of one per bindaddr.
    makePeerConnectionFromOffer gives you a PeerConnection with an answer
    and also sets up callbacks to pass a webRTCConn to datachannelHandler
    when ready.
---
 server/snowflake.go |  196 ++++++++++++++++++++++++++-------------------------
 1 file changed, 99 insertions(+), 97 deletions(-)

diff --git a/server/snowflake.go b/server/snowflake.go
index d046998..88a07d8 100644
--- a/server/snowflake.go
+++ b/server/snowflake.go
@@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 }
 
 func datachannelHandler(conn *webRTCConn) {
+       defer conn.Close()
+
        handlerChan <- 1
        defer func() {
                handlerChan <- -1
@@ -91,121 +93,123 @@ func datachannelHandler(conn *webRTCConn) {
                log.Printf("Failed to connect to ORPort: " + err.Error())
                return
        }
-       //defer or.Close()
-
-       pr, pw := io.Pipe()
-       conn.pr = pr
-
-       dc := conn.dc
-       dc.OnOpen = func() {
-               log.Println("OnOpen channel")
-       }
-       dc.OnClose = func() {
-               log.Println("OnClose channel")
-               pw.Close()
-       }
-       dc.OnMessage = func(msg []byte) {
-               // log.Printf("OnMessage channel %d %+q", len(msg), msg)
-               log.Printf("OnMessage <--- %d bytes", len(msg))
-               n, err := pw.Write(msg)
-               if err != nil {
-                       pw.CloseWithError(err)
-               }
-               if n != len(msg) {
-                       panic("short write")
-               }
-       }
+       defer or.Close()
 
-       go copyLoop(conn, or)
+       copyLoop(conn, or)
 }
 
-func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
-       pc, err := webrtc.NewPeerConnection(config)
+// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
+// candidates is complete and an answer is available in LocalDescription.
+// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
+// datachannelHandler.
+func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
+       errChan := make(chan error)
+       answerChan := make(chan *webrtc.SessionDescription)
 
+       pc, err := webrtc.NewPeerConnection(config)
        if err != nil {
-               log.Printf("NewPeerConnection: %s", err)
-               return nil, err
+               return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
        }
        pc.OnNegotiationNeeded = func() {
                panic("OnNegotiationNeeded")
        }
-       pc.OnDataChannel = func(dc *data.Channel) {
-               log.Println("OnDataChannel")
-               datachannelHandler(&webRTCConn{pc: pc, dc: dc})
-       }
        pc.OnIceComplete = func() {
-               log.Printf("----------------")
-               fmt.Fprintln(logFile, pc.LocalDescription().Serialize())
-               log.Printf("----------------")
+               answerChan <- pc.LocalDescription()
        }
-       return pc, nil
-}
+       pc.OnDataChannel = func(dc *data.Channel) {
+               log.Println("OnDataChannel")
 
-func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) {
-       s := bufio.NewScanner(f)
-       for s.Scan() {
-               msg := s.Text()
-               sdp := webrtc.DeserializeSessionDescription(msg)
-               if sdp == nil {
-                       log.Printf("ignoring invalid signal message %+q", msg)
-                       continue
+               pr, pw := io.Pipe()
+
+               dc.OnOpen = func() {
+                       log.Println("OnOpen channel")
                }
-               signalChan <- sdp
-               continue
-       }
-       if err := s.Err(); err != nil {
-               log.Printf("signal FIFO: %s", err)
+               dc.OnClose = func() {
+                       log.Println("OnClose channel")
+                       pw.Close()
+               }
+               dc.OnMessage = func(msg []byte) {
+                       log.Printf("OnMessage <--- %d bytes", len(msg))
+                       n, err := pw.Write(msg)
+                       if err != nil {
+                               pw.CloseWithError(err)
+                       }
+                       if n != len(msg) {
+                               panic("short write")
+                       }
+               }
+
+               conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+               go datachannelHandler(conn)
        }
-}
 
-func generateAnswer(pc *webrtc.PeerConnection) {
-       fmt.Println("Generating answer...")
-       answer, err := pc.CreateAnswer() // blocking
+       err = pc.SetRemoteDescription(sdp)
        if err != nil {
-               fmt.Println(err)
-               return
+               pc.Close()
+               return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
        }
-       pc.SetLocalDescription(answer)
+       log.Println("sdp offer successfully received.")
+
+       go func() {
+               log.Println("Generating answer...")
+               answer, err := pc.CreateAnswer() // blocking
+               if err != nil {
+                       errChan <- err
+                       return
+               }
+               err = pc.SetLocalDescription(answer)
+               if err != nil {
+                       errChan <- err
+                       return
+               }
+       }()
+
+       // Wait until answer is ready.
+       select {
+       case err = <-errChan:
+               pc.Close()
+               return nil, err
+       case <-answerChan:
+       }
+
+       return pc, nil
 }
 
-func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) {
-       err := syscall.Mkfifo(signal, 0600)
+// Create a signaling named pipe and feed offers from it into
+// makePeerConnectionFromOffer.
+func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error {
+       err := syscall.Mkfifo(filename, 0600)
        if err != nil {
                if err.(syscall.Errno) != syscall.EEXIST {
-                       return nil, err
+                       return err
                }
        }
-       signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600)
+       signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600)
        if err != nil {
-               return nil, err
+               return err
        }
-       //defer signalFile.Close()
+       defer signalFile.Close()
 
-       var signalChan = make(chan *webrtc.SessionDescription)
-
-       go func() {
-               for {
-                       select {
-                       case sdp := <-signalChan:
-                               pc, err := makePeerConnection(config)
-                               if err != nil {
-                                       log.Printf("makePeerConnection: %s", err)
-                                       break
-                               }
-                               err = pc.SetRemoteDescription(sdp)
-                               if err != nil {
-                                       fmt.Println("ERROR", err)
-                                       break
-                               }
-                               fmt.Println("sdp offer successfully received.")
-                               go generateAnswer(pc)
-                       }
+       s := bufio.NewScanner(signalFile)
+       for s.Scan() {
+               msg := s.Text()
+               sdp := webrtc.DeserializeSessionDescription(msg)
+               if sdp == nil {
+                       log.Printf("ignoring invalid signal message %+q", msg)
+                       continue
                }
-       }()
 
-       go readSignalingMessages(signalChan, signalFile)
-       log.Printf("waiting for offer")
-       return signalFile, nil
+               pc, err := makePeerConnectionFromOffer(sdp, config)
+               if err != nil {
+                       log.Printf("makePeerConnectionFromOffer: %s", err)
+                       continue
+               }
+               // Write answer to log for manual signaling.
+               log.Printf("----------------")
+               fmt.Fprintln(logFile, pc.LocalDescription().Serialize())
+               log.Printf("----------------")
+       }
+       return s.Err()
 }
 
 func main() {
@@ -228,18 +232,19 @@ func main() {
 
        webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302"))
 
-       listeners := make([]*os.File, 0)
+       // Start FIFO-based signaling receiver.
+       go func() {
+               err := receiveSignalsFIFO("signal", webRTCConfig)
+               if err != nil {
+                       log.Printf("receiveSignalsFIFO: %s", err)
+               }
+       }()
+
        for _, bindaddr := range ptInfo.Bindaddrs {
                switch bindaddr.MethodName {
                case ptMethodName:
-                       ln, err := listenWebRTC(webRTCConfig, "signal") // meh
-                       if err != nil {
-                               pt.SmethodError(bindaddr.MethodName, err.Error())
-                               break
-                       }
                        bindaddr.Addr.Port = 12345 // lies!!!
                        pt.Smethod(bindaddr.MethodName, bindaddr.Addr)
-                       listeners = append(listeners, ln)
                default:
                        pt.SmethodError(bindaddr.MethodName, "no such method")
                }
@@ -260,9 +265,6 @@ func main() {
                case sig = <-sigChan:
                }
        }
-       for _, ln := range listeners {
-               ln.Close()
-       }
 
        if sig == syscall.SIGTERM {
                return



_______________________________________________
tor-commits mailing list
tor-commits@lists.torproject.org
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to