commit e7a7dce049904664f0c1d7ba99ff76779c23e9ea
Author: Tom Ritter <t...@ritter.vg>
Date:   Fri Apr 14 23:42:30 2017 -0500

    Ingest historical bwauth statistics data
    
    Update the parseOldConsensuses.py script to ingest historical
    bwauth data. Note that this script is generally run one time
    and is not intended to be a well-maintained script for future use.
    It needs care and feeding for each major run.
    
    Additionally, create a mergeDatabases script. This script probably
    does not do what we want; it is included mostly as a placeholder for
    future development, in case we want to correct it and use it later.
---
 mergeDatabases.py      |  66 ++++++++++++++++
 parseOldConsensuses.py | 208 ++++++++++++++++++++++++++++++++++++++++++++-----
 utility.py             |   8 ++
 3 files changed, 263 insertions(+), 19 deletions(-)

diff --git a/mergeDatabases.py b/mergeDatabases.py
new file mode 100755
index 0000000..ac025f5
--- /dev/null
+++ b/mergeDatabases.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import time
+import sqlite3
+import datetime
+import operator
+import traceback
+import subprocess
+
def _quote_identifier(name):
    """Return *name* quoted for use as an SQLite identifier.

    Table names are read from the source file's sqlite_master, so they are
    quoted (embedded double quotes doubled) instead of being pasted raw into
    the SQL text.
    """
    return '"' + name.replace('"', '""') + '"'


def merge_databases(src, dst):
    """Copy rows from each table of *src* into the same-named table of *dst*.

    A table is merged only when it exists in both databases and
    ``PRAGMA table_info`` reports identical column definitions for it.
    Rows whose columns after the first are all falsy are skipped; the others
    are written with ``INSERT OR REPLACE``, so a src row overwrites any dst
    row it conflicts with on a unique key.

    NOTE(review): a legitimate value of 0 counts as "no value" here, so a row
    holding only zeros/NULLs besides its first column is not copied --
    confirm that is intended.

    :param src: open sqlite3 connection to read from
    :param dst: open sqlite3 connection to write into (committed once per table)
    :returns: dict mapping each merged table name to the number of rows written
    """
    merged_counts = {}
    tables = [row[0] for row in
              src.execute("SELECT name FROM sqlite_master WHERE type = 'table'")]
    for t in tables:
        qt = _quote_identifier(t)

        in_dst = dst.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' and name = ?",
            (t,)).fetchone()
        if not in_dst:
            print("Skipping table %s which is in src but not in dst" % t)
            continue

        s_cols = src.execute("PRAGMA table_info(" + qt + ")").fetchall()
        d_cols = dst.execute("PRAGMA table_info(" + qt + ")").fetchall()
        if len(s_cols) != len(d_cols):
            print("Skipping table %s which has %d columns in src and %d in dst"
                  % (t, len(s_cols), len(d_cols)))
            continue

        skip_table = False
        for i, (s_col, d_col) in enumerate(zip(s_cols, d_cols)):
            if s_col != d_col:
                # Report every mismatching column (by its real index), not
                # just the first one.
                print("Skipping table %s because column %d is %s in src and %s in dst"
                      % (t, i, s_col, d_col))
                skip_table = True
        if skip_table:
            continue

        print("Merging table %s" % t)
        insert = ("INSERT OR REPLACE INTO " + qt + " VALUES ("
                  + ",".join(["?"] * len(s_cols)) + ")")
        merged = 0
        for row in src.execute("SELECT * FROM " + qt).fetchall():
            # The first column is the key (the date in the historical
            # tables); only copy rows that carry some data besides it.
            if any(row[1:]):
                dst.execute(insert, row)
                merged += 1
        # One commit per table rather than one per row.
        dst.commit()
        print("Inserted or updated %d rows" % merged)
        merged_counts[t] = merged
    return merged_counts


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: %s src.db dest.db" % sys.argv[0])
        print("\tMerge all the data from src into dest")
        sys.exit(1)

    if not os.path.isfile(sys.argv[1]):
        print("Source is not a file")
        sys.exit(1)
    if not os.path.isfile(sys.argv[2]):
        print("Dest is not a file")
        sys.exit(1)

    src = sqlite3.connect(sys.argv[1])
    dst = sqlite3.connect(sys.argv[2])
    try:
        merge_databases(src, dst)
    finally:
        src.close()
        dst.close()
