Mforns has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/374823 )

Change subject: [WIP] Optimize EventLogging purging script using timestamps
......................................................................

[WIP] Optimize EventLogging purging script using timestamps

Bug: T156933
Change-Id: Ib9f7b9559425bf0496c3cca82df63cd101261c8b
---
M modules/role/files/mariadb/eventlogging_cleaner.py
1 file changed, 44 insertions(+), 67 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/23/374823/1

diff --git a/modules/role/files/mariadb/eventlogging_cleaner.py b/modules/role/files/mariadb/eventlogging_cleaner.py
index ee08333..df4583e 100644
--- a/modules/role/files/mariadb/eventlogging_cleaner.py
+++ b/modules/role/files/mariadb/eventlogging_cleaner.py
@@ -27,8 +27,8 @@
    if any of DB username/password are provided by the user as my.cnf 
configuration
    file (the conf file needs to have a [client] section with 'user' and 
'password').
 2) If a table is listed in the whitelist, then some of its fields are 
automatically
-   added to it (see COMMON_PERSISTENT_FIELDS). This ensures that important 
fields
-   like timestamp or primary keys are preserved.
+   added to it (see COMMON_PERSISTENT_FIELDS). This ensures that important 
non-sensitive
+   fields like timestamp or primary keys are preserved.
 3) The script runs updates/deletes in batches to avoid blocking the database 
for too
    long creating contention with other write operations (like inserts).
 """
@@ -44,7 +44,6 @@
 import sys
 import time
 import unittest
-import uuid
 
 from datetime import datetime, timedelta
 from unittest.mock import MagicMock, Mock, call, patch
@@ -225,53 +224,26 @@
             result = self.database.execute(command, params, 
dry_run=self.dry_run)
             time.sleep(self.sleep_between_batches)
 
-    def _get_uuids_and_last_ts(self, table, start_ts, 
override_batch_size=None):
+    def _get_last_ts(self, table, start_ts):
         """
-        Return the first <batch_size> uuids of the events between start_ts
-        and self.end. Also return the timestamp of the last of those events.
-        NOTE: If there exist several events that share the last timestamp,
-        it might be that some of them are listed in the uuid batch, and some
-        others aren't (do not fit in the batch size limit). In the next 
iteration
-        start_ts will be this iteration's last_ts, and so the script might
-        re-purge some events, which is OK, because the outcome does not change.
+        Return the timestamp of the Nth event between start_ts and self.end,
+        where N is equal to the batch size. If there are less than N events
+        between start_ts and self.end, return the timestamp of the last one.
+        If there are no events between start_ts and self.end, return None.
         """
-        batch_size = override_batch_size or self.batch_size
-        # July 2017
-        # There are currently some tables on analtics-store that have their 
uuid
-        # field set as 'binary', not 'char' as in the master
-        # and the analytics-slave.
-        # Since altering all the inconsistent tables is a demanding task for 
the
-        # current hardware, we just force an explicit cast to char in the 
query.
         command = (
-            "SELECT timestamp, CAST(uuid AS CHAR) from {} WHERE timestamp >= 
%(start_ts)s "
+            "SELECT timestamp from `{}` WHERE timestamp >= %(start_ts)s "
             "AND timestamp < %(end_ts)s ORDER BY timestamp LIMIT 
%(batch_size)s"
             .format(table)
         )
         params = {
             'start_ts': start_ts,
             'end_ts': self.end,
-            'batch_size': batch_size,
+            'batch_size': self.batch_size,
         }
         result = self.database.execute(command, params, self.dry_run)
         if result['rows']:
-            last_ts = result['rows'][-1][0]
-            if last_ts == start_ts:
-                if batch_size > 4 * self.batch_size:
-                    raise RuntimeError(
-                        "The number of events with the same timestamp ({}) "
-                        "for table {} exceeded 4 times the configured batch 
size. "
-                        "Aborting as a precautionary measure."
-                        .format(start_ts, table)
-                    )
-                log.warning("All events in the batch have the same timestamp 
({}) for table {}. "
-                            "Growing the batch size to {}."
-                            .format(start_ts, table, 2 * batch_size))
-                return self._get_uuids_and_last_ts(table, start_ts,
-                                                   
override_batch_size=batch_size*2)
-            uuids = [x[1] for x in result['rows']]
-            return (uuids, last_ts)
-        else:
-            return ([], None)
+            return result['rows'][-1][0]
 
     def sanitize(self, table):
         """
