Hello Ori.livneh,

I'd like you to do a code review.  Please visit

    https://gerrit.wikimedia.org/r/174329

to review the following change.

Change subject: Revert "Batch event insertion"
......................................................................

Revert "Batch event insertion"

EventLogging's mysql-m2-consumer is currently failing a lot with

  RuntimeError: maximum recursion depth exceeded while calling a Python object

and in the stack-traces, eventlogging/utils.py:28 is occurring a
lot. It seems this error also grabs all available memory, and the OOM
killer becomes active.

So it seems the multi-inserts are not working smoothly, and we revert
for now.

This reverts commit b406af8397737d31937666f30450682b1a1ba523.

Change-Id: Id6e506fdcef061f44a2cc7d45535449c6867d6d0
---
M .travis.yml
M server/eventlogging/__init__.py
M server/eventlogging/handlers.py
M server/eventlogging/jrm.py
M server/eventlogging/schema.py
D server/eventlogging/utils.py
A server/test-requirements.txt
M server/tests/fixtures.py
M server/tests/test_jrm.py
M server/tox.ini
10 files changed, 32 insertions(+), 161 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging 
refs/changes/29/174329/1

diff --git a/.travis.yml b/.travis.yml
index 9c1beb5..4d04c3c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,6 +2,7 @@
 language: python
 python:
     - "2.7"
+    - "3.2"
     - "3.4"
 install:
     - pip install -q pyzmq --install-option="--zmq=bundled" --use-mirrors
diff --git a/server/eventlogging/__init__.py b/server/eventlogging/__init__.py
index 0749a86..f80ae06 100644
--- a/server/eventlogging/__init__.py
+++ b/server/eventlogging/__init__.py
@@ -24,7 +24,6 @@
 from .schema import *
 from .streams import *
 from .crypto import *