\ No newline at end of file
diff --git a/parseOldConsensuses.py b/parseOldConsensuses.py
index 834c386..defa2f5 100755
--- a/parseOldConsensuses.py
+++ b/parseOldConsensuses.py
@@ -18,6 +18,7 @@ import stem.util.conf
 import stem.util.enum
 
 from stem import Flag
+from stem.descriptor.reader import DescriptorReader
 from stem.util.lru_cache import lru_cache
 
 def get_dirauths_in_tables():
@@ -48,17 +49,23 @@ def get_dirauth_from_filename(filename):
                return "tor26"
        elif key == "0232AF901C31A04EE9848595AF9BB7620D4C5B2E" or key == 
"585769C78764D58426B8B52B6651A5A71137189A":
                return "dannenberg"
-        elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956":
-                return "turtles"
+       elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956":
+               return "turtles"
        else:
                raise Exception("Unexpcected dirauth key: " + key + " " + 
filename)
 
 def unix_time(dt):
     return (dt - datetime.datetime.utcfromtimestamp(0)).total_seconds() * 
1000.0
 
def ut_to_datetime(ut):
    """Turn a millisecond Unix timestamp into a naive UTC datetime.

    The division uses plain ``/``; under Python 2 an integer *ut* is
    therefore truncated to whole seconds (float input keeps the fraction).
    """
    seconds_since_epoch = ut / 1000
    return datetime.datetime.utcfromtimestamp(seconds_since_epoch)
+
def ut_to_datetime_format(ut):
    """Render a millisecond Unix timestamp as 'YYYY-MM-DD-HH-MM-SS' (UTC)."""
    moment = ut_to_datetime(ut)
    return format(moment, "%Y-%m-%d-%H-%M-%S")
+
 def get_time_from_filename(filename):
        voteTime = filename.split('-')
-       if len(voteTime) < 9:
+       if len(voteTime) < 7:
                raise Exception("Strange filename: " + filename)
 
        v = [int(x) for x in filename.split('-')[0:6]]
@@ -66,26 +73,30 @@ def get_time_from_filename(filename):
        voteTime = unix_time(voteTime)
        return voteTime
 
-def main(dir):
-       dirAuths = get_dirauths_in_tables()
-       dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
-
+def dirauth_relay_votes(directory, dirAuths, dbc):
        dirauth_columns = ""
        dirauth_columns_questions = ""
        for d in dirAuths:
                dirauth_columns += d + "_known integer, " + d + "_running 
integer, " + d + "_bwauth integer, "
                dirauth_columns_questions += ",?,?,?"
 
+       dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + 
dirauth_columns + "PRIMARY KEY(date ASC))")
+       dbc.commit()
+
        votes = {}
-       for root, dirs, files in os.walk(dir):
+       for root, dirs, files in os.walk(directory):
                for f in files:
