Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/358062 )
Change subject: Make eventlogging-consumer support a local schema repo, and teach jrm.py how to serialize arrays
......................................................................

Make eventlogging-consumer support a local schema repo, and teach jrm.py how to serialize arrays

This will allow us to use eventlogging-consumer to consume from eventbus topics and insert into MySQL

Bug: T150369
Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
---
M bin/eventlogging-consumer
M eventlogging/jrm.py
2 files changed, 49 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/eventlogging refs/changes/62/358062/1

diff --git a/bin/eventlogging-consumer b/bin/eventlogging-consumer
index 7734038..0aa391b 100755
--- a/bin/eventlogging-consumer
+++ b/bin/eventlogging-consumer
@@ -30,6 +30,7 @@
 import logging
 
 import eventlogging
+import eventlogging.schema
 
 eventlogging.setup_logging()
 
@@ -40,11 +41,19 @@
 ap.add_argument('output', help='URI of output stream', default='stdout://')
 ap.add_argument('--no-plugins', help='run without loading plug-ins',
                 action='store_false', dest='load_plugins')
+
+ap.add_argument(
+    '--schemas-path',
+    help='Path to local schema repository',
+)
 ap.set_defaults(load_plugins=True)
 args = ap.parse_args()
 
 if args.load_plugins:
     eventlogging.load_plugins()
 
+if args.schemas_path:
+    eventlogging.schema.load_local_schemas(args.schemas_path)
+
 logging.info('Driving %s -> %s..', args.input, args.output)
 eventlogging.drive(args.input, args.output)
diff --git a/eventlogging/jrm.py b/eventlogging/jrm.py
index 15a7e0b..3cfd62a 100644
--- a/eventlogging/jrm.py
+++ b/eventlogging/jrm.py
@@ -15,10 +15,11 @@
 import logging
 import _mysql
 import os
+import re
 import sqlalchemy
 import time
 
-from .compat import items
+from .compat import items, json
 from .schema import get_schema
 from .utils import flatten
 
@@ -30,9 +31,16 @@
 # timestamps. See `<https://www.mediawiki.org/wiki/Manual:Timestamp>`_.
 MEDIAWIKI_TIMESTAMP = '%Y%m%d%H%M%S'
 
-# Format string for table names. Interpolates a `SCID` -- i.e., a tuple
-# of (schema_name, revision_id).
-TABLE_NAME_FORMAT = '%s_%s'
+
+
+def scid_to_table_name(scid):
+    # Format string for table names. Interpolates a `SCID` -- i.e., a tuple
+    # of (schema_name, revision_id).
+    return '{}_{}'.format(
+        re.sub('[^A-Za-z0-9]+', '_', scid[0]),
+        scid[1]
+    )
+
 
 # An iterable of properties that should not be stored in the database.
 NO_DB_PROPERTIES = (
@@ -48,6 +56,14 @@
         )
     }
 }
+
+# Maximum length for string and string-like types. Because InnoDB limits index
+# columns to 767 bytes, the maximum length for a utf8mb4 column (which
+# reserves up to four bytes per character) is 191 (191 * 4 = 764).
+STRING_MAX_LEN = 1024
+
+# Default table column definition, to be overridden by mappers below.
+COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
 
 
 class MediaWikiTimestamp(sqlalchemy.TypeDecorator):
@@ -73,13 +89,22 @@
         return datetime.datetime.strptime(value, MEDIAWIKI_TIMESTAMP)
 
 
-# Maximum length for string and string-like types. Because InnoDB limits index
-# columns to 767 bytes, the maximum length for a utf8mb4 column (which
-# reserves up to four bytes per character) is 191 (191 * 4 = 764).
-STRING_MAX_LEN = 1024
+class JsonSerde(sqlalchemy.TypeDecorator):
+    """A :class:`sqlalchemy.TypeDecorator` for converting to and from JSON strings."""
 
-# Default table column definition, to be overridden by mappers below.
-COLUMN_DEFAULTS = {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)}
+    impl = sqlalchemy.Unicode(STRING_MAX_LEN)
+
+    def process_bind_param(self, value, dialect=None):
+        """Convert the value to a JSON string"""
+        value = json.dumps(value)
+        if hasattr(value, 'decode'):
+            value = value.decode('utf-8')
+        return value
+
+    def process_result_value(self, value, dialect=None):
+        """Convert a JSON string into a Python object"""
+        return json.loads(value)
+
 
 # Mapping of JSON Schema attributes to valid values. Each value maps to
 # a dictionary of options. The options are compounded into a single
@@ -97,6 +122,8 @@
         'integer': {'type_': sqlalchemy.BigInteger},
         'number': {'type_': sqlalchemy.Float},
         'string': {'type_': sqlalchemy.Unicode(STRING_MAX_LEN)},
+        # Encode arrays as JSON strings.
+        'array': {'type_': JsonSerde},
     }),
     ('format', {
         'utc-millisec': {'type_': MediaWikiTimestamp, 'index': True},
@@ -152,8 +179,9 @@
     # | | +-------------+------------+
     # +-----------------+-------->| Return table description |
     # +--------------------------+
+    table_name = scid_to_table_name(scid)
     try:
-        return meta.tables[TABLE_NAME_FORMAT % scid]
+        return meta.tables[table_name]
     except KeyError:
         return declare_table(meta, scid, should_encapsulate)
 
@@ -166,7 +194,7 @@
     columns = schema_mapper(schema)
 
     table_options = ENGINE_TABLE_OPTIONS.get(meta.bind.name, {})
-    table_name = TABLE_NAME_FORMAT % scid
+    table_name = scid_to_table_name(scid)
 
     table = sqlalchemy.Table(table_name, meta, *columns, **table_options)
     table.create(checkfirst=True)

-- 
To view, visit https://gerrit.wikimedia.org/r/358062
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I78dcc351f737286e5253cec3be76dc87478a020f
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <ao...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits