Jcrespo has uploaded a new change for review. (https://gerrit.wikimedia.org/r/404647)

Change subject: compare.py: Implement progress reporting and comparison of more than 2 servers
......................................................................

compare.py: Implement progress reporting and comparison of more than 2 servers

This adds progress reporting (which can be disabled with --print-every 0),
and allows comparing more than 2 servers at a time, all queried in parallel.

Change-Id: Iad85358462e3196f5322765b03006a682d93329c
---
M wmfmariadbpy/compare.py
1 file changed, 57 insertions(+), 45 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/software/wmfmariadbpy refs/changes/47/404647/1

diff --git a/wmfmariadbpy/compare.py b/wmfmariadbpy/compare.py
index d50a501..3273a57 100755
--- a/wmfmariadbpy/compare.py
+++ b/wmfmariadbpy/compare.py
@@ -13,26 +13,27 @@
     containing them
     """
     parser = argparse.ArgumentParser(description='Compares the table contents 
between 2 WMF MySQL/MariaDB servers.')
-    parser.add_argument('host1', help='First instance, with format 
host1[:port]')
-    parser.add_argument('host2', help='Second instance, with format 
host2[:port]')
     parser.add_argument('database', help='Database to connect on both 
instances')
     parser.add_argument('table', help='Table to compare on both instances')
     parser.add_argument('column', help='Numeric id to loop on, normally an 
autoincrement field')
-    parser.add_argument('--step', type=int, default=1000, help='How many rows 
to compare each time. The larger number of rows, the faster the full comparison 
will be, but also more aggressive. Also, there is an upper limit due to 
group_concat_max_len, depending on the size of the rows. Default: 1000 rows')
+    parser.add_argument('--step', type=int, default=10000, help='How many rows 
to compare each time. The larger number of rows, the faster the full comparison 
will be, but also more aggressive. Also, there is an upper limit due to 
group_concat_max_len, depending on the size of the rows. Default: 10000 rows')
     parser.add_argument('--group_concat_max_len', type=int, default=10000000, 
help='Sets group_concat_max_len. Normally needed when increasing the "step" or 
the rows are very large. Default: 10MB')
     parser.add_argument('--from-value', type=int, dest='from_value', 
help='Start from given column value. Useful if only part of the table has to be 
scanned. Defaults to min column value for the table on the first given 
instance, at start.')
     parser.add_argument('--to-value', type=int, dest='to_value', help='Start 
from given column value. Useful if only part of the table has to be scanned. 
Defaults to max column value for the table on the first given instance, at 
start.')
     parser.add_argument('--order-by', dest='order_by', help='Ordering when 
doing the comparions. Useful when the column is not an unique field, and 
results could be returned out of order. By default, we order by column, which 
could give false positives if the column is not unique.')
-    parser.add_argument('--print-every', type=int, dest='print_every', 
default=100, help='How often output should print progress report. Default: 100 
queries/chunks.')
+    parser.add_argument('--print-every', type=int, dest='print_every', 
default=100, help='How often output should print progress report. Default: 100 
queries/chunks. Set to 0 to disable progress reporting.')
+    parser.add_argument('--threads', type=int, default=1, help='Parallelize 
requests on the given number of connections per server. By default, only 1 
connection is used per server.')
     parser.add_argument('--verbose', dest='verbose', action='store_true', 
help="Enables verbose logging, where all SQL commands sent to the server are 
sent to the standard output. Defaults to disabled.")
+    parser.add_argument('hosts', nargs='+', help='2 or more database 
instances, expresed with format host[:port]')
+
     return parser.parse_args()
 
 
-def connect_in_parallel(host1, host2, database):
-    pool = ThreadPool(processes=2)
+def connect_in_parallel(hosts, database, threads):
+    pool = ThreadPool(processes=len(hosts))
     async_result = dict()
     conn = list()
-    for host in (host1, host2):
+    for host in hosts:
         if ':' in host:
             # we do not support ipv6 yet
             host, port = host.split(':')
@@ -41,24 +42,27 @@
             port = 3306
         async_result[host] = pool.apply_async(WMFMariaDB, (host, port, 
database))
 
-    for host in (host1, host2):
+    for host in hosts:
         mysql = async_result[host].get()
         if mysql.connection is None:
-            sys.stderr.write("Could not connect to {}:{}\n".format(host))
+            sys.stderr.write("Could not connect to {}\n".format(host))
             sys.exit(-1)
         conn.append(mysql)
     pool.close()
     pool.join()
     return tuple(conn)
 
-def execute_in_parallel(conn1, conn2, query, verbose):
+def execute_in_parallel(connections, query, verbose):
+    pool = ThreadPool(processes=len(connections))
+    async_result = dict()
+    result = list()
     if verbose:
         print(query)
-    pool = ThreadPool(processes=2)
-    async_result1 = pool.apply_async(conn1.execute, (query, False))
-    async_result2 = pool.apply_async(conn2.execute, (query, False))
+    for conn in connections:
+        async_result[conn.host + ':' + str(conn.port)] = 
pool.apply_async(conn.execute, (query, False))
 
-    result = (async_result1.get(), async_result2.get())
+    for conn in connections:
+        result.append(async_result[conn.host + ':' + str(conn.port)].get())
     pool.close()
     pool.join()
     return result
@@ -67,23 +71,20 @@
 def main():
     options = parse_args()
 
-    if options.host1 == options.host2:
-        print('ERROR: You are trying to compare {} to itself, 
exiting.'.format(options.host1))
-        sys.exit(-1)
-    (conn1, conn2) = connect_in_parallel(options.host1, options.host2, 
options.database)
+    connections = connect_in_parallel(options.hosts, options.database, 
options.threads)
 
     # calulate ranges to perform comparison and check if tables are empty
     command = 'SELECT min({0}), max({0}) FROM {1}'.format(options.column, 
options.table)
-    (result1, result2) = execute_in_parallel(conn1, conn2, command, 
options.verbose)
+    results = execute_in_parallel(connections, command, options.verbose)
 
-    for result in (result1, result2):
+    for result in results:
         if not result['success']:
             print('ERROR: Minimum/maximum id could not be retrieved, exiting.')
             sys.exit(-1)
-    min_id1 = result1['rows'][0][0]
-    max_id1 = result1['rows'][0][1]
-    min_id2 = result2['rows'][0][0]
-    max_id2 = result2['rows'][0][1]
+    min_id1 = result['rows'][0][0]
+    max_id1 = result['rows'][0][1]
+    min_id2 = result['rows'][0][0]
+    max_id2 = result['rows'][0][1]
 
     if min_id1 is None and min_id2 is None and max_id1 is None and max_id2 is 
None:
         print('No rows found on both tables.')
@@ -109,16 +110,18 @@
         print('starting id is greater than ending id')
         sys.exit(-1)
 
-    print('Starting comparison between id {} and {}'.format(min_id, max_id))
+    if options.print_every != 0:
+        print('Starting comparison between id {} and {}'.format(min_id, 
max_id))
     # setup query for comparison
     ## TODO: make column(s) iterator
     command = 'DESCRIBE {}'.format(options.table)
-    (describe_result1, describe_result2) = execute_in_parallel(conn1, conn2, 
command, options.verbose)
-    if not describe_result1['success'] or not describe_result2['success']:
-        print('ERROR: Could not describe the table, exiting.')
-        sys.exit(-1)
+    describe_results = execute_in_parallel(connections, command, 
options.verbose)
+    for describe_result in describe_results:
+        if not result['success']:
+            print('ERROR: Could not describe the table, exiting.')
+            sys.exit(-1)
 
-    all_columns = ','.join({"IFNULL(" + x[0] + ", '\\0'),'|'" for x in 
describe_result1['rows']})
+    all_columns = ','.join({"IFNULL(" + x[0] + ", '\\0'),'|'" for x in 
describe_results[0]['rows']})
     if options.order_by is None or options.order_by == '':
         order_by = options.column
     else:
@@ -126,42 +129,51 @@
 
     # increase group_concat_max_len
     command = 'SET SESSION group_concat_max_len = 
{}'.format(options.group_concat_max_len)
-    execute_in_parallel(conn1, conn2, command, options.verbose)
+    execute_in_parallel(connections, command, options.verbose)
 
     # main comparison loop
     differences = 0
     iterations = 0
+    start_time = datetime.now()
     try:
         for lower_limit in range(min_id, max_id, options.step):
             upper_limit = lower_limit + options.step - 1
             if upper_limit > max_id:
                 upper_limit = max_id
 
-            iterations += 1
-            if iterations % options.print_every == 0:
-                print('{}: row id {}/{}, {} chunk(s) found 
different'.format(datetime.now().isoformat(),
-                                                                               
     lower_limit,
-                                                                               
     max_id,
-                                                                               
     differences))
+            if options.print_every != 0:
+                iterations += 1
+                if iterations % options.print_every == 0:
+                    speed = (lower_limit - min_id) / (datetime.now() - 
start_time).total_seconds()
+                    eta = int((max_id - lower_limit) / speed)
+                    print('{}: row id {}/{}, ETA: {:02}m{:02}s, {} chunk(s) 
found different'
+                          .format(datetime.now().isoformat(), lower_limit, 
max_id, eta//60, eta%60, differences))
 
             command = 'SELECT crc32(GROUP_CONCAT({4})) FROM {0} WHERE {1} 
BETWEEN {2} AND {3} ORDER BY {5}'.format(options.table, options.column, 
lower_limit, upper_limit, all_columns, order_by)
-            (result1, result2) = execute_in_parallel(conn1, conn2, command, 
options.verbose)
-            if not(result1['success'] and result2['success'] and 
result1['rows'][0][0] ==   result2['rows'][0][0]):
-                # chunk detected as different
-                print('DIFFERENCE: WHERE {} BETWEEN {} AND 
{}'.format(options.column, lower_limit, upper_limit))
+            results = execute_in_parallel(connections, command, 
options.verbose)
+            # only count each chunck once
+            difference_detected = False
+            for i in range(1, len(results)):
+                if not(results[0]['success'] and results[i]['success'] and 
results[0]['rows'][0][0] ==   results[i]['rows'][0][0]):
+                    # chunk detected as different
+                    print('DIFFERENCE on {}: WHERE {} BETWEEN {} AND 
{}'.format(connections[i].host + ':' + str(connections[i].port), 
options.column, lower_limit, upper_limit))
+                    difference_detected = True
+            if difference_detected:
                 differences = differences + 1
     except KeyboardInterrupt:
         print('Ctrl-c pressed ...')
         sys.exit(1)
 
-    conn1.disconnect()
-    conn2.disconnect()
+    for connection in connections:
+        connection.disconnect()
 
     if differences == 0:
-        print('Execution ended, no differences found.')
+        if options.print_every != 0:
+            print('Execution ended, no differences found.')
         sys.exit(0)
     else:
-        print("Execution ended, a total of {} chunk(s) are 
different.".format(differences))
+        if options.print_every != 0:
+            print("Execution ended, a total of {} chunk(s) are 
different.".format(differences))
         sys.exit(1)
 
 

-- 
To view, visit https://gerrit.wikimedia.org/r/404647
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iad85358462e3196f5322765b03006a682d93329c
Gerrit-PatchSet: 1
Gerrit-Project: operations/software/wmfmariadbpy
Gerrit-Branch: master
Gerrit-Owner: Jcrespo <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to