Mforns has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/374823 )
Change subject: [WIP] Optimize EventLogging purging script using timestamps
......................................................................
[WIP] Optimize EventLogging purging script using timestamps
Bug: T156933
Change-Id: Ib9f7b9559425bf0496c3cca82df63cd101261c8b
---
M modules/role/files/mariadb/eventlogging_cleaner.py
1 file changed, 44 insertions(+), 67 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/puppet
refs/changes/23/374823/1
diff --git a/modules/role/files/mariadb/eventlogging_cleaner.py
b/modules/role/files/mariadb/eventlogging_cleaner.py
index ee08333..df4583e 100644
--- a/modules/role/files/mariadb/eventlogging_cleaner.py
+++ b/modules/role/files/mariadb/eventlogging_cleaner.py
@@ -27,8 +27,8 @@
if any of DB username/password are provided by the user as my.cnf
configuration
file (the conf file needs to have a [client] section with 'user' and
'password').
2) If a table is listed in the whitelist, then some of its fields are
automatically
- added to it (see COMMON_PERSISTENT_FIELDS). This ensures that important
fields
- like timestamp or primary keys are preserved.
+ added to it (see COMMON_PERSISTENT_FIELDS). This ensures that important
non-sensitive
+ fields like timestamp or primary keys are preserved.
3) The script runs updates/deletes in batches to avoid blocking the database
for too
long creating contention with other write operations (like inserts).
"""
@@ -44,7 +44,6 @@
import sys
import time
import unittest
-import uuid
from datetime import datetime, timedelta
from unittest.mock import MagicMock, Mock, call, patch
@@ -225,53 +224,26 @@
result = self.database.execute(command, params,
dry_run=self.dry_run)
time.sleep(self.sleep_between_batches)
- def _get_uuids_and_last_ts(self, table, start_ts,
override_batch_size=None):
+ def _get_last_ts(self, table, start_ts):
"""
- Return the first <batch_size> uuids of the events between start_ts
- and self.end. Also return the timestamp of the last of those events.
- NOTE: If there exist several events that share the last timestamp,
- it might be that some of them are listed in the uuid batch, and some
- others aren't (do not fit in the batch size limit). In the next
iteration
- start_ts will be this iteration's last_ts, and so the script might
- re-purge some events, which is OK, because the outcome does not change.
+ Return the timestamp of the Nth event between start_ts and self.end,
+ where N is equal to the batch size. If there are fewer than N events
+ between start_ts and self.end, return the timestamp of the last one.
+ If there are no events between start_ts and self.end, return None.
"""
- batch_size = override_batch_size or self.batch_size
- # July 2017
- # There are currently some tables on analtics-store that have their
uuid
- # field set as 'binary', not 'char' as in the master
- # and the analytics-slave.
- # Since altering all the inconsistent tables is a demanding task for
the
- # current hardware, we just force an explicit cast to char in the
query.
command = (
- "SELECT timestamp, CAST(uuid AS CHAR) from {} WHERE timestamp >=
%(start_ts)s "
+ "SELECT timestamp from `{}` WHERE timestamp >= %(start_ts)s "
"AND timestamp < %(end_ts)s ORDER BY timestamp LIMIT
%(batch_size)s"
.format(table)
)
params = {
'start_ts': start_ts,
'end_ts': self.end,
- 'batch_size': batch_size,
+ 'batch_size': self.batch_size,
}
result = self.database.execute(command, params, self.dry_run)
if result['rows']:
- last_ts = result['rows'][-1][0]
- if last_ts == start_ts:
- if batch_size > 4 * self.batch_size:
- raise RuntimeError(
- "The number of events with the same timestamp ({}) "
- "for table {} exceeded 4 times the configured batch
size. "
- "Aborting as a precautionary measure."
- .format(start_ts, table)
- )
- log.warning("All events in the batch have the same timestamp
({}) for table {}. "
- "Growing the batch size to {}."
- .format(start_ts, table, 2 * batch_size))
- return self._get_uuids_and_last_ts(table, start_ts,
-
override_batch_size=batch_size*2)
- uuids = [x[1] for x in result['rows']]
- return (uuids, last_ts)
- else:
- return ([], None)
+ return result['rows'][-1][0]
def sanitize(self, table):
"""
@@ -297,42 +269,47 @@
log.warning("No fields to purge for table {}.".format(table))
return
+ select_template = (
+ "SELECT COUNT(*) FROM `{}` "
+ "WHERE timestamp >= %(start_ts)s AND timestamp <= %(end_ts)s"
+ ).format(table)
values_string = ','.join([field + ' = NULL' for field in
fields_to_purge])
- uuids_current_batch, last_ts = self._get_uuids_and_last_ts(table,
self.start)
- command_template = (
- "UPDATE {0} "
- "SET {1} "
- "WHERE uuid IN ({{}})"
+ update_template = (
+ "UPDATE `{}` "
+ "SET {} "
+ "WHERE timestamp >= %(start_ts)s AND timestamp <= %(end_ts)s"
).format(table, values_string)
- while uuids_current_batch:
- uuids_no = len(uuids_current_batch)
- if uuids_no > self.batch_size:
- log.warning("The number of uuids to sanitize {} is bigger "
- "than the batch size {}, this condition should not
"
- "be possible, please review the code/data. "
- .format(str(uuids_no), str(self.batch_size)))
-
- uuids_current_batch_escaped = ["'" + x + "'" for x in
uuids_current_batch]
- result = self.database.execute(
- command_template.format(",".join(uuids_current_batch_escaped)),
+ start_ts = self.start
+ end_ts = self._get_last_ts(table, start_ts)
+ while end_ts:
+ # First, check if the start_ts-last_ts interval has an expected
+ # number of events (<= 2 * batch_size), given that the last_ts
+ # may contain a theoretically undefined number of events.
+ select_result = self.database.execute(
+ select_template,
+ {'start_ts': start_ts, 'end_ts': end_ts},
dry_run=self.dry_run
)
- if result['numrows'] > uuids_no:
- log.error("The number of uuids to sanitize {} is lower "
- "than the number of updated rows in this batch {}. "
- "This is definitely an error in the code, please
review it."
- .format(uuids_no, result['numrows']))
+ if select_result['rows'][0][0] > 2 * self.batch_size:
+ log.error("The table {} has more than 2 * batch size events "
+ "between {} and {}. You may need to increase the "
+ "batch size or review the elements in the time "
+ "window."
+ .format(table, start_ts, end_ts))
raise RuntimeError('Sanitization stopped as precautionary
step.')
- if uuids_no < self.batch_size:
- # Avoid an extra SQL query to the database if the number of
- # uuids returned are less than BATCH_SIZE, since this value
- # means that we have already reached the last batch of uuids
- # to sanitize.
- uuids_current_batch = []
- else:
- uuids_current_batch, last_ts =
self._get_uuids_and_last_ts(table, last_ts)
+ # Then, proceed to sanitize the start_ts-last_ts interval.
+ result = self.database.execute(
+ update_template,
+ {'start_ts': start_ts, 'end_ts': end_ts},
+ dry_run=self.dry_run
+ )
+
+ # As end_ts is inclusive in the update statement
+ # start next batch 1 second after end_ts.
+ start_ts = self._add_one_second(end_ts)
+ end_ts = self._get_last_ts(table, start_ts)
time.sleep(self.sleep_between_batches)
--
To view, visit https://gerrit.wikimedia.org/r/374823
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib9f7b9559425bf0496c3cca82df63cd101261c8b
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Mforns <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits