Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/262738

Change subject: Remove queue of event batches from mysql consumer
......................................................................

Remove queue of event batches from mysql consumer

Since the mysql consumer now uses Kafka, there is no longer
any need for the deque of event batches, nor for the worker
thread. The mysql writer can block while inserting and let
Kafka buffer the incoming events. Note that the mysql consumer
still groups (batches) the events by schema, so that they can
be inserted together in a single insert statement, but it no
longer queues them for insertion; instead, it inserts them
directly once a batch is big enough (or old enough).

Bug: T121151
Change-Id: If247ed99b3f828b82295689b2e4ade5d21eb5c04
---
M eventlogging/handlers.py
M eventlogging/jrm.py
M eventlogging/utils.py
M tests/test_jrm.py
4 files changed, 69 insertions(+), 163 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/eventlogging 
refs/changes/38/262738/1
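
For illustration, a minimal self-contained sketch of the flush rule
described in the commit message (the helper names and event shape are
hypothetical; batch_size and batch_time match the values used in the
patch):

    import collections
    import time

    batch_size = 5000
    batch_time = 300  # seconds

    # Each SCID maps to (batch, timestamp of the batch's first event).
    events = collections.defaultdict(lambda: ([], time.time()))

    def flush(scid, batch):
        # Stand-in for store_sql_events(meta, scid, batch).
        print('flushing %d %s_%s events' % (len(batch), scid[0], scid[1]))

    def handle(event):
        scid = (event['schema'], event['revision'])
        batch, first_timestamp = events[scid]
        batch.append(event)
        # Flush when the batch is large enough or its first event
        # is older than batch_time seconds.
        if (len(batch) >= batch_size or
                time.time() - first_timestamp >= batch_time):
            flush(scid, batch)
            del events[scid]

    handle({'schema': 'TestSchema', 'revision': 123})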

diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index a7c8993..db308a2 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -15,8 +15,6 @@
 import imp
 import inspect
 
-from functools import partial
-
 import logging
 import logging.handlers
 import os
@@ -29,10 +27,10 @@
 import uuid
 
 from .compat import items, json
-from .utils import PeriodicThread, uri_delete_query_item
+from .utils import uri_delete_query_item
 from .factory import writes, reads
 from .streams import stream, pub_socket, sub_socket, udp_socket
-from .jrm import store_sql_events, DB_FLUSH_INTERVAL
+from .jrm import store_sql_events
 from .topic import TopicNotFound
 
 __all__ = ('load_plugins',)
@@ -255,27 +253,10 @@
                 kafka_producer.send_messages(message_topic, message)
 
 
-def insert_stats(stats, inserted_count):
-    """
-    Callback function to increment mysql inserted metric in statsd,
-    that is called after successful insertion of events into mysql.
-
-        stats           - Instance of stats.StatsClient
-        inserted_count  - Number of events that have been inserted
-    """
-    if stats:
-        stats.incr('overall.inserted', inserted_count)
-
-
 @writes('mysql', 'sqlite')
 def sql_writer(uri, replace=False, statsd_host=''):
     """Writes to an RDBMS, creating tables for SCIDs and rows for events."""
     import sqlalchemy
-
-    # Don't pass 'replace' and 'statsd_host' parameter to SQLAlchemy.
-    uri = uri_delete_query_item(uri, 'replace')
-    uri = uri_delete_query_item(uri, 'statsd_host')
-
     logger = logging.getLogger('Log')
 
     # Create a statsd client instance if statsd_host is specified
@@ -283,73 +264,53 @@
     if statsd_host:
         stats = statsd.StatsClient(statsd_host, 8125, prefix='eventlogging')
 
-    meta = sqlalchemy.MetaData(bind=uri)
-    # Each scid stores a buffer and the timestamp of the first insertion.
-    events = collections.defaultdict(lambda: ([], time.time()))
-    events_batch = collections.deque()
-    # Since the worker is unaware of the statsd host, create a partial
-    # that binds the statsd client argument to the callback
-    worker = PeriodicThread(interval=DB_FLUSH_INTERVAL,
-                            target=store_sql_events,
-                            args=(meta, events_batch),
-                            kwargs={'replace': replace,
-                                    'on_insert_callback':
-                                        partial(insert_stats, stats)})
-    worker.start()
+    # Don't pass the 'replace' and 'statsd_host' parameters to SQLAlchemy.
+    uri = uri_delete_query_item(uri, 'replace')
+    uri = uri_delete_query_item(uri, 'statsd_host')
 
+    meta = sqlalchemy.MetaData(bind=uri)
     if meta.bind.dialect.name == 'mysql':
         @sqlalchemy.event.listens_for(sqlalchemy.pool.Pool, 'checkout')
         def ping(dbapi_connection, connection_record, connection_proxy):
             # Just before executing an insert, call mysql_ping() to verify
             # that the connection is alive, and reconnect if necessary.
             dbapi_connection.ping(True)
+
+    # For each SCID (schema, revision) we store an event batch and
+    # the timestamp of its first event. Whenever the batch reaches
+    # batch_size events, or its first event is older than batch_time
+    # seconds, it is flushed into mysql.
+    events = collections.defaultdict(lambda: ([], time.time()))
+    batch_size = 5000
+    batch_time = 300
+
     try:
-        batch_size = 5000
-        batch_time = 300  # in seconds
-        # Max number of batches pending insertion.
-        queue_size = 1000
-        sleep_seconds = 5
-        # Link the main thread to the worker thread so we
-        # don't keep filling the queue if the worker died.
-        while worker.is_alive():
-            # If the queue is too big, wait for the worker to empty it.
-            while len(events_batch) > queue_size:
-                logger.info('Sleeping %d seconds', sleep_seconds)
-                time.sleep(sleep_seconds)
+        while True:
             event = (yield)
-            # Break the event stream by schema (and revision)
+            # Group the event stream by schema (and revision)
             scid = (event['schema'], event['revision'])
             scid_events, first_timestamp = events[scid]
             scid_events.append(event)
             if stats:
-                stats.incr('overall.insertAttempted')
+                stats.incr('overall.batched')
             # Check if the schema queue is too long or too old
             if (len(scid_events) >= batch_size or
                     time.time() - first_timestamp >= batch_time):
-                logger.info(
-                    'Queueing %d %s_%s events for insertion',
-                    len(scid_events), scid[0], scid[1]
-                )
-                events_batch.append((scid, scid_events))
+                store_sql_events(meta, scid, scid_events, replace=replace)
+                if stats:
+                    stats.incr('overall.inserted', len(scid_events))
                 del events[scid]
-    except GeneratorExit:
-        # Allow the worker to complete any work that is
-        # already in progress before shutting down.
-        logger.info('Stopped main thread via GeneratorExit')
-        logger.info('Events when stopped %s', len(events))
-        worker.stop()
-        worker.join()
     except Exception:
         t = traceback.format_exc()
         logger.warn('Exception caught %s', t)
         raise
     finally:
-        # If there are any events remaining in the queue,
-        # process them in the main thread before exiting.
+        # If there are any batched events remaining,
+        # process them before exiting.
         for scid, (scid_events, _) in events.iteritems():
-            events_batch.append((scid, scid_events))
-        store_sql_events(meta, events_batch, replace=replace,
-                         on_insert_callback=partial(insert_stats, stats))
+            store_sql_events(meta, scid, scid_events, replace=replace)
+            if stats:
+                stats.incr('overall.inserted', len(scid_events))
 
 
 @writes('file')
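
A side note on the pattern used by sql_writer above: it is a Python
coroutine, so the caller pushes events into it with send(), and send()
does not return until the coroutine reaches the next (yield). This is
why a blocking insert inside the writer naturally back-pressures the
upstream Kafka reader. A toy sketch of the pattern (illustration only;
the real wiring lives in eventlogging's handler factory):

    def toy_writer():
        try:
            while True:
                event = (yield)  # blocks the sender while we work
                print('writing %r' % (event,))
        except GeneratorExit:
            print('writer closed')

    writer = toy_writer()
    next(writer)  # prime the coroutine up to the first (yield)
    for e in ({'id': 1}, {'id': 2}):
        writer.send(e)  # returns only after the event is handled
    writer.close()
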
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 27c6d12..0543c79 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -46,10 +46,6 @@
     }
 }
 
-# How long (in seconds) we should accumulate events before flushing
-# to the database.
-DB_FLUSH_INTERVAL = 2
-
 
 class MediaWikiTimestamp(sqlalchemy.TypeDecorator):
     """A :class:`sqlalchemy.TypeDecorator` for MediaWiki timestamps."""
@@ -228,10 +224,9 @@
     return tuple(sorted(event))
 
 
-def store_sql_events(meta, events_batch, replace=False,
-                     on_insert_callback=None):
-    """Store events in the database.
-    It assumes that the events come broken down by scid."""
+def store_sql_events(meta, scid, scid_events, replace=False):
+    """Store a batch of events in the database.
+    It assumes that all events belong to the same scid."""
     logger = logging.getLogger('Log')
 
     dialect = meta.bind.dialect
@@ -241,31 +236,27 @@
     else:
         insert = _insert_sequential
 
-    while len(events_batch) > 0:
-        scid, scid_events = events_batch.popleft()
-        prepared_events = [prepare(e) for e in scid_events]
-        # TODO: Avoid breaking the inserts down by same set of fields,
-        # instead force a default NULL, 0 or '' value for optional fields.
-        prepared_events.sort(key=insert_sort_key)
-        for _, grouper in itertools.groupby(prepared_events, insert_sort_key):
-            events = list(grouper)
-            table = get_table(meta, scid)
+    prepared_events = [prepare(e) for e in scid_events]
+    # TODO: Avoid breaking the inserts down by same set of fields,
+    # instead force a default NULL, 0 or '' value for optional fields.
+    prepared_events.sort(key=insert_sort_key)
+    for _, grouper in itertools.groupby(prepared_events, insert_sort_key):
+        events = list(grouper)
+        table = get_table(meta, scid)
 
-            insert_started_at = time.time()
-            insert(table, events, replace)
-            insert_time_taken = time.time() - insert_started_at
+        insert_started_at = time.time()
+        insert(table, events, replace)
+        insert_time_taken = time.time() - insert_started_at
 
-            # The insert operation is all or nothing - either all events have
-            # been inserted successfully (sqlalchemy wraps the insertion in a
-            # transaction), or an exception is thrown and it's not caught
-            # anywhere. This means that if the following line is reached,
-            # len(events) events have been inserted, so we can log it.
-            logger.info(
-                'Inserted %d %s_%s events in %f seconds',
-                len(events), scid[0], scid[1], insert_time_taken
-            )
-            if on_insert_callback:
-                on_insert_callback(len(events))
+        # The insert operation is all or nothing - either all events have
+        # been inserted successfully (sqlalchemy wraps the insertion in a
+        # transaction), or an exception is thrown and it's not caught
+        # anywhere. This means that if the following line is reached,
+        # len(events) events have been inserted, so we can log it.
+        logger.info(
+            'Inserted %d %s_%s events in %f seconds',
+            len(events), scid[0], scid[1], insert_time_taken
+        )
 
 
 def _property_getter(item):
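
A note on the sort in store_sql_events above: itertools.groupby only
merges adjacent items with equal keys, so the prepared events must be
sorted by insert_sort_key first; each resulting group then shares the
same set of columns and can go into one multi-row INSERT. A
self-contained sketch (the field names are hypothetical):

    import itertools

    def insert_sort_key(event):
        # The set of column names present in the event.
        return tuple(sorted(event))

    events = [
        {'uuid': 'a', 'value': 1},
        {'uuid': 'b'},               # missing optional 'value'
        {'uuid': 'c', 'value': 2},
    ]
    events.sort(key=insert_sort_key)
    for fields, group in itertools.groupby(events, insert_sort_key):
        # One multi-row INSERT per distinct field set.
        print(fields, list(group))
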
diff --git a/eventlogging/utils.py b/eventlogging/utils.py
index dc6f12e..a6a52e5 100644
--- a/eventlogging/utils.py
+++ b/eventlogging/utils.py
@@ -16,65 +16,19 @@
 import re
 import os
 import sys
-import threading
-import traceback
 
 from .compat import (
-    items, monotonic_clock, urisplit, urlencode, parse_qsl,
+    items, urisplit, urlencode, parse_qsl,
     integer_types, string_types, long
 )
 from .factory import get_reader, cast_string
 from logging.config import fileConfig
 
 
-__all__ = ('EventConsumer', 'PeriodicThread', 'flatten', 'is_subset_dict',
+__all__ = ('EventConsumer', 'flatten', 'is_subset_dict',
            'setup_logging', 'unflatten', 'update_recursive',
            'uri_delete_query_item', 'uri_append_query_items', 'uri_force_raw',
            'parse_etcd_uri', 'datetime_from_uuid1', 'datetime_from_timestamp')
-
-
-class PeriodicThread(threading.Thread):
-    """Represents a threaded job that runs repeatedly at a regular interval."""
-
-    def __init__(self, interval, *args, **kwargs):
-        self.interval = interval
-        self.ready = threading.Event()
-        self.stopping = threading.Event()
-        self.logger = logging.getLogger('Log')
-        super(PeriodicThread, self).__init__(*args, **kwargs)
-
-    def run(self):
-        while not self.stopping.is_set():
-            try:
-                # Run the target function. Check the clock before
-                # and after to determine how long it took to run.
-                time_start = monotonic_clock()
-                self._Thread__target(*self._Thread__args,
-                                     **self._Thread__kwargs)
-                time_stop = monotonic_clock()
-
-                run_duration = time_stop - time_start
-
-                # Subtract the time it took the target function to run
-                # from the desired run interval. The result is how long
-                # we have to sleep before the next run.
-                time_to_next_run = self.interval - run_duration
-                self.logger.debug('Run duration of thread execution: %s',
-                                  str(run_duration))
-                if self.ready.wait(time_to_next_run):
-                    # If the internal flag of `self.ready` was set, we were
-                    # interrupted mid-nap to run immediately. But before we
-                    # do, we reset the flag.
-                    self.ready.clear()
-            except Exception as e:
-                trace = traceback.format_exc()
-                self.logger.warn('Child thread exiting, exception %s', trace)
-                raise e
-
-    def stop(self):
-        """Graceful stop: stop once the current iteration is complete."""
-        self.stopping.set()
-        self.logger.info('Stopping child thread gracefully')
 
 
 def uri_delete_query_item(uri, key):
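
For context on the uri_delete_query_item calls in sql_writer: options
like 'replace' and 'statsd_host' ride along in the connection URI's
query string and must be stripped before the URI is handed to
SQLAlchemy. A self-contained approximation (an assumption: the real
implementation in eventlogging/utils.py may differ in details):

    try:
        from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode
    except ImportError:  # Python 2
        from urlparse import urlsplit, urlunsplit, parse_qsl
        from urllib import urlencode

    def delete_query_item(uri, key):
        # Drop every query item whose name matches key.
        parts = urlsplit(uri)
        query = [(k, v) for k, v in parse_qsl(parts.query) if k != key]
        return urlunsplit(parts._replace(query=urlencode(query)))

    print(delete_query_item('mysql://db/log?replace=1&statsd_host=sh', 'replace'))
    # mysql://db/log?statsd_host=sh
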
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 5b7d408..43b66a8 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -7,7 +7,6 @@
 
 """
 from __future__ import unicode_literals
-from collections import deque
 
 import datetime
 import itertools
@@ -27,8 +26,8 @@
         """If an attempt is made to store an event for which no table
         exists, the schema is automatically retrieved and a suitable
         table generated."""
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
         self.assertIn('TestSchema_123', self.meta.tables)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?
@@ -62,8 +61,8 @@
 
     def test_encoding(self):
         """Timestamps and unicode strings are correctly encoded."""
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.jrm.store_sql_events(self.meta, events_batch)
+        eventlogging.jrm.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
         table = eventlogging.jrm.get_table(self.meta, TEST_SCHEMA_SCID)
         row = table.select().execute().fetchone()
         self.assertEqual(row['event_value'], '☆ 彡')
@@ -76,8 +75,8 @@
     def test_reflection(self):
         """Tables which exist in the database but not in the MetaData cache are
         correctly reflected."""
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
 
         # Tell Python to forget everything it knows about this database
         # by purging ``MetaData``. The actual data in the database is
@@ -92,15 +91,16 @@
         # The ``checkfirst`` arg to :func:`sqlalchemy.Table.create`
         # will ensure that we don't attempt to CREATE TABLE on the
         # already-existing table:
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch, True)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event], True)
         self.assertIn('TestSchema_123', self.meta.tables)
 
     def test_happy_case_insert_more_than_one_event(self):
         """Insert more than one event on database using batch_write"""
         another_event = next(self.event_generator)
-        events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        event_list = [another_event, self.event]
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, event_list)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?
         s = sqlalchemy.sql.select([table])
@@ -115,13 +115,13 @@
         insert the other items.
         """
         # insert event
-        events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
-        eventlogging.jrm.store_sql_events(self.meta, events_batch)
+        eventlogging.jrm.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, [self.event])
         # now try to insert list of events in which this event is included
         another_event = next(self.event_generator)
         event_list = [another_event, self.event]
-        events_batch = deque([(TEST_SCHEMA_SCID, event_list)])
-        eventlogging.store_sql_events(self.meta, events_batch, replace=True)
+        eventlogging.store_sql_events(
+            self.meta, TEST_SCHEMA_SCID, event_list, replace=True)
 
         # we should still have to insert the other record though
         table = self.meta.tables['TestSchema_123']
@@ -130,11 +130,11 @@
         rows = results.fetchall()
         self.assertEqual(len(rows), 2)
 
-    def test_event_queue_is_empty(self):
+    def test_event_list_is_empty(self):
         """An empty event queue is handled well
         No exception is raised"""
         event_list = []
-        eventlogging.store_sql_events(self.meta, event_list)
+        eventlogging.store_sql_events(self.meta, TEST_SCHEMA_SCID, event_list)
 
     def test_grouping_of_events_happy_case(self):
         """Events belonging to the same schema with the same
@@ -159,8 +159,8 @@
         set of optional fields are inserted correctly"""
         another_event = next(self.event_generator)
         # ensure both events get inserted?
-        events_batch = deque([(TEST_SCHEMA_SCID, [another_event, self.event])])
-        eventlogging.store_sql_events(self.meta, events_batch)
+        event_list = [another_event, self.event]
+        eventlogging.store_sql_events(self.meta, TEST_SCHEMA_SCID, event_list)
         table = self.meta.tables['TestSchema_123']
         # is the table on the db  and does it have the right data?
         s = sqlalchemy.sql.select([table])

-- 
To view, visit https://gerrit.wikimedia.org/r/262738
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If247ed99b3f828b82295689b2e4ade5d21eb5c04
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
