Ottomata has submitted this change and it was merged.

Change subject: Remove queue of event batches from mysql consumer
......................................................................


Remove queue of event batches from mysql consumer

Since the mysql consumer uses Kafka, there is no more need
to have the deque of event batches, nor the worker thread.
The mysql writer can block while inserting and let Kafka
buffer the incoming events. Note that the mysql consumer
still groups (batches) the events by schema to insert them
together in a single insert statement, but it does not queue
them for insertion, rather inserts them directly when the
batch is big enough (or old enough).

Bug: T121151
Change-Id: If247ed99b3f828b82295689b2e4ade5d21eb5c04
---
M eventlogging/handlers.py
M eventlogging/jrm.py
M tests/test_jrm.py
3 files changed, 72 insertions(+), 121 deletions(-)

Approvals:
  Ottomata: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index a2b4efb..6d5ad4f 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -15,8 +15,6 @@
 import imp
 import inspect
 
-from functools import partial
-
 import logging
 import logging.handlers
 import os
@@ -29,10 +27,10 @@
 import uuid
 
 from .compat import items, json
-from .utils import PeriodicThread, uri_delete_query_item
+from .utils import uri_delete_query_item
 from .factory import writes, reads
 from .streams import stream, pub_socket, sub_socket, udp_socket
-from .jrm import store_sql_events, DB_FLUSH_INTERVAL
+from .jrm import store_sql_events
 from .topic import TopicNotFound
 
 __all__ = ('load_plugins',)
@@ -255,26 +253,13 @@
                 kafka_producer.send_messages(message_topic, message)
 
 
-def insert_stats(stats, inserted_count):
-    """
-    Callback function to increment mysql inserted metric in statsd,
-    that is called after successful insertion of events into mysql.
-
-        stats           - Instance of stats.StatsClient
-        inserted_count  - Number of events that have been inserted
-    """
-    if stats:
-        stats.incr('overall.inserted', inserted_count)
-
-
 @writes('mysql', 'sqlite')
 def sql_writer(
     uri,
     replace=False,
     statsd_host='',
     batch_size=3000,
-    batch_time=300,  # in seconds
-    queue_size=100  # Max number of batches pending insertion.
+    batch_time=300
 ):
     """
     Writes to an RDBMS, creating tables for SCIDs and rows for events.
@@ -288,9 +273,8 @@
     :param replace:     If true, INSERT REPLACE will be used.
     :param statsd_host: hostname of statsd instance to which insert stats will
                         be sent.
-    :param batch_size:  Number of events per schema to insert as a batch.
-    :param batch_time:  Number of second to wait before inserting a batch.
-    :param queue_size:  Number of batches waiting to be inserted.
+    :param batch_size:  Max number of events per schema to insert as a batch.
+    :param batch_time:  Max seconds to wait before inserting a batch.
     """
     import sqlalchemy
 
@@ -306,68 +290,43 @@
         uri = uri_delete_query_item(uri, argname)
 
     meta = sqlalchemy.MetaData(bind=uri)
-    # Each scid stores a buffer and the timestamp of the first insertion.
-    events = collections.defaultdict(lambda: ([], time.time()))
-    events_batch = collections.deque()
-    # Since the worker is unaware of the statsd host, create a partial
-    # that binds the statsd client argument to the callback
-    worker = PeriodicThread(interval=DB_FLUSH_INTERVAL,
-                            target=store_sql_events,
-                            args=(meta, events_batch),
-                            kwargs={'replace': replace,
-                                    'on_insert_callback':
-                                        partial(insert_stats, stats)})
-    worker.start()
-
     if meta.bind.dialect.name == 'mysql':
         @sqlalchemy.event.listens_for(sqlalchemy.pool.Pool, 'checkout')
         def ping(dbapi_connection, connection_record, connection_proxy):
             # Just before executing an insert, call mysql_ping() to verify
             # that the connection is alive, and reconnect if necessary.
             dbapi_connection.ping(True)
+
+    # For each SCID (schema, revision) we store an event batch and
+    # the timestamp of the first event.
+    events = collections.defaultdict(lambda: ([], time.time()))
     try:
-        sleep_seconds = 5
-        # Link the main thread to the worker thread so we
-        # don't keep filling the queue if the worker died.
-        while worker.is_alive():
-            # If the queue is too big, wait for the worker to empty it.
-            while len(events_batch) > queue_size:
-                logger.info('Sleeping %d seconds', sleep_seconds)
-                time.sleep(sleep_seconds)
+        while True:
             event = (yield)
