commit 791f6925ec749a28ad95c76325f802bc4de2d75c
Author: Serene Han <[email protected]>
Date:   Tue Feb 16 20:50:00 2016 -0800

    Simplify proxy poll handler, and broker match test
---
 broker/broker.go                | 89 ++++++++++++++++++++++-------------------
 broker/snowflake-broker_test.go | 25 +++++++++---
 broker/snowflake-heap.go        |  4 ++
 3 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index 69b8369..9e5ee30 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -28,17 +28,17 @@ type BrokerContext struct {
        snowflakes *SnowflakeHeap
        // Map keeping track of snowflakeIDs required to match SDP answers from
        // the second http POST.
-       snowflakeMap  map[string]*Snowflake
-       createChannel chan *ProxyRequest
+       snowflakeMap map[string]*Snowflake
+       proxyPolls   chan *ProxyPoll
 }
 
 func NewBrokerContext() *BrokerContext {
        snowflakes := new(SnowflakeHeap)
        heap.Init(snowflakes)
        return &BrokerContext{
-               snowflakes:    snowflakes,
-               snowflakeMap:  make(map[string]*Snowflake),
-               createChannel: make(chan *ProxyRequest),
+               snowflakes:   snowflakes,
+               snowflakeMap: make(map[string]*Snowflake),
+               proxyPolls:   make(chan *ProxyPoll),
        }
 }
 
@@ -51,46 +51,58 @@ func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        sh.h(sh.BrokerContext, w, r)
 }
 