-                        filepath = os.path.join(root, f)
-                        print filepath
+                       filepath = os.path.join(root, f)
+                       print filepath
 
                        if '"' in f:
                                raise Exception("Potentially malicious 
filename")
-                        elif "votes-" in f and ".tar" in f:
-                                continue
+                       elif "votes-" in f and ".tar" in f:
+                               continue
+                       elif "consensuses-" in f and ".tar" in f:
+                               continue
+                       elif "-vote-" not in f:
+                               continue
 
                        voteTime = get_time_from_filename(f)
                        if voteTime not in votes:
@@ -104,11 +115,8 @@ def main(dir):
                        votes[voteTime][dirauth]['running'] = 
int(subprocess.check_output('egrep "^s " "' + filepath + '" | grep " Running" | 
wc -l', shell=True))
                        votes[voteTime][dirauth]['bwlines'] = 
int(subprocess.check_output('grep Measured= "' + filepath + '" | wc -l', 
shell=True))
 
-       dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + 
dirauth_columns + "PRIMARY KEY(date ASC))")
-       dbc.commit()
-
        for t in votes:
-               print t
+               print ut_to_datetime(t)
                print "\t", len(votes[t])
                for d in votes[t]:
                        print "\t", d, votes[t][d]['bwlines'], 
votes[t][d]['running']
@@ -127,13 +135,175 @@ def main(dir):
                dbc.execute("INSERT OR REPLACE INTO vote_data VALUES (?" + 
dirauth_columns_questions + ")", insertValues)
                dbc.commit()
 
def bwauth_measurements(directory, dirAuths, dbc):
    """Ingest per-bwauth measurement statistics from archived votes.

    Walks *directory* for consensus files (names containing "-consensus") and
    vote files (names containing "-vote-"). For every vote time that has a
    matching consensus and is not already in the ``bwauth_data`` table, each
    dirauth's ``Measured=`` values are compared against the consensus
    bandwidth of every relay the consensus does not mark as unmeasured, and
    tallied as:

    * above      - the dirauth's measured value is greater than the consensus one
    * below      - the dirauth's measured value is less than the consensus one
    * exclusive  - equal to the consensus value, and no other dirauth voted it
    * shared     - equal to the consensus value, and another dirauth voted it too
    * unmeasured - the dirauth has no measurement for the relay

    One row per vote time is written with INSERT OR REPLACE. Dirauths with no
    measurements at that time (or no consensus relays) get NULLs.

    :param directory: root of the extracted consensus/vote archive
    :param dirAuths: dirauth names, in the column order of the table
    :param dbc: open sqlite3 connection to write to
    :raises Exception: on a filename containing '"' or a vote from an
        unrecognised dirauth (helper functions may raise on odd filenames)
    """
    # Create the table before scanning: the scan consults it to skip vote
    # times that were already ingested, and on a fresh database that lookup
    # used to fail with "no such table: bwauth_data".
    bwauth_columns = "".join(
        "%s_above integer, %s_shared integer, %s_exclusive integer, "
        "%s_below integer, %s_unmeasured integer, " % ((d,) * 5)
        for d in dirAuths)
    bwauth_columns_questions = ",?,?,?,?,?" * len(dirAuths)
    dbc.execute("CREATE TABLE IF NOT EXISTS bwauth_data(date integer, " +
                bwauth_columns + "PRIMARY KEY(date ASC))")
    dbc.commit()

    # Vote times already present, fetched once instead of one SELECT per file.
    processed = set(row[0] for row in dbc.execute("SELECT date FROM bwauth_data"))

    # Find all the consensuses and votes.
    votes = {}        # vote time -> {dirauth name: vote file path}
    consensuses = {}  # consensus time -> consensus file path
    for root, dirs, files in os.walk(directory):
        for f in files:
            filepath = os.path.join(root, f)

            if '"' in f:
                raise Exception("Potentially malicious filename")
            elif "votes-" in f and ".tar" in f:
                continue
            elif "consensuses-" in f and ".tar" in f:
                continue

            if "-consensus" in f:
                consensusTime = get_time_from_filename(f)
                if consensusTime not in consensuses:
                    consensuses[consensusTime] = filepath
                else:
                    print("Found two consensuses with the same time: %s"
                          % ut_to_datetime(consensusTime))
            elif "-vote-" in f:
                voteTime = get_time_from_filename(f)
                if voteTime in processed:
                    print("Skipping %s because we already processed it" % f)
                    continue

                dirauth = get_dirauth_from_filename(f)
                if dirauth not in dirAuths:
                    raise Exception("Found a dirauth I don't know about (probably spelling): " + dirauth)

                votesAtTime = votes.setdefault(voteTime, {})
                if dirauth not in votesAtTime:
                    votesAtTime[dirauth] = filepath
                else:
                    print("Found two votes for dirauth %s: %s and %s"
                          % (dirauth, filepath, votesAtTime[dirauth]))

    # Votes can only be compared when the matching consensus is available.
    orphaned = [t for t in votes if t not in consensuses]
    for t in orphaned:
        print("Have votes for time %s but no consensus!" % ut_to_datetime(t))
        del votes[t]

    total = len(votes)
    for reviewed, v in enumerate(votes, 1):
        print("Reviewing %s (%d/%d)" % (consensuses[v], reviewed, total))

        # Consensus weight per relay fingerprint ("Unmeasured" if flagged so).
        consensusRouters = {}
        with DescriptorReader(consensuses[v]) as reader:
            reader.register_skip_listener(my_listener)
            for relay in reader:
                consensusRouters[relay.fingerprint] = "Unmeasured" if relay.is_unmeasured else relay.bandwidth

        # Measured= values per dirauth; dirauths that measured nothing are
        # treated as not running a bwauth at this time.
        bwauthVotes = {}
        for d in votes[v]:
            measured = {}
            with DescriptorReader(votes[v][d]) as reader:
                reader.register_skip_listener(my_listener)
                for relay in reader:
                    if relay.measured:
                        measured[relay.fingerprint] = relay.measured
            if measured:
                bwauthVotes[d] = measured

        # Every bwauth gets a (possibly all-zero) tally as long as the
        # consensus listed any relays at all.
        thisConsensusResults = {}
        if consensusRouters:
            for d in bwauthVotes:
                thisConsensusResults[d] = {'unmeasured': 0, 'above': 0, 'below': 0, 'exclusive': 0, 'shared': 0}

        for r, consensusBw in consensusRouters.items():
            if consensusBw == "Unmeasured":
                continue
            # How many bwauths voted exactly the consensus value for this relay.
            matching = sum(1 for measured in bwauthVotes.values()
                           if r in measured and measured[r] == consensusBw)
            for d, measured in bwauthVotes.items():
                tally = thisConsensusResults[d]
                if r not in measured:
                    tally['unmeasured'] += 1
                elif consensusBw < measured[r]:
                    tally['above'] += 1
                elif consensusBw > measured[r]:
                    tally['below'] += 1
                elif consensusBw == measured[r]:
                    # d itself is one of the matches, so 1 means "only d".
                    if matching == 1:
                        tally['exclusive'] += 1
                    else:
                        tally['shared'] += 1
                else:
                    print("What case am I in???")
                    sys.exit(1)

        insertValues = [v]
        for d in dirAuths:
            tally = thisConsensusResults.get(d)
            if tally is None:
                insertValues.extend([None] * 5)
            else:
                # Same order as the columns created above.
                insertValues.extend(tally[k] for k in ('above', 'shared', 'exclusive', 'below', 'unmeasured'))

        dbc.execute("INSERT OR REPLACE INTO bwauth_data VALUES (?" + bwauth_columns_questions + ")", insertValues)
        dbc.commit()
+               
def my_listener(path, exception):
    """Skip listener for DescriptorReader: report a file that was skipped.

    Prints a "Skipped!" marker, then the path, then the exception, each on
    its own line.
    """
    for line in ("Skipped!", path, exception):
        print(line)
+
+
def main(itype, directory):
    """Ingest historical data of kind *itype* from *directory* into data/historical.db.

    :param itype: "dirauth_relay_votes" or "bwauth_measurements"; anything
        else prints "Unknown ingestion type" and does nothing else
    :param directory: directory tree to walk for vote/consensus files
    """
    if itype not in ("dirauth_relay_votes", "bwauth_measurements"):
        # Reject before touching the database, so a typo on the command line
        # does not open (and possibly create) data/historical.db for nothing.
        print("Unknown ingestion type")
        return

    dirAuths = get_dirauths_in_tables()
    dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
    try:
        if itype == "dirauth_relay_votes":
            dirauth_relay_votes(directory, dirAuths, dbc)
        else:
            bwauth_measurements(directory, dirAuths, dbc)
    finally:
        # The ingestion functions commit as they go; always release the file.
        dbc.close()
 
 if __name__ == '__main__':
        try:
-               if len(sys.argv) != 2:
-                       print "Usage: ", sys.argv[0], "vote-directory"
+               if len(sys.argv) != 3:
+                       print "Usage: ", sys.argv[0], "ingestion-type 
vote-directory"
                else:
-                       main(sys.argv[1])
+                       main(sys.argv[1], sys.argv[2])
        except:
                msg = "%s failed with:\n\n%s" % (sys.argv[0], 
traceback.format_exc())
                print "Error: %s" % msg
diff --git a/utility.py b/utility.py
index 6cebe6b..aeb7b8d 100644
--- a/utility.py
+++ b/utility.py
@@ -84,3 +84,11 @@ def _get_documents(label, resource):
 def unix_time(dt):
     return (dt - datetime.datetime.utcfromtimestamp(0)).total_seconds() * 
1000.0
 
def ut_to_datetime(ut):
    """Turn a millisecond Unix timestamp into a naive UTC datetime.

    The division uses plain ``/``; under Python 2 an integer *ut* is
    therefore truncated to whole seconds (float input keeps the fraction).
    """
    seconds_since_epoch = ut / 1000
    return datetime.datetime.utcfromtimestamp(seconds_since_epoch)
+
def ut_to_datetime_format(ut):
    """Render a millisecond Unix timestamp as 'YYYY-MM-DD-HH-MM-SS' (UTC).

    Conversion is delegated to ut_to_datetime and formatting to
    consensus_datetime_format.
    """
    as_datetime = ut_to_datetime(ut)
    return consensus_datetime_format(as_datetime)
+
def consensus_datetime_format(dt):
    """Format *dt* as 'YYYY-MM-DD-HH-MM-SS' (zero-padded, dash-separated)."""
    return format(dt, "%Y-%m-%d-%H-%M-%S")



_______________________________________________
tor-commits mailing list
tor-commits@lists.torproject.org
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits

Reply via email to