commit dccc15a6e9d620298f77fb7ae14692723b434306
Author: Cecylia Bocovich <[email protected]>
Date:   Fri Nov 22 17:15:06 2019 -0500

    Add synchronization to prevent race in broker
    
    There's a race condition in the broker where both the proxy and the
    client processes try to pop/remove the same snowflake from the heap.
    This patch adds synchronization to prevent simultaneous accesses to
    snowflakes.
---
 broker/broker.go | 34 +++++++++++++++++++++++++++-------
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index c166f1a..a5b0edf 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -18,6 +18,7 @@ import (
        "os"
        "os/signal"
        "strings"
+       "sync"
        "syscall"
        "time"
 
@@ -37,6 +38,8 @@ type BrokerContext struct {
        // Map keeping track of snowflakeIDs required to match SDP answers from
        // the second http POST.
        idToSnowflake map[string]*Snowflake
+       // Synchronization for the snowflakes heap and the idToSnowflake map
+       snowflakeLock sync.Mutex
        proxyPolls    chan *ProxyPoll
        metrics       *Metrics
 }
@@ -127,10 +130,13 @@ func (ctx *BrokerContext) Broker() {
                                request.offerChannel <- offer
                        case <-time.After(time.Second * ProxyTimeout):
                                // This snowflake is no longer available to serve clients.
-                               // TODO: Fix race using a delete channel
-                               heap.Remove(ctx.snowflakes, snowflake.index)
-                               delete(ctx.idToSnowflake, snowflake.id)
-                               request.offerChannel <- nil
+                               ctx.snowflakeLock.Lock()
+                               defer ctx.snowflakeLock.Unlock()
+                               if snowflake.index != -1 {
+                                       heap.Remove(ctx.snowflakes, 
snowflake.index)
+                                       delete(ctx.idToSnowflake, snowflake.id)
+                                       close(request.offerChannel)
+                               }
                        }
                }(request)
        }
@@ -146,7 +152,9 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType 
string) *Snowflake {
        snowflake.proxyType = proxyType
        snowflake.offerChannel = make(chan []byte)
        snowflake.answerChannel = make(chan []byte)
+       ctx.snowflakeLock.Lock()
        heap.Push(ctx.snowflakes, snowflake)
+       ctx.snowflakeLock.Unlock()
        ctx.idToSnowflake[id] = snowflake
        return snowflake
 }
@@ -215,15 +223,19 @@ func clientOffers(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
                return
        }
        // Immediately fail if there are no snowflakes available.
-       if ctx.snowflakes.Len() <= 0 {
+       ctx.snowflakeLock.Lock()
+       numSnowflakes := ctx.snowflakes.Len()
+       ctx.snowflakeLock.Unlock()
+       if numSnowflakes <= 0 {
                ctx.metrics.clientDeniedCount++
                w.WriteHeader(http.StatusServiceUnavailable)
                return
        }
        // Otherwise, find the most available snowflake proxy, and pass the offer to it.
        // Delete must be deferred in order to correctly process answer request later.
+       ctx.snowflakeLock.Lock()
        snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
-       defer delete(ctx.idToSnowflake, snowflake.id)
+       ctx.snowflakeLock.Unlock()
        snowflake.offerChannel <- offer
 
        // Wait for the answer to be returned on the channel or timeout.
@@ -243,6 +255,10 @@ func clientOffers(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
                        log.Printf("unable to write timeout error, failed with 
error: %v", err)
                }
        }
+
+       ctx.snowflakeLock.Lock()
+       delete(ctx.idToSnowflake, snowflake.id)
+       ctx.snowflakeLock.Unlock()
 }
 
 /*
@@ -266,7 +282,9 @@ func proxyAnswers(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
        }
 
        var success = true
+       ctx.snowflakeLock.Lock()
        snowflake, ok := ctx.idToSnowflake[id]
+       ctx.snowflakeLock.Unlock()
        if !ok || nil == snowflake {
                // The snowflake took too long to respond with an answer, so its client
                // disappeared / the snowflake is no longer recognized by the Broker.
@@ -287,9 +305,10 @@ func proxyAnswers(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
 }
 
 func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
-       s := fmt.Sprintf("current snowflakes available: %d\n", 
ctx.snowflakes.Len())
 
        var webexts, browsers, standalones, unknowns int
+       ctx.snowflakeLock.Lock()
+       s := fmt.Sprintf("current snowflakes available: %d\n", 
len(ctx.idToSnowflake))
        for _, snowflake := range ctx.idToSnowflake {
                if snowflake.proxyType == "badge" {
                        browsers++
@@ -302,6 +321,7 @@ func debugHandler(ctx *BrokerContext, w 
http.ResponseWriter, r *http.Request) {
                }
 
        }
+       ctx.snowflakeLock.Unlock()
        s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
        s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
        s += fmt.Sprintf("\n\twebext proxies: %d", webexts)



_______________________________________________
tor-commits mailing list
[email protected]
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to