@@ -297,42 +269,47 @@
             log.warning("No fields to purge for table {}.".format(table))
             return
 
+        select_template = (
+            "SELECT COUNT(*) FROM `{}` "
+            "WHERE timestamp >= %(start_ts)s AND timestamp <= %(end_ts)s"
+        ).format(table)
         values_string = ','.join([field + ' = NULL' for field in 
fields_to_purge])
-        uuids_current_batch, last_ts = self._get_uuids_and_last_ts(table, 
self.start)
-        command_template = (
-            "UPDATE {0} "
-            "SET {1} "
-            "WHERE uuid IN ({{}})"
+        update_template = (
+            "UPDATE `{}` "
+            "SET {} "
+            "WHERE timestamp >= %(start_ts)s AND timestamp <= %(end_ts)s"
         ).format(table, values_string)
 
-        while uuids_current_batch:
-            uuids_no = len(uuids_current_batch)
-            if uuids_no > self.batch_size:
-                log.warning("The number of uuids to sanitize {} is bigger "
-                            "than the batch size {}, this condition should not 
"
-                            "be possible, please review the code/data. "
-                            .format(str(uuids_no), str(self.batch_size)))
-
-            uuids_current_batch_escaped = ["'" + x + "'" for x in 
uuids_current_batch]
-            result = self.database.execute(
-                command_template.format(",".join(uuids_current_batch_escaped)),
+        start_ts = self.start
+        end_ts = self._get_last_ts(table, start_ts)
+        while end_ts:
+            # First, check if the start_ts-last_ts interval has an expected
+            # number of events (<= 2 * batch_size), given that the last_ts
+            # may contain a theoretically undefined number of events.
+            select_result = self.database.execute(
+                select_template,
+                {'start_ts': start_ts, 'end_ts': end_ts},
                 dry_run=self.dry_run
             )
-            if result['numrows'] > uuids_no:
-                log.error("The number of uuids to sanitize {} is lower "
-                          "than the number of updated rows in this batch {}. "
-                          "This is definitely an error in the code, please 
review it."
-                          .format(uuids_no, result['numrows']))
+            if select_result['numrows'] > 2 * self.batch_size:
+                log.error("The table {} has more than 2 * batch size events "
+                          "between {} and {}. You may need to increase the "
+                          "batch size or review the elements in the time"
+                          "window."
+                          .format(table, start_ts, end_ts)
                 raise RuntimeError('Sanitization stopped as precautionary 
step.')
 
-            if uuids_no < self.batch_size:
-                # Avoid an extra SQL query to the database if the number of
-                # uuids returned are less than BATCH_SIZE, since this value
-                # means that we have already reached the last batch of uuids
-                # to sanitize.
-                uuids_current_batch = []
-            else:
-                uuids_current_batch, last_ts = 
self._get_uuids_and_last_ts(table, last_ts)
+            # Then, proceed to sanitize the start_ts-last_ts interval.
+            result = self.database.execute(
+                update_template,
+                {'start_ts': start_ts, 'end_ts': end_ts},
+                dry_run=self.dry_run
+            )
+
+            # As end_ts is inclusive in the update statement
+            # start next batch 1 second after end_ts.
+            start_ts = self._add_one_second(end_ts)
+            end_ts = self._get_last_ts(table, start_ts)
             time.sleep(self.sleep_between_batches)
 
 

-- 
To view, visit https://gerrit.wikimedia.org/r/374823
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib9f7b9559425bf0496c3cca82df63cd101261c8b
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to