-from .utils import *
 
 # The fact that schema validation is entrusted to a third-party module
 # is an implementation detail that a consumer of this package's API
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index 6e4bac0..cfe67bc 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -10,7 +10,6 @@
   :func:`eventlogging.drive` pumps data through a reader-writer pair.
 
 """
-import collections
 import datetime
 import glob
 import imp
@@ -23,10 +22,9 @@
 
 import sqlalchemy
 
-from .utils import PeriodicThread
 from .factory import writes, reads
 from .streams import stream, pub_socket, sub_socket, udp_socket
-from .jrm import store_sql_events
+from .jrm import store_sql_event
 
 
 __all__ = ('load_plugins',)
@@ -88,14 +86,8 @@
     engine = sqlalchemy.create_engine(uri)
     meta = sqlalchemy.MetaData(bind=engine)
 
-    events = collections.deque()
-    worker = PeriodicThread(
-        interval=2, target=store_sql_events, args=(meta, events))
-    worker.start()
-
     while 1:
-        event = (yield)
-        events.append(event)
+        store_sql_event(meta, (yield))
 
 
 @writes('file')
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index 09c5fae..02491a6 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -11,16 +11,14 @@
 
 import collections
 import datetime
-import itertools
 
 import sqlalchemy
 
-from .schema import get_schema, get_scid
+from .schema import get_schema
 from .compat import items
 
 
-__all__ = ('store_sql_events',)
-
+__all__ = ('store_sql_event',)
 
 #: Format string for :func:`datetime.datetime.strptime` for MediaWiki
 #: timestamps. See `<http://www.mediawiki.org/wiki/Manual:Timestamp>`_.
@@ -166,48 +164,21 @@
     return table
 
 
-def _insert_sequential(table, events, replace=False):
-    """Insert events into the database by issuing an INSERT for each one."""
-    for event in events:
-        insert = table.insert(values=event)
-        try:
-            insert.execute()
-        except sqlalchemy.exc.IntegrityError as e:
-            if not replace or 'unique' not in str(e).lower():
-                raise
-        except sqlalchemy.exc.ProgrammingError:
-            table.create()
-            insert.execute()
-
-
-def _insert_multi(table, events, replace=False):
-    """Insert events into the database using a single INSERT."""
-    insert = table.insert(values=events)
-    if replace:
-        insert = (insert
-                  .prefix_with('IGNORE', dialect='mysql')
-                  .prefix_with('OR REPLACE', dialect='sqlite'))
+def store_sql_event(meta, event, ignore_dupes=False):
+    """Store an event in the database."""
+    scid = (event['schema'], event['revision'])
+    table = get_table(meta, scid)
+    event = flatten(event)
+    event = {k: v for k, v in items(event) if k not in NO_DB_PROPERTIES}
+    insert = table.insert(values=event)
     try:
         insert.execute()
-    except sqlalchemy.exc.SQLAlchemyError:
-        table.create(checkfirst=True)
+    except sqlalchemy.exc.IntegrityError as e:
+        if not ignore_dupes or 'unique' not in str(e).lower():
+            raise
+    except sqlalchemy.exc.ProgrammingError:
+        table.create()
         insert.execute()
-
-
-def store_sql_events(meta, events, replace=False):
-    """Store events in the database."""
-    queue = [events.pop() for _ in range(len(events))]
-    queue.sort(key=get_scid)
-
-    if meta.bind.dialect.supports_multivalues_insert:
-        insert = _insert_multi
-    else:
-        insert = _insert_sequential
-
-    for scid, events in itertools.groupby(queue, get_scid):
-        prepared_events = [prepare(event) for event in events]
-        table = get_table(meta, scid)
-        insert(table, prepared_events, replace)
 
 
 def _property_getter(item):
@@ -220,14 +191,6 @@
         elif 'type' in val:
             val = typecast(val)
     return key, val
-
-
-def prepare(event):
-    """Prepare an event for insertion into the database."""
-    event = flatten(event)
-    for prop in NO_DB_PROPERTIES:
-        event.pop(prop, None)
-    return event
 
 
 def flatten(d, sep='_', f=None):
diff --git a/server/eventlogging/schema.py b/server/eventlogging/schema.py
index 300b396..fa338a9 100644
--- a/server/eventlogging/schema.py
+++ b/server/eventlogging/schema.py
@@ -137,17 +137,12 @@
             delete_if_exists_and_length_mismatches(capsule, 'originCountry', 2)
 
 
-def get_scid(event):
-    """Extract a SCID from an event."""
-    return event['schema'], event['revision']
-
-
 def validate(capsule):
     """Validates an encapsulated event.
     :raises :exc:`jsonschema.ValidationError`: If event is invalid.
     """
     try:
-        scid = get_scid(capsule)
+        scid = capsule['schema'], capsule['revision']
     except KeyError as ex:
         # If `schema` or `revision` keys are missing, a KeyError
         # exception will be raised. We re-raise it as a
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
deleted file mode 100644
index 9c7e02c..0000000
--- a/server/eventlogging/utils.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-  eventlogging.utils
-  ~~~~~~~~~~~~~~~~~~
-
-  This module contains generic routines that aren't associated with
-  a particular function.
-
-"""
-import threading
-
-
-__all__ = ('PeriodicThread',)
-
-
-class PeriodicThread(threading.Thread):
-    """Represents a threaded job that runs repeatedly at regular intervals."""
-
-    def __init__(self, interval, *args, **kwargs):
-        self.interval = interval
-        self.ready = threading.Event()
-        super(PeriodicThread, self).__init__(*args, **kwargs)
-
-    def run(self):
-        self.ready.clear()
-        self.ready.wait(self.interval)
-        self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
-        self.run()
diff --git a/server/test-requirements.txt b/server/test-requirements.txt
new file mode 100644
index 0000000..33f4945
--- /dev/null
+++ b/server/test-requirements.txt
@@ -0,0 +1,2 @@
+coverage
+nose
diff --git a/server/tests/fixtures.py b/server/tests/fixtures.py
index ed41c47..01ef674 100644
--- a/server/tests/fixtures.py
+++ b/server/tests/fixtures.py
@@ -152,14 +152,6 @@
     raise HttpRequestAttempted('Attempted HTTP fetch: %s' % (scid,))
 
 
-def _get_event():
-    """ Creates events on demand with unique ids"""
-    for i in range(1, 100):
-        event = copy.deepcopy(_event)
-        event['uuid'] = i
-        yield event
-
-
 class SchemaTestMixin(object):
     """A :class:`unittest.TestCase` mix-in for test cases that depend on
     schema look-ups."""
@@ -172,7 +164,6 @@
             _incorrectly_serialized_empty_event)
         eventlogging.schema.schema_cache = copy.deepcopy(_schemas)
         eventlogging.schema.http_get_schema = mock_http_get_schema
-        self.event_generator = _get_event()
 
     def tearDown(self):
         """Clear schema cache and restore stubbed `http_get_schema`."""
@@ -198,7 +189,7 @@
         """Configure :class:`sqlalchemy.engine.Engine` and
         :class:`sqlalchemy.schema.MetaData` objects."""
         super(DatabaseTestMixin, self).setUp()
-        self.engine = sqlalchemy.create_engine('sqlite://', echo=False)
+        self.engine = sqlalchemy.create_engine('sqlite://', echo=True)
         self.meta = sqlalchemy.MetaData(bind=self.engine)
 
     def tearDown(self):
diff --git a/server/tests/test_jrm.py b/server/tests/test_jrm.py
index b3bfcba..166a621 100644
--- a/server/tests/test_jrm.py
+++ b/server/tests/test_jrm.py
@@ -13,7 +13,7 @@
 
 import eventlogging
 import sqlalchemy
-from sqlalchemy.sql import select
+
 from .fixtures import DatabaseTestMixin, TEST_SCHEMA_SCID
 
 
@@ -24,15 +24,8 @@
         """If an attempt is made to store an event for which no table
         exists, the schema is automatically retrieved and a suitable
         table generated."""
-        eventlogging.store_sql_events(self.meta, [self.event])
+        eventlogging.store_sql_event(self.meta, self.event)
         self.assertIn('TestSchema_123', self.meta.tables)
-        table = self.meta.tables['TestSchema_123']
-        # is the table on the db  and does it have the right data?
-        s = select([table])
-        results = self.engine.execute(s)
-        row = results.fetchone()
-        # see columns with print table.c
-        self.assertEquals(row['clientIp'], self.event['clientIp'])
 
     def test_column_names(self):
         """Generated tables contain columns for each relevant field."""
@@ -58,7 +51,7 @@
 
     def test_encoding(self):
         """Timestamps and unicode strings are correctly encoded."""
-        eventlogging.jrm.store_sql_events(self.meta, [self.event])
+        eventlogging.jrm.store_sql_event(self.meta, self.event)
         table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
         row = table.select().execute().fetchone()
         self.assertEqual(row['event_value'], '☆ 彡')
@@ -71,7 +64,7 @@
     def test_reflection(self):
         """Tables which exist in the database but not in the MetaData cache are
         correctly reflected."""
-        eventlogging.store_sql_events(self.meta, [self.event])
+        eventlogging.store_sql_event(self.meta, self.event)
 
         # Tell Python to forget everything it knows about this database
         # by purging ``MetaData``. The actual data in the database is
@@ -86,43 +79,5 @@
         # The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
         # will ensure that we don't attempt to CREATE TABLE on the
         # already-existing table:
-        eventlogging.store_sql_events(self.meta, [self.event], True)
+        eventlogging.store_sql_event(self.meta, self.event, True)
         self.assertIn('TestSchema_123', self.meta.tables)
-
-    def test_happy_case_insert_more_than_one_event(self):
-        """Insert more than one event on database using batch_write"""
-        another_event = next(self.event_generator)
-        event_list = [another_event, self.event]
-        eventlogging.store_sql_events(self.meta, event_list)
-        table = self.meta.tables['TestSchema_123']
-        # is the table on the db  and does it have the right data?
-        s = select([table])
-        results = self.engine.execute(s)
-        # the number of records in table must be the list size
-        rows = results.fetchall()
-        self.assertEquals(len(rows), 2)
-
-    def test_insertion_of_multiple_events_with_a_duplicate(self):
-        """"If an insert with multiple events includes
-        a duplicate and replace=True we have to
-        insert the other items.
-        """
-        # insert event
-        eventlogging.jrm.store_sql_events(self.meta, [self.event])
-        # now try to insert list of events in which this event is included
-        another_event = next(self.event_generator)
-        event_list = [another_event, self.event]
-        eventlogging.store_sql_events(self.meta, event_list, replace=True)
-
-        # we should still have to insert the other record though
-        table = self.meta.tables['TestSchema_123']
-        s = select([table])
-        results = self.engine.execute(s)
-        rows = results.fetchall()
-        self.assertEquals(len(rows), 2)
-
-    def test_event_queue_is_empty(self):
-        """An empty event queue is handled well
-        No exception is raised"""
-        event_list = []
-        eventlogging.store_sql_events(self.meta, event_list)
diff --git a/server/tox.ini b/server/tox.ini
index ea24863..0c0f0f9 100644
--- a/server/tox.ini
+++ b/server/tox.ini
@@ -6,21 +6,22 @@
 #     pip install tox
 #
 # And then run ``tox`` from this directory.
-
-# To run just one test file:
-# tox -e py27 -- -s tests.test_jrm
 #
 # ..        _tox: http://tox.readthedocs.org/en/latest/
 # .. _virtualenv: http://pypi.python.org/pypi/virtualenv
 
 [tox]
-envlist = py27, py34, flake8
+envlist = py27, py32, flake8
 skipsdist = true
 
 [testenv]
 setenv = VIRTUAL_ENV={envdir}
 deps = -r{toxinidir}/requirements.txt
-commands = python setup.py test {posargs}
+       -r{toxinidir}/test-requirements.txt
+commands = nosetests \
+  --verbose \
+  --with-coverage \
+  --cover-package=eventlogging
 
 [testenv:flake8]
 commands = flake8

-- 
To view, visit https://gerrit.wikimedia.org/r/174329
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6e506fdcef061f44a2cc7d45535449c6867d6d0
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: QChris <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to