Ottomata has uploaded a new change for review.
https://gerrit.wikimedia.org/r/278979
Change subject: Add --topic-prefix to eventlogging-service
......................................................................
Add --topic-prefix to eventlogging-service
This also adds extra tests and logic to allow for tests
that actually write data to configured writers.
Add a few more tests for testing Event.factory()
Change-Id: I682fda2abf93733d5ce20a79effd3af153bceb34
---
M bin/eventlogging-service
M eventlogging/event.py
M eventlogging/service.py
M tests/test_event.py
M tests/test_service.py
5 files changed, 196 insertions(+), 9 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/eventlogging
refs/changes/79/278979/1
diff --git a/bin/eventlogging-service b/bin/eventlogging-service
index 6d520aa..6cb83d4 100755
--- a/bin/eventlogging-service
+++ b/bin/eventlogging-service
@@ -47,6 +47,12 @@
)
ap.add_argument(
+ '--topic-prefix',
+ default=None,
+ help='If given, each event\'s topic will be prefixed with this before '
+ 'being sent to configured writers.',
+)
+ap.add_argument(
'--log-config',
default=None,
help='Python logging config file',
@@ -85,6 +91,10 @@
signal.signal(signal.SIGHUP, sighup_handler)
- service = EventLoggingService(args.output, args.error_output)
+ service = EventLoggingService(
+ args.output,
+ args.error_output,
+ args.topic_prefix,
+ )
# Start listening for requests.
service.start(args.port, args.num_processes)
diff --git a/eventlogging/event.py b/eventlogging/event.py
index e2ffd49..94e5df5 100644
--- a/eventlogging/event.py
+++ b/eventlogging/event.py
@@ -44,14 +44,21 @@
@staticmethod
def factory(data):
"""
- Given a JSON string, dict, or list of dicts,
+ Given an open file, JSON string, dict, or list of dicts,
this function will convert it to the corresponding
Event(s). That is, a JSON string dict or a dict
will be returned as an Event, and a JSON string list
or a list will be assumed to be a list of dicts
- and will be returned as a list of Events.
+ and will be returned as a list of Events. If a file
+ object is given, its contents will be read. The contents
+ are assumed to be valid JSON.
"""
- # If we are given a string, assume it is JSON.
+ # If data has a read method, assume it is an open file object.
+ # Read the data out of it.
+ if hasattr(data, 'read'):
+ data = data.read()
+
+ # If we've got a string, assume it is JSON.
if isinstance(data, string_types) or isinstance(data, bytes):
data = json.loads(data.decode('utf-8'))
@@ -90,7 +97,7 @@
"""
Returns the topic for this event.
- This will return the topic to use fopr this event.
+ This will return the topic to use for this event.
If topic_format is None, then the value of 'topic' in the event
meta data will be used.
Otherwise the event wil be formatted with topic_format.
@@ -205,6 +212,11 @@
return not self.has_meta_subobject()
def set_scid(self, scid):
+ """
+ Sets the scid for this event.
+ - schema_uri if meta subobject style
+ - schema and revision if EventCapsule style
+ """
if self.has_meta_subobject():
# Set schema uri to a simple name/revision uri.
self['meta']['schema_uri'] = scid[0] + '/' + str(scid[1])
@@ -212,6 +224,16 @@
self['schema'] = scid[0]
self['revision'] = scid[1]
+ def set_topic(self, topic):
+ """
+ Sets the topic for this event.
+ """
+ if self.has_meta_subobject():
+ # Meta subobject style events keep their topic in meta.topic.
+ self['meta']['topic'] = topic
+ else:
+ self['topic'] = topic
+
def create_event_error(
raw_event,
diff --git a/eventlogging/service.py b/eventlogging/service.py
index 50c49eb..fd1b1c7 100644
--- a/eventlogging/service.py
+++ b/eventlogging/service.py
@@ -79,7 +79,7 @@
until your event is ACKed by Kafka.
"""
- def __init__(self, writer_uris, error_writer_uri=None):
+ def __init__(self, writer_uris, error_writer_uri=None, topic_prefix=None):
"""
Note: you should call init_schemas_and_topic_config()
before you instantiate an EventLoggingService.
@@ -89,6 +89,9 @@
:param error_writer_uri: If configured, EventErrors will be written
to this writer.
+
+ :param topic_prefix: If given, each event's topic will be prefixed
+ with this before being sent to configured writers.
"""
routes = [
@@ -101,6 +104,8 @@
# GET /?spec
(r'[/]?', SpecHandler),
]
+
+ self.topic_prefix = topic_prefix
super(EventLoggingService, self).__init__(routes)
@@ -168,6 +173,11 @@
)
validate(event, encapsulate=event.should_encapsulate())
+
+ # If topic_prefix is configured, then prefix the event's topic now.
+ if self.topic_prefix:
+ event.set_topic(self.topic_prefix + event.topic())
+
# Send this processed event to all configured writers
# This will block until each writer finishes writing
# this event.
diff --git a/tests/test_event.py b/tests/test_event.py
index 868f37b..e2ffdf2 100644
--- a/tests/test_event.py
+++ b/tests/test_event.py
@@ -8,20 +8,70 @@
from __future__ import unicode_literals
import copy
+import json
+import tempfile
import unittest
import eventlogging
from eventlogging import get_schema
-
-from .fixtures import SchemaTestMixin
+from eventlogging.event import Event
+from .fixtures import SchemaTestMixin, _event_with_meta
class EventTestCase(SchemaTestMixin, unittest.TestCase):
def setUp(self):
super(EventTestCase, self).setUp()
- def test_meta(self):
+ def test_factory_dict(self):
+ """Test Event.factory() with a dict."""
+ self.assertEqual(
+ Event.factory(_event_with_meta),
+ _event_with_meta
+ )
+
+ def test_factory_list(self):
+ """Test Event.factory() with a list of dicts."""
+ l = [_event_with_meta, _event_with_meta]
+ self.assertEqual(
+ Event.factory(l),
+ l
+ )
+
+ def test_factory_string(self):
+ """Test Event.factory() with a JSON string."""
+ s = json.dumps(_event_with_meta)
+ self.assertEqual(
+ Event.factory(s),
+ _event_with_meta
+ )
+
+ def test_factory_string_list(self):
+ """Test Event.factory() with a list JSON string."""
+ l = [_event_with_meta, _event_with_meta]
+ s = json.dumps(l)
+ self.assertEqual(
+ Event.factory(s),
+ l
+ )
+
+ def test_factory_file(self):
+ """Test Event.factory() with a file object."""
+ l = [_event_with_meta, _event_with_meta]
+ s = json.dumps(l)
+
+ file = tempfile.TemporaryFile(prefix='eventlogging-event-test')
+ # Write the string to the temp file.
+ file.write(s)
+ # Seek to the beginning of file so Event.factory() can read it.
+ file.seek(0)
+ self.assertEqual(
+ Event.factory(file),
+ l
+ )
+
+ def test_meta(self):
+ """Test that meta() returns proper data for both styles of events."""
# EventCapsule fields only
event_capsule = copy.deepcopy(self.event)
del event_capsule['event']
@@ -164,6 +214,24 @@
self.event_with_meta.set_scid(fake_scid)
self.assertEqual(self.event_with_meta['meta']['schema_uri'], 'A/1')
+ def test_set_topic(self):
+ """
+ Test that set_topic() sets topic for capsule events,
+ and meta.topic for meta subobject events.
+ """
+ fake_scid = ('A', 1)
+ self.event.set_topic('fake_topic')
+ self.assertEqual(
+ 'fake_topic',
+ self.event['topic']
+ )
+
+ self.event_with_meta.set_topic('fake_topic')
+ self.assertEqual(
+ 'fake_topic',
+ self.event_with_meta['meta']['topic']
+ )
+
class create_event_errorTestCase(object):
"""
diff --git a/tests/test_service.py b/tests/test_service.py
index b9957a4..b5cf76e 100644
--- a/tests/test_service.py
+++ b/tests/test_service.py
@@ -14,8 +14,12 @@
)
from eventlogging.schema import is_schema_cached
from eventlogging.topic import schema_name_for_topic, TopicNotConfigured
+from eventlogging.event import Event
+from eventlogging.factory import get_writer
import json
+import os
+import tempfile
from .fixtures import SchemaTestMixin
@@ -207,3 +211,76 @@
# but with overwrite True, all should be fine.
append_spec_test_topic_and_schema(overwrite=True)
+
+
+class TestEventLoggingServiceWithFileWriter(SchemaTestMixin,
AsyncHTTPTestCase):
+ """
+ Testing of EventLogging REST produce API actually writing to a temp file.
+ A new temp file will be used for each test, and deleted in tearDown().
+ """
+ def setUp(self):
+ super(TestEventLoggingServiceWithFileWriter, self).setUp()
+
+ def tearDown(self):
+ os.remove(self.temp_file_path)
+
+ def get_app(self):
+ (_, self.temp_file_path) = tempfile.mkstemp(
+ prefix='eventlogging-service-test',
+ text=True,
+ )
+ writers = ['file://' + self.temp_file_path]
+ self.application = EventLoggingService(
+ writers,
+ )
+ return self.application
+
+ def event_from_temp_file(self):
+ """
+ Read the event(s) from the temp_file.
+ """
+ with open(self.temp_file_path, 'r') as f:
+ event = Event.factory(f)
+ return event
+
+ def test_produce_valid_event_configured_topic(self):
+ """
+ Posting a valid event to a configured topic returns 201
+ and is fully produced.
+ """
+ headers = {'Content-type': 'application/json'}
+ body = json.dumps(self.event_with_meta)
+ self.http_client.fetch(self.get_url('/v1/events'),
+ self.stop, method="POST",
+ body=body, headers=headers)
+ response = self.wait()
+ self.assertEqual(201, response.code)
+
+ produced_event = self.event_from_temp_file()
+ self.assertEqual(
+ self.event_with_meta,
+ produced_event
+ )
+
+ def test_produce_valid_event_configured_topic_with_topic_prefix(self):
+ """
+ Posting a valid event with a configured topic_prefix has its
+ meta.topic properly prefixed.
+ """
+ topic_prefix = 'TESTPREFIX.'
+ # Set topic_prefix on our EventLoggingService
+ self.application.topic_prefix = topic_prefix
+
+ headers = {'Content-type': 'application/json'}
+ body = json.dumps(self.event_with_meta)
+ self.http_client.fetch(self.get_url('/v1/events'),
+ self.stop, method="POST",
+ body=body, headers=headers)
+ response = self.wait()
+ self.assertEqual(201, response.code)
+
+ produced_event = self.event_from_temp_file()
+ self.assertEqual(
+ topic_prefix + self.event_with_meta.topic(),
+ produced_event.topic()
+ )
--
To view, visit https://gerrit.wikimedia.org/r/278979
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I682fda2abf93733d5ce20a79effd3af153bceb34
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits