commit 7187f1009ef7aaae6aa557fe1f724aa1df718b24
Author: Cecylia Bocovich <coh...@torproject.org>
Date:   Mon Jan 25 13:01:37 2021 -0500

    Log a throughput summary for each connection
    
    This will increase transparency for people running standalone proxies
    and help us debug any potential issues with proxies behaving unreliably.
---
 proxy/snowflake.go |  6 ++++
 proxy/util.go      | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/proxy/snowflake.go b/proxy/snowflake.go
index 1bc21ab..86ae0b2 100644
--- a/proxy/snowflake.go
+++ b/proxy/snowflake.go
@@ -118,6 +118,8 @@ type webRTCConn struct {
 
        lock sync.Mutex // Synchronization for DataChannel destruction
        once sync.Once  // Synchronization for PeerConnection destruction
+
+       bytesLogger BytesLogger
 }
 
 func (c *webRTCConn) Read(b []byte) (int, error) {
@@ -125,6 +127,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
+       c.bytesLogger.AddInbound(len(b))
        c.lock.Lock()
        defer c.lock.Unlock()
        if c.dc != nil {
@@ -368,6 +371,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
 
                pr, pw := io.Pipe()
                conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+               conn.bytesLogger = NewBytesSyncLogger()
 
                dc.OnOpen(func() {
                        log.Println("OnOpen channel")
@@ -376,6 +380,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
                        conn.lock.Lock()
                        defer conn.lock.Unlock()
                        log.Println("OnClose channel")
+                       log.Println(conn.bytesLogger.ThroughputSummary())
                        conn.dc = nil
                        dc.Close()
                        pw.Close()
@@ -388,6 +393,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
                                        log.Printf("close with error generated an error: %v", inerr)
                                }
                        }
+                       conn.bytesLogger.AddOutbound(n)
                        if n != len(msg.Data) {
                                panic("short write")
                        }
diff --git a/proxy/util.go b/proxy/util.go
new file mode 100644
index 0000000..d737056
--- /dev/null
+++ b/proxy/util.go
@@ -0,0 +1,84 @@
+package main
+
+import (
+       "fmt"
+       "time"
+)
+
+type BytesLogger interface {
+       AddOutbound(int)
+       AddInbound(int)
+       ThroughputSummary() string
+}
+
+// Default BytesLogger does nothing.
+type BytesNullLogger struct{}
+
+func (b BytesNullLogger) AddOutbound(amount int)    {}
+func (b BytesNullLogger) AddInbound(amount int)     {}
+func (b BytesNullLogger) ThroughputSummary() string { return "" }
+
+// BytesSyncLogger uses channels to safely log from multiple sources with output
+// occurring at reasonable intervals.
+type BytesSyncLogger struct {
+       outboundChan, inboundChan              chan int
+       outbound, inbound, outEvents, inEvents int
+       start                                  time.Time
+}
+
+// NewBytesSyncLogger returns a new BytesSyncLogger and starts it logging.
+func NewBytesSyncLogger() *BytesSyncLogger {
+       b := &BytesSyncLogger{
+               outboundChan: make(chan int, 5),
+               inboundChan:  make(chan int, 5),
+       }
+       go b.log()
+       b.start = time.Now()
+       return b
+}
+
+func (b *BytesSyncLogger) log() {
+       for {
+               select {
+               case amount := <-b.outboundChan:
+                       b.outbound += amount
+                       b.outEvents++
+               case amount := <-b.inboundChan:
+                       b.inbound += amount
+                       b.inEvents++
+               }
+       }
+}
+
+func (b *BytesSyncLogger) AddOutbound(amount int) {
+       b.outboundChan <- amount
+}
+
+func (b *BytesSyncLogger) AddInbound(amount int) {
+       b.inboundChan <- amount
+}
+
+func (b *BytesSyncLogger) ThroughputSummary() string {
+       var inUnit, outUnit string
+       units := []string{"B", "KB", "MB", "GB"}
+
+       inbound := b.inbound
+       outbound := b.outbound
+
+       for i, u := range units {
+               inUnit = u
+               if (inbound < 1000) || (i == len(units)-1) {
+                       break
+               }
+               inbound = inbound / 1000
+       }
+       for i, u := range units {
+               outUnit = u
+               if (outbound < 1000) || (i == len(units)-1) {
+                       break
+               }
+               outbound = outbound / 1000
+       }
+       t := time.Now()
+       return fmt.Sprintf("Traffic throughput (up|down): %d %s|%d %s -- (%d OnMessages, %d Sends, over %d seconds)", inbound, inUnit, outbound, outUnit, b.outEvents, b.inEvents, int(t.Sub(b.start).Seconds()))
+}

_______________________________________________
tor-commits mailing list
tor-commits@lists.torproject.org
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to