Mforns has uploaded a new change for review.
https://gerrit.wikimedia.org/r/210017
Change subject: Further optimize sql insertion
......................................................................
Further optimize sql insertion
This changeset tries to reduce the number of batch insertions
by increasing the size of the event queues.
To keep low-traffic schemas from waiting a long time
until their queue is full, schema queues whose first and last
events are more than 5 minutes apart are flushed for insertion
as well.
Bug: T98588
Change-Id: I7804f83dfaa6fd824b9641e13240518eb430cde1
---
M server/eventlogging/handlers.py
1 file changed, 12 insertions(+), 8 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging
refs/changes/17/210017/1
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index 5fb0751..a3833b5 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -12,6 +12,7 @@
"""
import collections
import datetime
+import time
import glob
import imp
import json
@@ -90,7 +91,8 @@
uri = uri_delete_query_item(uri, 'replace')
logger = logging.getLogger('Log')
meta = sqlalchemy.MetaData(bind=uri)
- events = collections.defaultdict(list)
+ # Each scid stores a buffer and the timestamp of the first insertion.
+ events = collections.defaultdict(lambda: ([], time.time()))
events_batch = collections.deque()
worker = PeriodicThread(interval=DB_FLUSH_INTERVAL,
target=store_sql_events,
@@ -105,19 +107,20 @@
# that the connection is alive, and reconnect if necessary.
dbapi_connection.ping(True)
try:
+ batch_size = 400
+ batch_time = 300 # in seconds
# Link the main thread to the worker thread so we
# don't keep filling the queue if the worker died.
- batch_size = 100
while worker.is_alive():
event = (yield)
# Break the event stream by schema (and revision)
scid = (event['schema'], event['revision'])
- scid_events = events[scid]
+ scid_events, first_timestamp = events[scid]
scid_events.append(event)
- # TODO: Don't wait until len(scid_events) >= batch_size
- # if the scid is very low traffic (could take too long).
- if len(scid_events) >= batch_size:
- logger.debug('%s_%s queue is large, sending to worker', *scid)
+ # Check if the schema queue is too long or too old
+ if (len(scid_events) >= batch_size or
+ time.time() - first_timestamp >= batch_time):
+ logger.debug('%s_%s queue is large or old, flushing', *scid)
events_batch.append((scid, scid_events))
del events[scid]
except GeneratorExit:
@@ -134,7 +137,8 @@
finally:
# If there are any events remaining in the queue,
# process them in the main thread before exiting.
- events_batch.extend(events.items())
+ for scid, (scid_events, _) in events.iteritems():
+ events_batch.append((scid, scid_events))
store_sql_events(meta, events_batch, replace=replace)
--
To view, visit https://gerrit.wikimedia.org/r/210017
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7804f83dfaa6fd824b9641e13240518eb430cde1
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits