commit 2caa47988dc2e7d78db083d2609b8656e4731079
Author: Serene Han <[email protected]>
Date:   Sat Jun 11 23:18:38 2016 -0700

    fix Peers.Count() using activePeers list, mark for delete on Close, and 
remove
    maxedChan
---
 client/client_test.go | 77 +++++++++++++++++++++++++++++++--------------------
 client/peers.go       | 72 ++++++++++++++++++++++++++---------------------
 client/snowflake.go   |  8 ++++--
 client/webrtc.go      |  5 ++--
 4 files changed, 95 insertions(+), 67 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index f58aeb0..de83768 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -75,7 +75,7 @@ func TestSnowflakeClient(t *testing.T) {
                        snowflakes.Tongue = FakeDialer{}
 
                        go ConnectLoop(snowflakes)
-                       <-snowflakes.maxedChan
+                       // <-snowflakes.maxedChan
 
                        So(snowflakes.Count(), ShouldEqual, 1)
                        r := <-snowflakes.snowflakeChan
@@ -88,7 +88,7 @@ func TestSnowflakeClient(t *testing.T) {
                        snowflakes.Tongue = FakeDialer{}
 
                        go ConnectLoop(snowflakes)
-                       <-snowflakes.maxedChan
+                       // <-snowflakes.maxedChan
                        So(snowflakes.Count(), ShouldEqual, 3)
                        <-snowflakes.snowflakeChan
                        <-snowflakes.snowflakeChan
@@ -101,13 +101,13 @@ func TestSnowflakeClient(t *testing.T) {
                        snowflakes.Tongue = FakeDialer{}
 
                        go ConnectLoop(snowflakes)
-                       <-snowflakes.maxedChan
+                       // <-snowflakes.maxedChan
                        So(snowflakes.Count(), ShouldEqual, 3)
 
                        r := <-snowflakes.snowflakeChan
                        So(snowflakes.Count(), ShouldEqual, 2)
                        r.Close()
-                       <-snowflakes.maxedChan
+                       // <-snowflakes.maxedChan
                        So(snowflakes.Count(), ShouldEqual, 3)
 
                        <-snowflakes.snowflakeChan
@@ -121,7 +121,6 @@ func TestSnowflakeClient(t *testing.T) {
                Convey("Can construct", func() {
                        p := NewPeers(1)
                        So(p.capacity, ShouldEqual, 1)
-                       So(p.current, ShouldEqual, nil)
                        So(p.snowflakeChan, ShouldNotBeNil)
                        So(cap(p.snowflakeChan), ShouldEqual, 1)
                })
@@ -136,36 +135,54 @@ func TestSnowflakeClient(t *testing.T) {
                        err = p.Collect()
                        So(err, ShouldBeNil)
                        So(p.Count(), ShouldEqual, 1)
-      // S
+                       // S
                        err = p.Collect()
                })
 
                Convey("Collection continues until capacity.", func() {
-      c := 5
+                       c := 5
                        p := NewPeers(c)
-      p.Tongue = FakeDialer{}
-      // Fill up to capacity.
-      for i := 0 ; i < c ; i++ {
-             fmt.Println("Adding snowflake ", i)
-                         err := p.Collect()
-                         So(err, ShouldBeNil)
-               So(p.Count(), ShouldEqual, i + 1)
-      }
-      // But adding another gives an error.
-               So(p.Count(), ShouldEqual, c)
-               err := p.Collect()
-               So(err, ShouldNotBeNil)
-               So(p.Count(), ShouldEqual, c)
-
-      // But popping allows it to continue.
-      s := p.Pop()
-      So(s, ShouldNotBeNil)
-               So(p.Count(), ShouldEqual, c)
-
-               // err = p.Collect()
-               // So(err, ShouldNotBeNil)
-               // So(p.Count(), ShouldEqual, c)
-    })
+                       p.Tongue = FakeDialer{}
+                       // Fill up to capacity.
+                       for i := 0; i < c; i++ {
+                               fmt.Println("Adding snowflake ", i)
+                               err := p.Collect()
+                               So(err, ShouldBeNil)
+                               So(p.Count(), ShouldEqual, i+1)
+                       }
+                       // But adding another gives an error.
+                       So(p.Count(), ShouldEqual, c)
+                       err := p.Collect()
+                       So(err, ShouldNotBeNil)
+                       So(p.Count(), ShouldEqual, c)
+
+                       // But popping and closing allows it to continue.
+                       s := p.Pop()
+                       s.Close()
+                       So(s, ShouldNotBeNil)
+                       So(p.Count(), ShouldEqual, c-1)
+
+                       err = p.Collect()
+                       So(err, ShouldBeNil)
+                       So(p.Count(), ShouldEqual, c)
+               })
+
+               Convey("Count correctly purges peers marked for deletion.", 
func() {
+                       p := NewPeers(4)
+                       p.Tongue = FakeDialer{}
+                       p.Collect()
+                       p.Collect()
+                       p.Collect()
+                       p.Collect()
+                       So(p.Count(), ShouldEqual, 4)
+                       s := p.Pop()
+                       s.Close()
+                       So(p.Count(), ShouldEqual, 3)
+                       s = p.Pop()
+                       s.Close()
+                       So(p.Count(), ShouldEqual, 2)
+               })
+
        })
 
        Convey("Snowflake", t, func() {
diff --git a/client/peers.go b/client/peers.go
index 769174b..57570bf 100644
--- a/client/peers.go
+++ b/client/peers.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "container/list"
        "errors"
        "fmt"
        "log"
@@ -22,70 +23,77 @@ type Peers struct {
        BytesLogger
 
        snowflakeChan chan *webRTCConn
-       current       *webRTCConn
+       activePeers   *list.List
        capacity      int
-       // TODO: Probably not necessary.
-       maxedChan chan struct{}
 }
 
 // Construct a fresh container of remote peers.
 func NewPeers(max int) *Peers {
-       p := &Peers{capacity: max, current: nil}
+       p := &Peers{capacity: max}
        // Use buffered go channel to pass new snowflakes onwards to the SOCKS 
handler.
        p.snowflakeChan = make(chan *webRTCConn, max)
-       p.maxedChan = make(chan struct{}, 1)
+       p.activePeers = list.New()
        return p
 }
 
-// TODO: Needs fixing.
-func (p *Peers) Count() int {
-       count := 0
-       if p.current != nil {
-               count = 1
-       }
-       return count + len(p.snowflakeChan)
-}
-
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Collect() error {
-       if p.Count() >= p.capacity {
-               s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
-               p.maxedChan <- struct{}{}
+       cnt := p.Count()
+       if cnt >= p.capacity {
+               s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
                return errors.New(s)
        }
-  // Engage the Snowflake Catching interface, which must be available.
+       // Engage the Snowflake Catching interface, which must be available.
        if nil == p.Tongue {
                return errors.New("Missing Tongue to catch Snowflakes with.")
        }
        connection, err := p.Tongue.Catch()
-  if nil == connection || nil != err {
-    return err
-  }
-  // Use the same rate-limited traffic logger to keep consistency.
-       connection.BytesLogger = p.BytesLogger
+       if nil == connection || nil != err {
+               return err
+       }
+       // Track new valid Snowflake in internal collection and pass along.
+       p.activePeers.PushBack(connection)
        p.snowflakeChan <- connection
        return nil
 }
 
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Pop() *webRTCConn {
-  // Blocks until an available snowflake appears.
+       // Blocks until an available snowflake appears.
        snowflake, ok := <-p.snowflakeChan
        if !ok {
                return nil
        }
-       p.current = snowflake
+       // Set to use the same rate-limited traffic logger to keep consistency.
        snowflake.BytesLogger = p.BytesLogger
        return snowflake
 }
 
-// Close all remote peers.
-func (p *Peers) End() {
-       log.Printf("WebRTC: interruped")
-       if nil != p.current {
-               p.current.Close()
+// Returns total available Snowflakes (including the active one)
+// The count only reduces when connections themselves close, rather than when
+// they are popped.
+func (p *Peers) Count() int {
+       p.purgeClosedPeers()
+       return p.activePeers.Len()
+}
+
+func (p *Peers) purgeClosedPeers() {
+       for e := p.activePeers.Front(); e != nil; {
+               next := e.Next()
+               conn := e.Value.(*webRTCConn)
+               // Purge those marked for deletion.
+               if conn.closed {
+                       p.activePeers.Remove(e)
+               }
+               e = next
        }
-       for r := range p.snowflakeChan {
-               r.Close()
+}
+
+// Close all Peers contained here.
+func (p *Peers) End() {
+       log.Printf("WebRTC: Ending all peer connections.")
+       for e := p.activePeers.Front(); e != nil; e = e.Next() {
+               conn := e.Value.(*webRTCConn)
+               conn.Close()
        }
 }
diff --git a/client/snowflake.go b/client/snowflake.go
index f8edc2a..aa0e470 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -61,6 +61,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
                        continue
                }
                // Successful collection gets rate limited to once per second.
+               log.Println("ConnectLoop success.")
                <-time.After(time.Second)
        }
 }
@@ -68,10 +69,10 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
 // Accept local SOCKS connections and pass them to the handler.
 func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
        defer ln.Close()
+       log.Println("Started SOCKS listener.")
        for {
-               log.Println("SOCKS listening...", ln)
                conn, err := ln.AcceptSocks()
-               log.Println("accepting", conn, err)
+               log.Println("SOCKS accepted ", conn.Req)
                if err != nil {
                        if e, ok := err.(net.Error); ok && e.Temporary() {
                                continue
@@ -138,7 +139,8 @@ func readSignalingMessages(f *os.File) {
 
 func main() {
        webrtc.SetLoggingVerbosity(1)
-       logFile, err := os.OpenFile("snowflake.log", 
os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+       logFile, err := os.OpenFile("snowflake.log",
+               os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
        if err != nil {
                log.Fatal(err)
        }
diff --git a/client/webrtc.go b/client/webrtc.go
index 2466a1d..87a1d19 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -18,9 +18,10 @@ type WebRTCDialer struct {
 }
 
 func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer {
+       config := webrtc.NewConfiguration(iceServers...)
        return &WebRTCDialer{
-               broker,
-               webrtc.NewConfiguration(iceServers...),
+               BrokerChannel: broker,
+               webrtcConfig:  config,
        }
 }
 



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to