Ori.livneh has uploaded a new change for review.
https://gerrit.wikimedia.org/r/181769
Change subject: [WIP] Add client API
......................................................................
[WIP] Add client API
Change-Id: I3ce7837b15855d077424382c15fbf765441f5f92
---
M server/eventlogging/jrm.py
M server/eventlogging/utils.py
M server/tests/test_jrm.py
3 files changed, 106 insertions(+), 27 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging
refs/changes/69/181769/1
diff --git a/server/eventlogging/jrm.py b/server/eventlogging/jrm.py
index a83953b..ef4d824 100644
--- a/server/eventlogging/jrm.py
+++ b/server/eventlogging/jrm.py
@@ -15,11 +15,12 @@
import sqlalchemy
-from .schema import get_schema
from .compat import items
+from .schema import get_schema
+from .utils import flatten
-__all__ = ('store_sql_events', 'flatten')
+__all__ = ('store_sql_events',)
# Format string for :func:`datetime.datetime.strptime` for MediaWiki
@@ -242,21 +243,6 @@
for prop in NO_DB_PROPERTIES:
event.pop(prop, None)
return event
-
-
-def flatten(d, sep='_', f=None):
- """Collapse a nested dictionary. `f` specifies an optional mapping
- function to apply to each (key, value) pair."""
- flat = []
- for k, v in items(d):
- if f is not None:
- (k, v) = f((k, v))
- if isinstance(v, dict):
- nested = items(flatten(v, sep, f))
- flat.extend((k + sep + nk, nv) for nk, nv in nested)
- else:
- flat.append((k, v))
- return dict(flat)
def column_sort_key(column):
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
index 35aef5d..124ee0c 100644
--- a/server/eventlogging/utils.py
+++ b/server/eventlogging/utils.py
@@ -9,15 +9,19 @@
"""
from __future__ import unicode_literals
-import sys
-import re
-import threading
+import copy
import logging
+import re
+import sys
+import threading
-from .compat import monotonic_clock
+from .compat import items, monotonic_clock
+from .factory import get_reader
-__all__ = ('PeriodicThread', 'uri_delete_query_item', 'setup_logging')
+__all__ = ('EventConsumer', 'PeriodicThread', 'flatten', 'is_subset_dict',
+ 'setup_logging', 'unflatten', 'update_recursive',
+ 'uri_delete_query_item')
class PeriodicThread(threading.Thread):
@@ -56,13 +60,102 @@
def uri_delete_query_item(uri, key):
- """Delete a key=value pair (specified by key) from a URI's query string."""
+ """Delete a key-value pair (specified by key) from a URI's query string."""
def repl(match):
separator, trailing_ampersand = match.groups()
return separator if trailing_ampersand else ''
return re.sub('([?&])%s=[^&]*(&?)' % re.escape(key), repl, uri)
+def is_subset_dict(a, b):
+    """Check whether `b` contains every key-value pair of `a`.
+
+    When the values stored under a key are dictionaries in both `a` and
+    `b`, they are compared by a recursive call to :func:`is_subset_dict`
+    instead of by equality, so the nested dictionary in `b` may carry
+    additional keys. An empty `a` is a subset of anything."""
+    for name, wanted in items(a):
+        try:
+            found = b[name]
+        except KeyError:
+            return False
+        if isinstance(wanted, dict) and isinstance(found, dict):
+            mismatch = not is_subset_dict(wanted, found)
+        else:
+            mismatch = wanted != found
+        if mismatch:
+            return False
+    return True
+
+
+def update_recursive(d, other):
+    """Recursively update a dict with items from another dict.
+
+    Every key-value pair of `other` is merged into `d` in place. A value
+    in `other` that is itself a dictionary is merged recursively into
+    the dictionary stored under the same key in `d`. If `d` has no value
+    for that key, or the value is not a dictionary, the merge starts
+    from a fresh empty dictionary, so the old value is replaced and the
+    nested dictionary from `other` is copied rather than shared.
+
+    Returns `d`."""
+    for key, val in items(other):
+        if isinstance(val, dict):
+            target = d.get(key)
+            if not isinstance(target, dict):
+                # A missing or non-dict value cannot absorb nested items;
+                # recursing into it would fail on item assignment.
+                target = {}
+            val = update_recursive(target, val)
+        d[key] = val
+    return d
+
+
+def flatten(d, sep='_', f=None):
+    """Collapse a nested dictionary into a single level.
+
+    The key of each nested item is joined to its parent key with `sep`.
+    `f` specifies an optional mapping function; it is called with each
+    (key, value) pair as a single tuple, at every level of nesting, and
+    must return the (key, value) pair to use in its place. This function
+    is the inverse of :func:`unflatten`."""
+    collapsed = {}
+    for key, value in items(d):
+        if f is not None:
+            key, value = f((key, value))
+        if not isinstance(value, dict):
+            collapsed[key] = value
+            continue
+        for sub_key, sub_value in items(flatten(value, sep, f)):
+            collapsed[key + sep + sub_key] = sub_value
+    return collapsed
+
+
+def unflatten(d, sep='_', f=None):
+    """Expand a flattened dictionary into a nested one.
+
+    Keys containing `sep` are split into nested key selectors. `f`
+    specifies an optional mapping function; it is called with each
+    key-value pair of `d` as a single tuple and must return the pair to
+    use in its place. It is not passed on to the recursive calls that
+    expand nested values. This function is the inverse of
+    :func:`flatten`."""
+    expanded = {}
+    for key, value in items(d):
+        if f is not None:
+            key, value = f((key, value))
+        if sep in key:
+            # Peel off the leading selector only; the recursive call
+            # below expands any separators left in the tail.
+            key, tail = key.split(sep, 1)
+            value = {tail: value}
+        if isinstance(value, dict):
+            value = unflatten(value, sep)
+        update_recursive(expanded, {key: value})
+    return expanded
+
+
+class EventConsumer(object):
+    """Client API for consuming an EventLogging stream from a script.
+
+    .. code-block:: python
+
+        event_stream = eventlogging.EventConsumer('tcp://localhost:8600')
+        for event in event_stream.filter(schema='NavigationTiming'):
+            print(event)
+
+    """
+
+    def __init__(self, url):
+        # An empty condition set matches every event.
+        self.conditions = {}
+        self.url = url
+
+    def filter(self, **conditions):
+        """Return a shallow copy of this consumer restricted by
+        `conditions`, given as keyword arguments. The conditions already
+        set on this consumer are merged into the new ones; this consumer
+        itself is left unchanged."""
+        # NOTE(review): the existing conditions are merged in last, so
+        # they win over the new keyword arguments when a key clashes.
+        # Confirm that this precedence is intended.
+        merged = update_recursive(conditions, self.conditions)
+        clone = copy.copy(self)
+        clone.conditions = merged
+        return clone
+
+    def __iter__(self):
+        """Yield each event read from `self.url` that satisfies the
+        consumer's conditions."""
+        for event in get_reader(self.url):
+            if not is_subset_dict(self.conditions, event):
+                continue
+            yield event
+
+
def setup_logging():
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
format='%(asctime)s %(message)s')
diff --git a/server/tests/test_jrm.py b/server/tests/test_jrm.py
index 87d6b7b..ed0376c 100644
--- a/server/tests/test_jrm.py
+++ b/server/tests/test_jrm.py
@@ -41,9 +41,9 @@
t = eventlogging.jrm.declare_table(self.meta, TEST_SCHEMA_SCID)
# The columns we expect to see are..
- cols = set(eventlogging.jrm.flatten(self.event)) # all properties
- cols -= set(eventlogging.jrm.NO_DB_PROPERTIES) # unless excluded
- cols |= {'id', 'uuid'} # plus 'id' & 'uuid'
+ cols = set(eventlogging.utils.flatten(self.event)) # all properties
+ cols -= set(eventlogging.jrm.NO_DB_PROPERTIES) # unless excluded
+ cols |= {'id', 'uuid'} # plus id & uuid
self.assertSetEqual(set(t.columns.keys()), cols)
@@ -55,7 +55,7 @@
def test_flatten(self):
"""``flatten`` correctly collapses deeply nested maps."""
- flat = eventlogging.jrm.flatten(self.event)
+ flat = eventlogging.utils.flatten(self.event)
self.assertEqual(flat['event_nested_deeplyNested_pi'], 3.14159)
def test_encoding(self):
--
To view, visit https://gerrit.wikimedia.org/r/181769
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3ce7837b15855d077424382c15fbf765441f5f92
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits