Hello Ori.livneh,
I'd like you to do a code review. Please visit
https://gerrit.wikimedia.org/r/174329
to review the following change.
Change subject: Revert "Batch event insertion"
......................................................................
Revert "Batch event insertion"
EventLogging's mysql-m2-consumer is currently failing frequently with
RuntimeError: maximum recursion depth exceeded while calling a Python object
and eventlogging/utils.py:28 shows up repeatedly in the stack traces.
It seems this error also consumes all available memory, which triggers
the OOM killer.
It seems the multi-inserts are not working smoothly, so we revert them
for now.
This reverts commit b406af8397737d31937666f30450682b1a1ba523.
Change-Id: Id6e506fdcef061f44a2cc7d45535449c6867d6d0
---
M .travis.yml
M server/eventlogging/__init__.py
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/eventlogging/schema.py
D server/eventlogging/utils.py
A server/test-requirements.txt
M server/tests/fixtures.py
M server/tests/test_jrm.py
M server/tox.ini
10 files changed, 32 insertions(+), 161 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging
refs/changes/29/174329/1
diff --git a/.travis.yml b/.travis.yml
index 9c1beb5..4d04c3c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,6 +2,7 @@
language: python
python:
- "2.7"
+ - "3.2"
- "3.4"
install:
- pip install -q pyzmq --install-option="--zmq=bundled" --use-mirrors
diff --git a/server/eventlogging/__init__.py b/server/eventlogging/__init__.py
index 0749a86..f80ae06 100644
--- a/server/eventlogging/__init__.py
+++ b/server/eventlogging/__init__.py
@@ -24,7 +24,6 @@
from .schema import *
from .streams import *
from .crypto import *
-from .utils import *
# The fact that schema validation is entrusted to a third-party module
# is an implementation detail that a consumer of this package's API
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index 6e4bac0..cfe67bc 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -10,7 +10,6 @@
:func:`eventlogging.drive` pumps data through a reader-writer pair.
"""
-import collections
import datetime
import glob
import imp
@@ -23,10 +22,9 @@
import sqlalchemy
-from .utils import PeriodicThread
from .factory import writes, reads
from .streams import stream, pub_socket, sub_socket, udp_socket
-from .jrm import store_sql_events
+from .jrm import store_sql_event
__all__ = ('load_plugins',)
@@ -88,14 +86,8 @@
engine = sqlalchemy.create_engine(uri)
meta = sqlalchemy.MetaData(bind=engine)
- events = collections.deque()
- worker = PeriodicThread(
- interval=2, target=store_sql_events, args=(meta, events))
- worker.start()
-
while 1:
- event = (yield)
- events.append(event)
+ store_sql_event(meta, (yield))
@writes('file')
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index 09c5fae..02491a6 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -11,16 +11,14 @@
import collections
import datetime
-import itertools
import sqlalchemy
-from .schema import get_schema, get_scid
+from .schema import get_schema
from .compat import items
-__all__ = ('store_sql_events',)
-
+__all__ = ('store_sql_event',)
#: Format string for :func:`datetime.datetime.strptime` for MediaWiki
#: timestamps. See `<http://www.mediawiki.org/wiki/Manual:Timestamp>`_.
@@ -166,48 +164,21 @@
return table
-def _insert_sequential(table, events, replace=False):
- """Insert events into the database by issuing an INSERT for each one."""
- for event in events:
- insert = table.insert(values=event)
- try:
- insert.execute()
- except sqlalchemy.exc.IntegrityError as e:
- if not replace or 'unique' not in str(e).lower():
- raise
- except sqlalchemy.exc.ProgrammingError:
- table.create()
- insert.execute()
-
-
-def _insert_multi(table, events, replace=False):
- """Insert events into the database using a single INSERT."""
- insert = table.insert(values=events)
- if replace:
- insert = (insert
- .prefix_with('IGNORE', dialect='mysql')
- .prefix_with('OR REPLACE', dialect='sqlite'))
+def store_sql_event(meta, event, ignore_dupes=False):
+ """Store an event in the database."""
+ scid = (event['schema'], event['revision'])
+ table = get_table(meta, scid)
+ event = flatten(event)
+ event = {k: v for k, v in items(event) if k not in NO_DB_PROPERTIES}
+ insert = table.insert(values=event)
try:
insert.execute()
- except sqlalchemy.exc.SQLAlchemyError:
- table.create(checkfirst=True)
+ except sqlalchemy.exc.IntegrityError as e:
+ if not ignore_dupes or 'unique' not in str(e).lower():
+ raise
+ except sqlalchemy.exc.ProgrammingError:
+ table.create()
insert.execute()
-
-
-def store_sql_events(meta, events, replace=False):
- """Store events in the database."""
- queue = [events.pop() for _ in range(len(events))]
- queue.sort(key=get_scid)
-
- if meta.bind.dialect.supports_multivalues_insert:
- insert = _insert_multi
- else:
- insert = _insert_sequential
-
- for scid, events in itertools.groupby(queue, get_scid):
- prepared_events = [prepare(event) for event in events]
- table = get_table(meta, scid)
- insert(table, prepared_events, replace)
def _property_getter(item):
@@ -220,14 +191,6 @@
elif 'type' in val:
val = typecast(val)
return key, val
-
-
-def prepare(event):
- """Prepare an event for insertion into the database."""
- event = flatten(event)
- for prop in NO_DB_PROPERTIES:
- event.pop(prop, None)
- return event
def flatten(d, sep='_', f=None):
diff --git a/server/eventlogging/schema.py b/server/eventlogging/schema.py
index 300b396..fa338a9 100644
--- a/server/eventlogging/schema.py
+++ b/server/eventlogging/schema.py
@@ -137,17 +137,12 @@
delete_if_exists_and_length_mismatches(capsule, 'originCountry', 2)
-def get_scid(event):
- """Extract a SCID from an event."""
- return event['schema'], event['revision']
-
-
def validate(capsule):
"""Validates an encapsulated event.
:raises :exc:`jsonschema.ValidationError`: If event is invalid.
"""
try:
- scid = get_scid(capsule)
+ scid = capsule['schema'], capsule['revision']
except KeyError as ex:
# If `schema` or `revision` keys are missing, a KeyError
# exception will be raised. We re-raise it as a
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
deleted file mode 100644
index 9c7e02c..0000000
--- a/server/eventlogging/utils.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- eventlogging.utils
- ~~~~~~~~~~~~~~~~~~
-
- This module contains generic routines that aren't associated with
- a particular function.
-
-"""
-import threading
-
-
-__all__ = ('PeriodicThread',)
-
-
-class PeriodicThread(threading.Thread):
- """Represents a threaded job that runs repeatedly at regular intervals."""
-
- def __init__(self, interval, *args, **kwargs):
- self.interval = interval
- self.ready = threading.Event()
- super(PeriodicThread, self).__init__(*args, **kwargs)
-
- def run(self):
- self.ready.clear()
- self.ready.wait(self.interval)
- self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
- self.run()
diff --git a/server/test-requirements.txt b/server/test-requirements.txt
new file mode 100644
index 0000000..33f4945
--- /dev/null
+++ b/server/test-requirements.txt
@@ -0,0 +1,2 @@
+coverage
+nose
diff --git a/server/tests/fixtures.py b/server/tests/fixtures.py
index ed41c47..01ef674 100644
--- a/server/tests/fixtures.py
+++ b/server/tests/fixtures.py
@@ -152,14 +152,6 @@
raise HttpRequestAttempted('Attempted HTTP fetch: %s' % (scid,))
-def _get_event():
- """ Creates events on demand with unique ids"""
- for i in range(1, 100):
- event = copy.deepcopy(_event)
- event['uuid'] = i
- yield event
-
-
class SchemaTestMixin(object):
"""A :class:`unittest.TestCase` mix-in for test cases that depend on
schema look-ups."""
@@ -172,7 +164,6 @@
_incorrectly_serialized_empty_event)
eventlogging.schema.schema_cache = copy.deepcopy(_schemas)
eventlogging.schema.http_get_schema = mock_http_get_schema
- self.event_generator = _get_event()
def tearDown(self):
"""Clear schema cache and restore stubbed `http_get_schema`."""
@@ -198,7 +189,7 @@
"""Configure :class:`sqlalchemy.engine.Engine` and
:class:`sqlalchemy.schema.MetaData` objects."""
super(DatabaseTestMixin, self).setUp()
- self.engine = sqlalchemy.create_engine('sqlite://', echo=False)
+ self.engine = sqlalchemy.create_engine('sqlite://', echo=True)
self.meta = sqlalchemy.MetaData(bind=self.engine)
def tearDown(self):
diff --git a/server/tests/test_jrm.py b/server/tests/test_jrm.py
index b3bfcba..166a621 100644
--- a/server/tests/test_jrm.py
+++ b/server/tests/test_jrm.py
@@ -13,7 +13,7 @@
import eventlogging
import sqlalchemy
-from sqlalchemy.sql import select
+
from .fixtures import DatabaseTestMixin, TEST_SCHEMA_SCID
@@ -24,15 +24,8 @@
"""If an attempt is made to store an event for which no table
exists, the schema is automatically retrieved and a suitable
table generated."""
- eventlogging.store_sql_events(self.meta, [self.event])
+ eventlogging.store_sql_event(self.meta, self.event)
self.assertIn('TestSchema_123', self.meta.tables)
- table = self.meta.tables['TestSchema_123']
- # is the table on the db and does it have the right data?
- s = select([table])
- results = self.engine.execute(s)
- row = results.fetchone()
- # see columns with print table.c
- self.assertEquals(row['clientIp'], self.event['clientIp'])
def test_column_names(self):
"""Generated tables contain columns for each relevant field."""
@@ -58,7 +51,7 @@
def test_encoding(self):
"""Timestamps and unicode strings are correctly encoded."""
- eventlogging.jrm.store_sql_events(self.meta, [self.event])
+ eventlogging.jrm.store_sql_event(self.meta, self.event)
table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
row = table.select().execute().fetchone()
self.assertEqual(row['event_value'], '☆ 彡')
@@ -71,7 +64,7 @@
def test_reflection(self):
"""Tables which exist in the database but not in the MetaData cache are
correctly reflected."""
- eventlogging.store_sql_events(self.meta, [self.event])
+ eventlogging.store_sql_event(self.meta, self.event)
# Tell Python to forget everything it knows about this database
# by purging ``MetaData``. The actual data in the database is
@@ -86,43 +79,5 @@
# The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
# will ensure that we don't attempt to CREATE TABLE on the
# already-existing table:
- eventlogging.store_sql_events(self.meta, [self.event], True)
+ eventlogging.store_sql_event(self.meta, self.event, True)
self.assertIn('TestSchema_123', self.meta.tables)
-
- def test_happy_case_insert_more_than_one_event(self):
- """Insert more than one event on database using batch_write"""
- another_event = next(self.event_generator)
- event_list = [another_event, self.event]
- eventlogging.store_sql_events(self.meta, event_list)
- table = self.meta.tables['TestSchema_123']
- # is the table on the db and does it have the right data?
- s = select([table])
- results = self.engine.execute(s)
- # the number of records in table must be the list size
- rows = results.fetchall()
- self.assertEquals(len(rows), 2)
-
- def test_insertion_of_multiple_events_with_a_duplicate(self):
- """"If an insert with multiple events includes
- a duplicate and replace=True we have to
- insert the other items.
- """
- # insert event
- eventlogging.jrm.store_sql_events(self.meta, [self.event])
- # now try to insert list of events in which this event is included
- another_event = next(self.event_generator)
- event_list = [another_event, self.event]
- eventlogging.store_sql_events(self.meta, event_list, replace=True)
-
- # we should still have to insert the other record though
- table = self.meta.tables['TestSchema_123']
- s = select([table])
- results = self.engine.execute(s)
- rows = results.fetchall()
- self.assertEquals(len(rows), 2)
-
- def test_event_queue_is_empty(self):
- """An empty event queue is handled well
- No exception is raised"""
- event_list = []
- eventlogging.store_sql_events(self.meta, event_list)
diff --git a/server/tox.ini b/server/tox.ini
index ea24863..0c0f0f9 100644
--- a/server/tox.ini
+++ b/server/tox.ini
@@ -6,21 +6,22 @@
# pip install tox
#
# And then run ``tox`` from this directory.
-
-# To run just one test file:
-# tox -e py27 -- -s tests.test_jrm
#
# .. _tox: http://tox.readthedocs.org/en/latest/
# .. _virtualenv: http://pypi.python.org/pypi/virtualenv
[tox]
-envlist = py27, py34, flake8
+envlist = py27, py32, flake8
skipsdist = true
[testenv]
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
-commands = python setup.py test {posargs}
+ -r{toxinidir}/test-requirements.txt
+commands = nosetests \
+ --verbose \
+ --with-coverage \
+ --cover-package=eventlogging
[testenv:flake8]
commands = flake8
--
To view, visit https://gerrit.wikimedia.org/r/174329
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6e506fdcef061f44a2cc7d45535449c6867d6d0
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits