Ottomata has uploaded a new change for review.
https://gerrit.wikimedia.org/r/261638
Change subject: Fix sql_writer so that meta style (non encapsulated) events work
......................................................................
Fix sql_writer so that meta style (non encapsulated) events work
This also parameterizes some batching settings that previously
had been hardcoded, and allows configuration of the default
MySQL engine via the EVENTLOGGING_MYSQL_ENGINE environment
variable.
Change-Id: Ifc1b6bfe31f6eac3bf61bcb687898f1079e89eb1
---
M eventlogging/handlers.py
M eventlogging/jrm.py
M tests/test_jrm.py
3 files changed, 67 insertions(+), 19 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/eventlogging
refs/changes/38/261638/1
diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index 95b28f4..a2b4efb 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -268,13 +268,31 @@
@writes('mysql', 'sqlite')
-def sql_writer(uri, replace=False, statsd_host=''):
- """Writes to an RDBMS, creating tables for SCIDs and rows for events."""
- import sqlalchemy
+def sql_writer(
+ uri,
+ replace=False,
+ statsd_host='',
+ batch_size=3000,
+ batch_time=300, # in seconds
+ queue_size=100 # Max number of batches pending insertion.
+):
+ """
+ Writes to an RDBMS, creating tables for SCIDs and rows for events.
+ Note that the default MySQL engine is TokuDB. If your MySQL
+ does not support TokuDB, then set the EVENTLOGGING_MYSQL_ENGINE
+ environment variable to the engine you want to use. E.g.
- # Don't pass 'replace' and 'statsd_host' parameter to SQLAlchemy.
- uri = uri_delete_query_item(uri, 'replace')
- uri = uri_delete_query_item(uri, 'statsd_host')
+ export EVENTLOGGING_MYSQL_ENGINE=InnoDB
+
+ :param uri: SQLAlchemy bind URI.
+ :param replace: If true, INSERT REPLACE will be used.
+ :param statsd_host: hostname of statsd instance to which insert stats will
+ be sent.
+ :param batch_size: Number of events per schema to insert as a batch.
+    :param batch_time: Number of seconds to wait before inserting a batch.
+ :param queue_size: Number of batches waiting to be inserted.
+ """
+ import sqlalchemy
logger = logging.getLogger('Log')
@@ -282,6 +300,10 @@
stats = None
if statsd_host:
stats = statsd.StatsClient(statsd_host, 8125, prefix='eventlogging')
+
+ # Don't pass non SQLAlchemy parameters to SQLAlchemy.
+ for argname in inspect.getargspec(sql_writer)[0]:
+ uri = uri_delete_query_item(uri, argname)
meta = sqlalchemy.MetaData(bind=uri)
# Each scid stores a buffer and the timestamp of the first insertion.
@@ -304,10 +326,6 @@
# that the connection is alive, and reconnect if necessary.
dbapi_connection.ping(True)
try:
- batch_size = 3000
- batch_time = 300 # in seconds
- # Max number of batches pending insertion.
- queue_size = 100
sleep_seconds = 5
# Link the main thread to the worker thread so we
# don't keep filling the queue if the worker died.
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 27c6d12..74973c1 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -14,6 +14,7 @@
import itertools
import logging
import _mysql
+import os
import sqlalchemy
import time
@@ -42,7 +43,9 @@
ENGINE_TABLE_OPTIONS = {
'mysql': {
'mysql_charset': 'utf8',
- 'mysql_engine': 'TokuDB'
+ 'mysql_engine': os.environ.setdefault(
+ 'EVENTLOGGING_MYSQL_ENGINE', 'TokuDB'
+ )
}
}
@@ -121,7 +124,7 @@
return sqlalchemy.Column(**options)
-def get_table(meta, scid):
+def get_table(meta, scid, should_encapsulate=True):
"""Acquire a :class:`sqlalchemy.schema.Table` object for a JSON
Schema specified by `scid`."""
# +---------------------------------+
@@ -156,13 +159,13 @@
try:
return meta.tables[TABLE_NAME_FORMAT % scid]
except KeyError:
- return declare_table(meta, scid)
+ return declare_table(meta, scid, should_encapsulate)
-def declare_table(meta, scid):
+def declare_table(meta, scid, should_encapsulate=True):
"""Map a JSON schema to a SQL table. If the table does not exist in
the database, issue ``CREATE TABLE`` statement."""
- schema = get_schema(scid, encapsulate=True)
+ schema = get_schema(scid, encapsulate=should_encapsulate)
columns = schema_mapper(schema)
@@ -244,12 +247,19 @@
while len(events_batch) > 0:
scid, scid_events = events_batch.popleft()
prepared_events = [prepare(e) for e in scid_events]
+
+ # Each event here should be the same schema, so we can
+ # check if the first event should be encapsulated
+ # and use that when constructing the create table
+ # statement in declare_table.
+ should_encapsulate = scid_events[0].should_encapsulate()
+
# TODO: Avoid breaking the inserts down by same set of fields,
# instead force a default NULL, 0 or '' value for optional fields.
prepared_events.sort(key=insert_sort_key)
for _, grouper in itertools.groupby(prepared_events, insert_sort_key):
events = list(grouper)
- table = get_table(meta, scid)
+ table = get_table(meta, scid, should_encapsulate)
insert_started_at = time.time()
insert(table, events, replace)
diff --git a/tests/test_jrm.py b/tests/test_jrm.py
index 5b7d408..3af0db7 100644
--- a/tests/test_jrm.py
+++ b/tests/test_jrm.py
@@ -17,7 +17,10 @@
import sqlalchemy
import sqlalchemy.sql
-from .fixtures import (DatabaseTestMixin, TEST_SCHEMA_SCID)
+from .fixtures import (
+ DatabaseTestMixin, TEST_SCHEMA_SCID, TEST_META_SCHEMA_SCID
+)
+from eventlogging.jrm import TABLE_NAME_FORMAT
class JrmTestCase(DatabaseTestMixin, unittest.TestCase):
@@ -29,8 +32,9 @@
table generated."""
events_batch = deque([(TEST_SCHEMA_SCID, [self.event])])
eventlogging.store_sql_events(self.meta, events_batch)
- self.assertIn('TestSchema_123', self.meta.tables)
- table = self.meta.tables['TestSchema_123']
+ table_name = TABLE_NAME_FORMAT % TEST_SCHEMA_SCID
+ self.assertIn(table_name, self.meta.tables)
+ table = self.meta.tables[table_name]
# is the table on the db and does it have the right data?
s = sqlalchemy.sql.select([table])
results = self.engine.execute(s)
@@ -38,6 +42,22 @@
# see columns with print table.c
self.assertEqual(row['clientIp'], self.event['clientIp'])
+ def test_lazy_table_creation_with_meta(self):
+ """If an attempt is made to store an event with meta (not encapsulated)
+ for which no table exists, the schema is automatically retrieved and a
+ suitable table generated."""
+ events_batch = deque([(TEST_META_SCHEMA_SCID, [self.event_with_meta])])
+ eventlogging.store_sql_events(self.meta, events_batch)
+ table_name = TABLE_NAME_FORMAT % TEST_META_SCHEMA_SCID
+ self.assertIn(table_name, self.meta.tables)
+ table = self.meta.tables[table_name]
+ # is the table on the db and does it have the right data?
+ s = sqlalchemy.sql.select([table])
+ results = self.engine.execute(s)
+ row = results.fetchone()
+ # see columns with print table.c
+ self.assertEqual(row['meta_id'], self.event_with_meta.id())
+
def test_column_names(self):
"""Generated tables contain columns for each relevant field."""
t = eventlogging.jrm.declare_table(self.meta, TEST_SCHEMA_SCID)
--
To view, visit https://gerrit.wikimedia.org/r/261638
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifc1b6bfe31f6eac3bf61bcb687898f1079e89eb1
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits