Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/358062 )

Change subject: Make eventlogging-consumer support a local schema repo, and teach jrm.py how to serialize arrays
......................................................................

Make eventlogging-consumer support a local schema repo, and teach jrm.py how to serialize arrays

This allows eventlogging-consumer to consume events from EventBus topics
and insert them into MySQL.

Bug: T150369
Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
---
M bin/eventlogging-consumer
M eventlogging/jrm.py
2 files changed, 49 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/eventlogging refs/changes/62/358062/1

diff --git a/bin/eventlogging-consumer b/bin/eventlogging-consumer
index 7734038..0aa391b 100755
--- a/bin/eventlogging-consumer
+++ b/bin/eventlogging-consumer
@@ -30,6 +30,7 @@
 import logging
 
 import eventlogging
+import eventlogging.schema
 
 
 eventlogging.setup_logging()
@@ -40,11 +41,19 @@
 ap.add_argument('output', help='URI of output stream', default='stdout://')
 ap.add_argument('--no-plugins', help='run without loading plug-ins',
                 action='store_false', dest='load_plugins')
+# Optional local schema repository; its schemas are loaded after plug-ins.
+ap.add_argument('--schemas-path',
+                dest='schemas_path',
+                help='Path to local schema repository')
+
 ap.set_defaults(load_plugins=True)
 args = ap.parse_args()
 
 if args.load_plugins:
     eventlogging.load_plugins()
 
+if args.schemas_path:  # skipped when unset (or given as an empty string)
+    eventlogging.schema.load_local_schemas(args.schemas_path)
+
 logging.info('Driving %s -> %s..', args.input, args.output)
 eventlogging.drive(args.input, args.output)
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 15a7e0b..3cfd62a 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -15,10 +15,11 @@
 import logging
 import _mysql
 import os
+import re
 import sqlalchemy
 import time
 
-from .compat import items
+from .compat import items, json
 from .schema import get_schema
 from .utils import flatten
 
@@ -30,9 +31,16 @@
 # timestamps. See `<https://www.mediawiki.org/wiki/Manual:Timestamp>`_.
 MEDIAWIKI_TIMESTAMP = '%Y%m%d%H%M%S'
 
-# Format string for table names. Interpolates a `SCID` -- i.e., a tuple
-# of (schema_name, revision_id).
-TABLE_NAME_FORMAT = '%s_%s'
+
+def scid_to_table_name(scid):
+    """Return the table name for `scid`, a (schema_name, revision_id) tuple.
+
+    Each run of characters other than ASCII letters and digits in the
+    schema name becomes one '_' (so 'a/b-c' and 'a_b_c' map alike).
+    """
+    schema_name = re.sub(r'[^A-Za-z0-9]+', '_', scid[0])
+    return '{}_{}'.format(schema_name, scid[1])
+
 
 # An iterable of properties that should not be stored in the database.
 NO_DB_PROPERTIES = (
@@ -48,6 +56,14 @@
         )
     }
 }
+
+# Maximum length for string-like columns. This exceeds the 191-character
+# utf8mb4 limit for InnoDB index keys (767 bytes / 4 bytes per char), so
+# columns of this length cannot be fully indexed under that limit.
+STRING_MAX_LEN = 1024
+
+# Default table column definition, to be overridden by mappers below.
+COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
 
 
 class MediaWikiTimestamp(sqlalchemy.TypeDecorator):
@@ -73,13 +89,22 @@
         return datetime.datetime.strptime(value, MEDIAWIKI_TIMESTAMP)
 
 
-# Maximum length for string and string-like types. Because InnoDB limits index
-# columns to 767 bytes, the maximum length for a utf8mb4 column (which
-# reserves up to four bytes per character) is 191 (191 * 4 = 764).
-STRING_MAX_LEN = 1024
+class JsonSerde(sqlalchemy.TypeDecorator):
+    """Column type that stores a JSON-serializable value as JSON text."""
 
-# Default table column definition, to be overridden by mappers below.
-COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
+    impl = sqlalchemy.Unicode(STRING_MAX_LEN)  # bounds serialized length
+
+    def process_bind_param(self, value, dialect=None):
+        """Serialize `value` to JSON text; None is stored as SQL NULL."""
+        if value is None:
+            return None
+        value = json.dumps(value)
+        return value.decode('utf-8') if hasattr(value, 'decode') else value
+
+    def process_result_value(self, value, dialect=None):
+        """Parse JSON text read from the database; SQL NULL becomes None."""
+        return None if value is None else json.loads(value)
+
 
 # Mapping of JSON Schema attributes to valid values. Each value maps to
 # a dictionary of options. The options are compounded into a single
@@ -97,6 +122,8 @@
         'integer': {'type_': sqlalchemy.BigInteger},
         'number': {'type_': sqlalchemy.Float},
         'string': {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)},
+        # Encode arrays as JSON strings.
+        'array': {'type_': JsonSerde},
     }),
     ('format', {
         'utc-millisec': {'type_': MediaWikiTimestamp, 'index': True},
@@ -152,8 +179,9 @@
     #       |                 |         +-------------+------------+
     #       +-----------------+-------->| Return table description |
     #                                   +--------------------------+
+    table_name = scid_to_table_name(scid)
     try:
-        return meta.tables[TABLE_NAME_FORMAT % scid]
+        return meta.tables[table_name]
     except KeyError:
         return declare_table(meta, scid, should_encapsulate)
 
@@ -166,7 +194,7 @@
     columns = schema_mapper(schema)
 
     table_options = ENGINE_TABLE_OPTIONS.get(meta.bind.name, {})
-    table_name = TABLE_NAME_FORMAT % scid
+    table_name = scid_to_table_name(scid)
 
     table = sqlalchemy.Table(table_name, meta, *columns, **table_options)
     table.create(checkfirst=True)

-- 
To view, visit https://gerrit.wikimedia.org/r/358062
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <ao...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to