-            # Break the event stream by schema (and revision)
+            # Group the event stream by schema (and revision)
             scid = event.scid()
             scid_events, first_timestamp = events[scid]
             scid_events.append(event)
-            if stats:
-                stats.incr('overall.insertAttempted')
-            # Check if the schema queue is too long or too old
+            # Whenever the batch reaches
+            # the size specified by batch_size or it hasn't received events
+            # for more than batch_time seconds it is flushed into mysql.
             if (len(scid_events) >= batch_size or
                     time.time() - first_timestamp >= batch_time):
-                logger.info(
-                    'Queueing %d %s_%s events for insertion',
-                    len(scid_events), scid[0], scid[1]
-                )
-                events_batch.append((scid, scid_events))
+                store_sql_events(meta, scid, scid_events, replace=replace)
+                if stats:
+                    stats.incr('overall.inserted', len(scid_events))
                 del events[scid]
-    except GeneratorExit:
-        # Allow the worker to complete any work that is
-        # already in progress before shutting down.
-        logger.info('Stopped main thread via GeneratorExit')
-        logger.info('Events when stopped %s', len(events))
-        worker.stop()
-        worker.join()
     except Exception:
         t = traceback.format_exc()
         logger.warn('Exception caught %s', t)
         raise
     finally:
-        # If there are any events remaining in the queue,
-        # process them in the main thread before exiting.
+        # If there are any batched events remaining,
+        # process them before exiting.
         for scid, (scid_events, _) in events.iteritems():
-            events_batch.append((scid, scid_events))
-        store_sql_events(meta, events_batch, replace=replace,
-                         on_insert_callback=partial(insert_stats, stats))
+            store_sql_events(meta, scid, scid_events, replace=replace)
+            if stats:
+                stats.incr('overall.inserted', len(scid_events))
 
 
 @writes('file')
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 74973c1..91a9199 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -49,10 +49,6 @@
     }
 }
 
-# How long (in seconds) we should accumulate events before flushing
-# to the database.
-DB_FLUSH_INTERVAL = 2
-
 
 class MediaWikiTimestamp(sqlalchemy.TypeDecorator):
     """A :class:`sqlalchemy.TypeDecorator` for MediaWiki timestamps."""
@@ -231,10 +227,11 @@
     return tuple(sorted(event))
 
 
-def store_sql_events(meta, events_batch, replace=False,
-                     on_insert_callback=None):
-    """Store events in the database.
-    It assumes that the events come broken down by scid."""
+def store_sql_events(meta, scid, scid_events, replace=False):
+    """Store a batch of events in the database.
+    It assumes that all events belong to the same scid."""
+    if len(scid_events) == 0:
+        return
     logger = logging.getLogger('Log')
 
     dialect = meta.bind.dialect
@@ -244,38 +241,33 @@
     else:
         insert = _insert_sequential
 
-    while len(events_batch) > 0:
-        scid, scid_events = events_batch.popleft()
-        prepared_events = [prepare(e) for e in scid_events]
+    # Each event here should be the same schema, so we can
+    # check if the first event should be encapsulated
+    # and use that when constructing the create table
+    # statement in declare_table.
+    should_encapsulate = scid_events[0].should_encapsulate()
 
-        # Each event here should be the same schema, so we can
-        # check if the first event should be encapsulated
-        # and use that when constructing the create table
-        # statement in declare_table.
-        should_encapsulate = scid_events[0].should_encapsulate()
+    prepared_events = [prepare(e) for e in scid_events]
+    # TODO: Avoid breaking the inserts down by same set of fields,
+    # instead force a default NULL, 0 or '' value for optional fields.
+    prepared_events.sort(key=insert_sort_key)
+    for _, grouper in itertools.groupby(prepared_events, insert_sort_key):
+        events = list(grouper)
+        table = get_table(meta, scid, should_encapsulate)
 
-        # TODO: Avoid breaking the inserts down by same set of fields,
-        # instead force a default NULL, 0 or '' value for optional fields.
-        prepared_events.sort(key=insert_sort_key)
-        for _, grouper in itertools.groupby(prepared_events, insert_sort_key):
-            events = list(grouper)
-            table = get_table(meta, scid, should_encapsulate)
+        insert_started_at = time.time()
+        insert(table, events, replace)
+        insert_time_taken = time.time() - insert_started_at
 
-            insert_started_at = time.time()
-            insert(table, events, replace)
-            insert_time_taken = time.time() - insert_started_at
-
-            # The insert operation is all or nothing - either all events have
-            # been inserted successfully (sqlalchemy wraps the insertion in a
-            # transaction), or an exception is thrown and it's not caught
-            # anywhere. This means that if the following line is reached,
-            # len(events) events have been inserted, so we can log it.
-            logger.info(
-                'Inserted %d %s_%s events in %f seconds',
-                len(events), scid[0], scid[1], insert_time_taken
-            )
-            if on_insert_callback:
-                on_insert_callback(len(events))
+        # The insert operation is all or nothing - either all events have
+        # been inserted successfully (sqlalchemy wraps the insertion in a
+        # transaction), or an exception is thrown and it's not caught
+        # anywhere. This means that if the following line is reached,
+        # len(events) events have been inserted, so we can log it.
+        logger.info(
+            'Inserted %d %s_%s events in %f seconds',
+            len(events), scid[0], scid[1], insert_time_taken
+        )
 
 
 def _property_getter(item):
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 3af0db7..943573d 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -7,7 +7,6 @@
 
 """
 from __future__ import unicode_literals
-from collections import deque
 
 import datetime
 import itertools
@@ -30,8 +29,8 @@
         """If an attempt is made to store an event for which no table
         exists, the schema is automatically retrieved and a suitable
         table generated."""
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
         table_name = TABLE_NAME_FORMAT % TEST_SCHEMA_SCID
         self.assertIn(table_name, self.meta.tables)
         table = self.meta.tables[table_name]
@@ -46,8 +45,8 @@
         """If an attempt is made to store an event with meta (not encapsulated)
         for which no table exists, the schema is automatically retrieved and a
         suitable table generated."""
-        events_batch = deque([(TEST_META_SCHEMA_SCID, [self.event_with_meta])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        eventlogging.store_sql_events(
+            self.meta, TEST_META_SCHEMA_SCID, [self.event_with_meta])
         table_name = TABLE_NAME_FORMAT % TEST_META_SCHEMA_SCID
         self.assertIn(table_name, self.meta.tables)
         table = self.meta.tables[table_name]
@@ -82,8 +81,8 @@
 
     def test_encoding(self):
         """Timestamps and unicode strings are correctly encoded."""
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.jrm.store_sql_events(self.meta, events_batch)
+        eventlogging.jrm.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
         table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
         row = table.select().execute().fetchone()
         self.assertEqual(row['event_value'], '☆ 彡')
@@ -96,8 +95,8 @@
     def test_reflection(self):
         """Tables which exist in the database but not in the MetaData cache are
         correctly reflected."""
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
 
         # Tell Python to forget everything it knows about this database
         # by purging ``MetaData``. The actual data in the database is
@@ -112,15 +111,16 @@
         # The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
         # will ensure that we don't attempt to CREATE TABLE on the
         # already-existing table:
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch, True)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event], True)
         self.assertIn('TestSchema_123', self.meta.tables)
 
     def test_happy_case_insert_more_than_one_event(self):
         """Insert more than one event on database using batch_write"""
         another_event = next(self.event_generator)
-        events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        event_list = [another_event, self.event]
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, event_list)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?
         s = sqlalchemy.sql.select([table])
@@ -135,13 +135,13 @@
         insert the other items.
         """
         # insert event
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.jrm.store_sql_events(self.meta, events_batch)
+        eventlogging.jrm.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
         # now try to insert list of events in which this event is included
         another_event = next(self.event_generator)
         event_list = [another_event, self.event]
-        events_batch = deque([(TEST_SCHEMA_SCID, event_list)])
-        eventlogging.store_sql_events(self.meta, events_batch, replace=True)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, event_list, replace=True)
 
         # we should still have to insert the other record though
         table = self.meta.tables['TestSchema_123']
@@ -150,11 +150,11 @@
         rows = results.fetchall()
         self.assertEqual(len(rows), 2)
 
-    def test_event_queue_is_empty(self):
+    def test_event_list_is_empty(self):
         """An empty event queue is handled well
         No exception is raised"""
         event_list = []
-        eventlogging.store_sql_events(self.meta, event_list)
+        eventlogging.store_sql_events(self.meta, TEST_SCHEMA_SCID, event_list)
 
     def test_grouping_of_events_happy_case(self):
         """Events belonging to the same schema with the same
@@ -179,8 +179,8 @@
         set of optional fields are inserted correctly"""
         another_event = next(self.event_generator)
         # ensure both events get inserted?
-        events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        event_list = [another_event, self.event]
+        eventlogging.store_sql_events(self.meta, TEST_SCHEMA_SCID, event_list)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?
         s = sqlalchemy.sql.select([table])

-- 
To view, visit https://gerrit.wikimedia.org/r/262738
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If247ed99b3f828b82295689b2e4ade5d21eb5c04
Gerrit-PatchSet: 8
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to