Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/210017

Change subject: Further optimize sql insertion
......................................................................

Further optimize sql insertion

This changeset tries to reduce the number of batch insertions
by increasing the per-schema event queue size from 100 to 400
events.

To keep low-traffic schemas from waiting a long time for their
queue to fill, a schema queue is also flushed for insertion when
its first and last events are more than 5 minutes apart.
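
The flush rule described above can be sketched in isolation. This is a
minimal illustration, not the code in handlers.py: the SchemaBatcher
class, its method names, and the injectable clock parameter are all
hypothetical and exist only to make the rule easy to exercise. Only
the 400-event and 300-second thresholds come from this change. As in
the patch, the age check runs only when a new event arrives for that
schema.

```python
import collections
import time


class SchemaBatcher(object):
    """Hypothetical helper: buffers events per (schema, revision)."""

    def __init__(self, batch_size=400, batch_time=300, clock=time.time):
        self.batch_size = batch_size
        self.batch_time = batch_time  # in seconds
        self.clock = clock  # injectable so the age rule can be tested
        # scid -> (buffered events, time the buffer was created)
        self.queues = {}
        # Batches that are ready to be handed to the insertion worker.
        self.ready = collections.deque()

    def add(self, event):
        scid = (event['schema'], event['revision'])
        if scid not in self.queues:
            self.queues[scid] = ([], self.clock())
        scid_events, first_timestamp = self.queues[scid]
        scid_events.append(event)
        # Flush when the queue is too long or too old.
        too_long = len(scid_events) >= self.batch_size
        too_old = self.clock() - first_timestamp >= self.batch_time
        if too_long or too_old:
            self.ready.append((scid, scid_events))
            del self.queues[scid]

    def drain(self):
        """Move every remaining buffer to the ready queue (e.g. on exit)."""
        for scid, (scid_events, _) in list(self.queues.items()):
            self.ready.append((scid, scid_events))
        self.queues.clear()
```

With a fake clock, two events for the same schema that arrive 301
seconds apart end up in a single flushed batch even though the queue
is far below the size threshold.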

Bug: T98588
Change-Id: I7804f83dfaa6fd824b9641e13240518eb430cde1
---
M server/eventlogging/handlers.py
1 file changed, 12 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging refs/changes/17/210017/1

diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index 5fb0751..a3833b5 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -12,6 +12,7 @@
 """
 import collections
 import datetime
+import time
 import glob
 import imp
 import json
@@ -90,7 +91,8 @@
     uri = uri_delete_query_item(uri, 'replace')
     logger = logging.getLogger('Log')
     meta = sqlalchemy.MetaData(bind=uri)
-    events = collections.defaultdict(list)
+    # Each scid stores a buffer and the timestamp of the first insertion.
+    events = collections.defaultdict(lambda: ([], time.time()))
     events_batch = collections.deque()
     worker = PeriodicThread(interval=DB_FLUSH_INTERVAL,
                             target=store_sql_events,
@@ -105,19 +107,20 @@
             # that the connection is alive, and reconnect if necessary.
             dbapi_connection.ping(True)
     try:
+        batch_size = 400
+        batch_time = 300  # in seconds
         # Link the main thread to the worker thread so we
         # don't keep filling the queue if the worker died.
-        batch_size = 100
         while worker.is_alive():
             event = (yield)
             # Break the event stream by schema (and revision)
             scid = (event['schema'], event['revision'])
-            scid_events = events[scid]
+            scid_events, first_timestamp = events[scid]
             scid_events.append(event)
-            # TODO: Don't wait until len(scid_events) >= batch_size
-            # if the scid is very low traffic (could take too long).
-            if len(scid_events) >= batch_size:
-                logger.debug('%s_%s queue is large, sending to worker', *scid)
+            # Check if the schema queue is too long or too old
+            if (len(scid_events) >= batch_size or
+                    time.time() - first_timestamp >= batch_time):
+                logger.debug('%s_%s queue is large or old, flushing', *scid)
                 events_batch.append((scid, scid_events))
                 del events[scid]
     except GeneratorExit:
@@ -134,7 +137,8 @@
     finally:
         # If there are any events remaining in the queue,
         # process them in the main thread before exiting.
-        events_batch.extend(events.items())
+        for scid, (scid_events, _) in events.iteritems():
+            events_batch.append((scid, scid_events))
         store_sql_events(meta, events_batch, replace=replace)
 
 

-- 
To view, visit https://gerrit.wikimedia.org/r/210017
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7804f83dfaa6fd824b9641e13240518eb430cde1
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