-type ProxyRequest struct {
-       id        string
-       offerChan chan []byte
+// Proxies may poll for client offers concurrently.
+type ProxyPoll struct {
+       id           string
+       offerChannel chan []byte
 }
 
-// Create and add a Snowflake to the heap.
-func (sc *BrokerContext) AddSnowflake(id string) *Snowflake {
-       snowflake := new(Snowflake)
-       snowflake.id = id
-       snowflake.clients = 0
-       snowflake.offerChannel = make(chan []byte)
-       snowflake.answerChannel = make(chan []byte)
-       heap.Push(sc.snowflakes, snowflake)
-       sc.snowflakeMap[id] = snowflake
-       return snowflake
+// Registers a Snowflake and waits for some Client to send an offer,
+// as part of the polling logic of the proxy handler.
+func (ctx *BrokerContext) RequestOffer(id string) []byte {
+       request := new(ProxyPoll)
+       request.id = id
+       request.offerChannel = make(chan []byte)
+       ctx.proxyPolls <- request
+       // Block until an offer is available...
+       offer := <-request.offerChannel
+       return offer
 }
 
-// Match proxies to clients.
-// func (ctx *BrokerContext) Broker(proxies <-chan *ProxyRequest) {
+// Broker is a goroutine which matches proxies to clients.
+// Safely processes proxy requests, responding to each with either an available
+// client offer, or nil if none arrives before the timeout.
 func (ctx *BrokerContext) Broker() {
-       // for p := range proxies {
-       for p := range ctx.createChannel {
-               snowflake := ctx.AddSnowflake(p.id)
-               // Wait for a client to avail an offer to the snowflake, or timeout
-               // and ask the snowflake to poll later.
-               go func(p *ProxyRequest) {
+       for request := range ctx.proxyPolls {
+               snowflake := ctx.AddSnowflake(request.id)
+               // Wait for a client to make an offer available to the snowflake.
+               go func(request *ProxyPoll) {
                        select {
                        case offer := <-snowflake.offerChannel:
-                               log.Println("Passing client offer to snowflake.")
-                               p.offerChan <- offer
+                               log.Println("Passing client offer to snowflake proxy.")
+                               request.offerChannel <- offer
                        case <-time.After(time.Second * ProxyTimeout):
                                // This snowflake is no longer available to serve clients.
                                heap.Remove(ctx.snowflakes, snowflake.index)
                                delete(ctx.snowflakeMap, snowflake.id)
-                               p.offerChan <- nil
+                               request.offerChannel <- nil
                        }
-               }(p)
+               }(request)
        }
 }
 
+// Create and add a Snowflake to the heap.
+func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
+       snowflake := new(Snowflake)
+       snowflake.id = id
+       snowflake.clients = 0
+       snowflake.offerChannel = make(chan []byte)
+       snowflake.answerChannel = make(chan []byte)
+       heap.Push(ctx.snowflakes, snowflake)
+       ctx.snowflakeMap[id] = snowflake
+       return snowflake
+}
+
 func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/plain; charset=utf-8")
        w.Write([]byte("User-agent: *\nDisallow:\n"))
@@ -145,14 +157,15 @@ func clientHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
        case answer := <-snowflake.answerChannel:
                log.Println("Client: Retrieving answer")
                w.Write(answer)
-               // Only remove from the snowflake map once the answer is set.
-               delete(ctx.snowflakeMap, snowflake.id)
 
        case <-time.After(time.Second * ClientTimeout):
                log.Println("Client: Timed out.")
                w.WriteHeader(http.StatusGatewayTimeout)
                w.Write([]byte("timed out waiting for answer!"))
        }
+       // Remove from the snowflake map whether answer was sent or not, because
+       // this client request is now over.
+       delete(ctx.snowflakeMap, snowflake.id)
 }
 
 /*
@@ -172,17 +185,9 @@ func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
        if string(body) != id { // Mismatched IDs!
                w.WriteHeader(http.StatusBadRequest)
        }
-       // Maybe confirm that X-Session-ID is the same.
        log.Println("Received snowflake: ", id)
-
-       p := new(ProxyRequest)
-       p.id = id
-       p.offerChan = make(chan []byte)
-       ctx.createChannel <- p
-
-       // Wait for a client to avail an offer to the snowflake, or timeout
-       // and ask the snowflake to poll later.
-       offer := <-p.offerChan
+       // Wait for a client to make an offer available to the snowflake; a nil offer means the wait timed out.
+       offer := ctx.RequestOffer(id)
        if nil == offer {
                log.Println("Proxy " + id + " did not receive a Client offer.")
                w.WriteHeader(http.StatusGatewayTimeout)
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index ee984b0..b9432d8 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -22,6 +22,19 @@ func TestBroker(t *testing.T) {
                        So(len(ctx.snowflakeMap), ShouldEqual, 1)
                })
 
+               Convey("Broker goroutine matches clients with proxies", func() {
+                       p := new(ProxyPoll)
+                       p.id = "test"
+                       p.offerChannel = make(chan []byte)
+                       go func() {
+                               ctx.proxyPolls <- p
+                               close(ctx.proxyPolls)
+                       }()
+                       ctx.Broker()
+                       So(ctx.snowflakes.Len(), ShouldEqual, 1)
+                       So(ctx.snowflakes.Len(), ShouldEqual, 1)
+               })
+
                Convey("Responds to client offers...", func() {
                        w := httptest.NewRecorder()
                        data := bytes.NewReader([]byte("test"))
@@ -83,9 +96,9 @@ func TestBroker(t *testing.T) {
                                        done <- true
                                }(ctx)
                                // Pass a fake client offer to this proxy
-                               p := <-ctx.createChannel
+                               p := <-ctx.proxyPolls
                                So(p.id, ShouldEqual, "test")
-                               p.offerChan <- []byte("fake offer")
+                               p.offerChannel <- []byte("fake offer")
                                <-done
                                So(w.Code, ShouldEqual, http.StatusOK)
                                So(w.Body.String(), ShouldEqual, "fake offer")
@@ -96,10 +109,10 @@ func TestBroker(t *testing.T) {
                                        proxyHandler(ctx, w, r)
                                        done <- true
                                }(ctx)
-                               p := <-ctx.createChannel
+                               p := <-ctx.proxyPolls
                                So(p.id, ShouldEqual, "test")
                                // nil means timeout
-                               p.offerChan <- nil
+                               p.offerChannel <- nil
                                <-done
                                So(w.Body.String(), ShouldEqual, "")
                                So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
@@ -159,12 +172,12 @@ func TestBroker(t *testing.T) {
                }()
 
                // Manually do the Broker goroutine action here for full control.
-               p := <-ctx.createChannel
+               p := <-ctx.proxyPolls
                So(p.id, ShouldEqual, "test")
                s := ctx.AddSnowflake(p.id)
                go func() {
                        offer := <-s.offerChannel
-                       p.offerChan <- offer
+                       p.offerChannel <- offer
                }()
                So(ctx.snowflakeMap["test"], ShouldNotBeNil)
 
diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go
index d37228f..cf249fe 100644
--- a/broker/snowflake-heap.go
+++ b/broker/snowflake-heap.go
@@ -4,6 +4,10 @@ Keeping track of pending available snowflake proxies.
 
 package snowflake_broker
 
+/*
+The Snowflake struct contains a single interaction
+over the offer and answer channels.
+*/
 type Snowflake struct {
        id            string
        offerChannel  chan []byte



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to