Nuria has submitted this change and it was merged.
Change subject: Changes to batch events more efficiently
......................................................................
Changes to batch events more efficiently
Do not set the worker's ready flag if the child thread is already working
Keep batches to a fixed size (1000 events)
Log exceptions before re-raising them
Bug: T90029
Change-Id: I633ba2ed026a66a5126407976701433933de1a7c
---
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/eventlogging/utils.py
3 files changed, 48 insertions(+), 24 deletions(-)
Approvals:
Nuria: Looks good to me, approved
jenkins-bot: Verified
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index b08fdd0..5479128 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -20,8 +20,8 @@
import os
import socket
import sys
-
import sqlalchemy
+import traceback
from .utils import PeriodicThread, uri_delete_query_item
from .factory import writes, reads
@@ -88,12 +88,13 @@
"""Writes to an RDBMS, creating tables for SCIDs and rows for events."""
# Don't pass 'replace' parameter to SQLAlchemy.
uri = uri_delete_query_item(uri, 'replace')
-
+ logger = logging.getLogger('Log')
meta = sqlalchemy.MetaData(bind=uri)
events = collections.deque()
+ eventsBatch = collections.deque()
worker = PeriodicThread(interval=DB_FLUSH_INTERVAL,
target=store_sql_events,
- args=(meta, events, replace))
+ args=(meta, eventsBatch, replace))
worker.start()
if meta.bind.dialect.name == 'mysql':
@@ -102,25 +103,35 @@
# Just before executing an insert, call mysql_ping() to verify
# that the connection is alive, and reconnect if necessary.
dbapi_connection.ping(True)
-
try:
# Link the main thread to the worker thread so we
# don't keep filling the queue if the worker died.
+ batchSize = 1000
while worker.is_alive():
event = (yield)
events.append(event)
- if len(events) >= 100:
+ if len(events) >= batchSize and not worker.ready.isSet():
+ logger.debug('Queue is large, sending to child thread')
+ for i in range(0, batchSize):
+ eventsBatch.append(events.popleft())
worker.ready.set()
except GeneratorExit:
# Allow the worker to complete any work that is
# already in progress before shutting down.
+ logger.debug('Stopped main thread via GeneratorExit')
+ logger.debug('Events when stopped %s', len(events))
worker.stop()
worker.join()
+ except Exception:
+ t = traceback.format_exc()
+ logger.debug('Exception caught %s', t)
+ raise
finally:
# If there are any events remaining in the queue,
# process them in the main thread before exiting.
if events:
store_sql_events(meta, events)
+ store_sql_events(meta, eventsBatch)
@writes('file')
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index 16ef9d7..4ee809f 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -12,7 +12,7 @@
import collections
import datetime
import itertools
-
+import logging
import sqlalchemy
from .compat import items
@@ -212,7 +212,9 @@
def store_sql_events(meta, events, replace=False):
"""Store events in the database."""
- queue = [flatten(events.pop()) for _ in range(len(events))]
+ logger = logging.getLogger('Log')
+ lenEvents = len(events)
+ queue = [flatten(events.pop()) for _ in range(lenEvents)]
queue.sort(key=insert_sort_key)
dialect = meta.bind.dialect
@@ -226,6 +228,7 @@
prepared_events = [prepare(event) for event in events]
table = get_table(meta, scid)
insert(table, prepared_events, replace)
+ logger.debug("Data inserted %s", str(lenEvents))
def _property_getter(item):
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
index 124ee0c..ed4d6e1 100644
--- a/server/eventlogging/utils.py
+++ b/server/eventlogging/utils.py
@@ -14,6 +14,7 @@
import re
import sys
import threading
+import traceback
from .compat import items, monotonic_clock
from .factory import get_reader
@@ -31,32 +32,41 @@
self.interval = interval
self.ready = threading.Event()
self.stopping = threading.Event()
+ self.logger = logging.getLogger('Log')
super(PeriodicThread, self).__init__(*args, **kwargs)
def run(self):
while not self.stopping.is_set():
- # Run the target function. Check the clock before
- # and after to determine how long it took to run.
- time_start = monotonic_clock()
- self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
- time_stop = monotonic_clock()
+ try:
+ # Run the target function. Check the clock before
+ # and after to determine how long it took to run.
+ time_start = monotonic_clock()
+ self._Thread__target(*self._Thread__args,
+ **self._Thread__kwargs)
+ time_stop = monotonic_clock()
- run_duration = time_stop - time_start
+ run_duration = time_stop - time_start
- # Subtract the time it took the target function to run
- # from the desired run interval. The result is how long
- # we have to sleep before the next run.
- time_to_next_run = self.interval - run_duration
-
- if self.ready.wait(time_to_next_run):
- # If the internal flag of `self.ready` was set, we were
- # interrupted mid-nap to run immediately. But before we
- # do, we reset the flag.
- self.ready.clear()
+ # Subtract the time it took the target function to run
+ # from the desired run interval. The result is how long
+ # we have to sleep before the next run.
+ time_to_next_run = self.interval - run_duration
+ self.logger.debug('Run duration of thread execution: %s',
+ str(run_duration))
+ if self.ready.wait(time_to_next_run):
+ # If the internal flag of `self.ready` was set, we were
+ # interrupted mid-nap to run immediately. But before we
+ # do, we reset the flag.
+ self.ready.clear()
+ except Exception, e:
+ trace = traceback.format_exc()
+ self.logger.debug('Child thread exiting, exception %s', trace)
+ raise e
def stop(self):
"""Graceful stop: stop once the current iteration is complete."""
self.stopping.set()
+ self.logger.debug('Stopping child thread gracefully')
def uri_delete_query_item(uri, key):
@@ -158,4 +168,4 @@
def setup_logging():
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
- format='%(asctime)s %(message)s')
+ format='%(asctime)s (%(threadName)-10s) %(message)s')
--
To view, visit https://gerrit.wikimedia.org/r/191231
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I633ba2ed026a66a5126407976701433933de1a7c
Gerrit-PatchSet: 6
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Nuria <[email protected]>
Gerrit-Reviewer: Mattflaschen <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits