Nuria has submitted this change and it was merged.

Change subject: Changes to batch events more efficiently
......................................................................


Changes to batch events more efficiently

Do not set ready on a child thread if it is already working
Keep batches to a known size
Log exceptions before re-raising them

Bug: T90029

Change-Id: I633ba2ed026a66a5126407976701433933de1a7c
---
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/eventlogging/utils.py
3 files changed, 48 insertions(+), 24 deletions(-)

Approvals:
  Nuria: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index b08fdd0..5479128 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -20,8 +20,8 @@
 import os
 import socket
 import sys
-
 import sqlalchemy
+import traceback
 
 from .utils import PeriodicThread, uri_delete_query_item
 from .factory import writes, reads
@@ -88,12 +88,13 @@
     """Writes to an RDBMS, creating tables for SCIDs and rows for events."""
     # Don't pass 'replace' parameter to SQLAlchemy.
     uri = uri_delete_query_item(uri, 'replace')
-
+    logger = logging.getLogger('Log')
     meta = sqlalchemy.MetaData(bind=uri)
     events = collections.deque()
+    eventsBatch = collections.deque()
     worker = PeriodicThread(interval=DB_FLUSH_INTERVAL,
                             target=store_sql_events,
-                            args=(meta, events, replace))
+                            args=(meta, eventsBatch, replace))
     worker.start()
 
     if meta.bind.dialect.name == 'mysql':
@@ -102,25 +103,35 @@
             # Just before executing an insert, call mysql_ping() to verify
             # that the connection is alive, and reconnect if necessary.
             dbapi_connection.ping(True)
-
     try:
         # Link the main thread to the worker thread so we
         # don't keep filling the queue if the worker died.
+        batchSize = 1000
         while worker.is_alive():
             event = (yield)
             events.append(event)
-            if len(events) >= 100:
+            if len(events) >= batchSize and not worker.ready.isSet():
+                logger.debug('Queue is large, sending to child thread')
+                for i in range(0, batchSize):
+                    eventsBatch.append(events.popleft())
                 worker.ready.set()
     except GeneratorExit:
         # Allow the worker to complete any work that is
         # already in progress before shutting down.
+        logger.debug('Stopped main thread via GeneratorExit')
+        logger.debug('Events when stopped %s', len(events))
         worker.stop()
         worker.join()
+    except Exception:
+        t = traceback.format_exc()
+        logger.debug('Exception caught %s', t)
+        raise
     finally:
         # If there are any events remaining in the queue,
         # process them in the main thread before exiting.
         if events:
             store_sql_events(meta, events)
+            store_sql_events(meta, eventsBatch)
 
 
 @writes('file')
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index 16ef9d7..4ee809f 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -12,7 +12,7 @@
 import collections
 import datetime
 import itertools
-
+import logging
 import sqlalchemy
 
 from .compat import items
@@ -212,7 +212,9 @@
 
 def store_sql_events(meta, events, replace=False):
     """Store events in the database."""
-    queue = [flatten(events.pop()) for _ in range(len(events))]
+    logger = logging.getLogger('Log')
+    lenEvents = len(events)
+    queue = [flatten(events.pop()) for _ in range(lenEvents)]
     queue.sort(key=insert_sort_key)
 
     dialect = meta.bind.dialect
@@ -226,6 +228,7 @@
         prepared_events = [prepare(event) for event in events]
         table = get_table(meta, scid)
         insert(table, prepared_events, replace)
+    logger.debug("Data inserted %s", str(lenEvents))
 
 
 def _property_getter(item):
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
index 124ee0c..ed4d6e1 100644
--- a/server/eventlogging/utils.py
+++ b/server/eventlogging/utils.py
@@ -14,6 +14,7 @@
 import re
 import sys
 import threading
+import traceback
 
 from .compat import items, monotonic_clock
 from .factory import get_reader
@@ -31,32 +32,41 @@
         self.interval = interval
         self.ready = threading.Event()
         self.stopping = threading.Event()
+        self.logger = logging.getLogger('Log')
         super(PeriodicThread, self).__init__(*args, **kwargs)
 
     def run(self):
         while not self.stopping.is_set():
-            # Run the target function. Check the clock before
-            # and after to determine how long it took to run.
-            time_start = monotonic_clock()
-            self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
-            time_stop = monotonic_clock()
+            try:
+                # Run the target function. Check the clock before
+                # and after to determine how long it took to run.
+                time_start = monotonic_clock()
+                self._Thread__target(*self._Thread__args,
+                                     **self._Thread__kwargs)
+                time_stop = monotonic_clock()
 
-            run_duration = time_stop - time_start
+                run_duration = time_stop - time_start
 
-            # Subtract the time it took the target function to run
-            # from the desired run interval. The result is how long
-            # we have to sleep before the next run.
-            time_to_next_run = self.interval - run_duration
-
-            if self.ready.wait(time_to_next_run):
-                # If the internal flag of `self.ready` was set, we were
-                # interrupted mid-nap to run immediately. But before we
-                # do, we reset the flag.
-                self.ready.clear()
+                # Subtract the time it took the target function to run
+                # from the desired run interval. The result is how long
+                # we have to sleep before the next run.
+                time_to_next_run = self.interval - run_duration
+                self.logger.debug('Run duration of thread execution: %s',
+                                  str(run_duration))
+                if self.ready.wait(time_to_next_run):
+                    # If the internal flag of `self.ready` was set, we were
+                    # interrupted mid-nap to run immediately. But before we
+                    # do, we reset the flag.
+                    self.ready.clear()
+            except Exception, e:
+                trace = traceback.format_exc()
+                self.logger.debug('Child thread exiting, exception %s', trace)
+                raise e
 
     def stop(self):
         """Graceful stop: stop once the current iteration is complete."""
         self.stopping.set()
+        self.logger.debug('Stopping child thread gracefully')
 
 
 def uri_delete_query_item(uri, key):
@@ -158,4 +168,4 @@
 
 def setup_logging():
     logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
-                        format='%(asctime)s %(message)s')
+                        format='%(asctime)s (%(threadName)-10s) %(message)s')

-- 
To view, visit https://gerrit.wikimedia.org/r/191231
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I633ba2ed026a66a5126407976701433933de1a7c
Gerrit-PatchSet: 6
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Nuria <[email protected]>
Gerrit-Reviewer: Mattflaschen <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to