Mforns has uploaded a new change for review.
https://gerrit.wikimedia.org/r/258217
Change subject: Fix bad push/pop from event batch deque
......................................................................
Fix bad push/pop from event batch deque
The former code was pushing and popping event batches at the same end
of the deque. Also changed the batch size and the deque size
to avoid out-of-memory errors when backfilling or when having
Kafka hiccups, now that we have 4 MySQL consumers in parallel.
Bug: T120209
Change-Id: I086753a504462396520f768934b7c038f1874bd6
---
M eventlogging/handlers.py
M eventlogging/jrm.py
M tests/test_jrm.py
3 files changed, 12 insertions(+), 11 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/eventlogging
refs/changes/17/258217/1
diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index a7c8993..79bf236 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -304,10 +304,10 @@
# that the connection is alive, and reconnect if necessary.
dbapi_connection.ping(True)
try:
- batch_size = 5000
+ batch_size = 3000
batch_time = 300 # in seconds
# Max number of batches pending insertion.
- queue_size = 1000
+ queue_size = 100
sleep_seconds = 5
# Link the main thread to the worker thread so we
# don't keep filling the queue if the worker died.
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 20234f4..27c6d12 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -242,7 +242,7 @@
insert = _insert_sequential
while len(events_batch) > 0:
- scid, scid_events = events_batch.pop()
+ scid, scid_events = events_batch.popleft()
prepared_events = [prepare(e) for e in scid_events]
# TODO: Avoid breaking the inserts down by same set of fields,
# instead force a default NULL, 0 or '' value for optional fields.
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 75cfc60..5b7d408 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -7,6 +7,7 @@
"""
from __future__ import unicode_literals
+from collections import deque
import datetime
import itertools
@@ -26,7 +27,7 @@
"""If an attempt is made to store an event for which no table
exists, the schema is automatically retrieved and a suitable
table generated."""
- events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
eventlogging.store_sql_events(self.meta, events_batch)
self.assertIn('TestSchema_123', self.meta.tables)
table = self.meta.tables['TestSchema_123']
@@ -61,7 +62,7 @@
def test_encoding(self):
"""Timestamps and unicode strings are correctly encoded."""
- events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
eventlogging.jrm.store_sql_events(self.meta, events_batch)
table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
row = table.select().execute().fetchone()
@@ -75,7 +76,7 @@
def test_reflection(self):
"""Tables which exist in the database but not in the MetaData cache are
correctly reflected."""
- events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
eventlogging.store_sql_events(self.meta, events_batch)
# Tell Python to forget everything it knows about this database
@@ -91,14 +92,14 @@
# The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
# will ensure that we don't attempt to CREATE TABLE on the
# already-existing table:
- events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
eventlogging.store_sql_events(self.meta, events_batch, True)
self.assertIn('TestSchema_123', self.meta.tables)
def test_happy_case_insert_more_than_one_event(self):
"""Insert more than one event on database using batch_write"""
another_event = next(self.event_generator)
- events_batch = [(TEST_SCHEMA_SCID, [another_event, self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
eventlogging.store_sql_events(self.meta, events_batch)
table = self.meta.tables['TestSchema_123']
# is the table on the db and does it have the right data?
@@ -114,12 +115,12 @@
insert the other items.
"""
# insert event
- events_batch = [(TEST_SCHEMA_SCID, [self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
eventlogging.jrm.store_sql_events(self.meta, events_batch)
# now try to insert list of events in which this event is included
another_event = next(self.event_generator)
event_list = [another_event, self.event]
- events_batch = [(TEST_SCHEMA_SCID, event_list)]
+ events_batch = deque([(TEST_SCHEMA_SCID, event_list)])
eventlogging.store_sql_events(self.meta, events_batch, replace=True)
# we should still have to insert the other record though
@@ -158,7 +159,7 @@
set of optional fields are inserted correctly"""
another_event = next(self.event_generator)
# ensure both events get inserted?
- events_batch = [(TEST_SCHEMA_SCID, [another_event, self.event])]
+ events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
eventlogging.store_sql_events(self.meta, events_batch)
table = self.meta.tables['TestSchema_123']
# is the table on the db and does it have the right data?
--
To view, visit https://gerrit.wikimedia.org/r/258217
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I086753a504462396520f768934b7c038f1874bd6
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits