Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/278979

Change subject: Add --topic-prefix to eventlogging-service
......................................................................

Add --topic-prefix to eventlogging-service

This also adds extra tests and logic to allow for tests
that actually write data to configured writers.

Add a few more tests for testing Event.factory()

Change-Id: I682fda2abf93733d5ce20a79effd3af153bceb34
---
M bin/eventlogging-service
M eventlogging/event.py
M eventlogging/service.py
M tests/test_event.py
M tests/test_service.py
5 files changed, 196 insertions(+), 9 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/eventlogging 
refs/changes/79/278979/1

diff --git a/bin/eventlogging-service b/bin/eventlogging-service
index 6d520aa..6cb83d4 100755
--- a/bin/eventlogging-service
+++ b/bin/eventlogging-service
@@ -47,6 +47,12 @@
 )
 
 ap.add_argument(
+    '--topic-prefix',
+    default=None,
+    help='If given, each event\'s topic will be prefixed with this before '
+    'being sent to configured writers.',
+)
+ap.add_argument(
     '--log-config',
     default=None,
     help='Python logging config file',
@@ -85,6 +91,10 @@
 
     signal.signal(signal.SIGHUP, sighup_handler)
 
-    service = EventLoggingService(args.output, args.error_output)
+    service = EventLoggingService(
+        args.output,
+        args.error_output,
+        args.topic_prefix,
+    )
     # Start listening for requests.
     service.start(args.port, args.num_processes)
diff --git a/eventlogging/event.py b/eventlogging/event.py
index e2ffd49..94e5df5 100644
--- a/eventlogging/event.py
+++ b/eventlogging/event.py
@@ -44,14 +44,21 @@
     @staticmethod
     def factory(data):
         """
-        Given a JSON string, dict, or list of dicts,
+        Given an open file, JSON string, dict, or list of dicts,
         this function will convert it to the corresponding
         Event(s).  That is, a JSON string dict or a dict
         will be returned as an Event, and a JSON string list
         or a list will be assumed to be a list of dicts
-        and will be returned as a list of Events.
+        and will be returned as a list of Events.  If a file
+        object is given, its contents will be read.  The contents
+        are assumed to be valid JSON.
         """
-        # If we are given a string, assume it is JSON.
+        # Else if data has a read method, assume it is an open file object.
+        # Read the data out of it.
+        if hasattr(data, 'read'):
+            data = data.read()
+
+        # If we've got a string, assume it is JSON.
         if isinstance(data, string_types) or isinstance(data, bytes):
             data = json.loads(data.decode('utf-8'))
 
@@ -90,7 +97,7 @@
         """
         Returns the topic for this event.
 
-        This will return the topic to use fopr this event.
+        This will return the topic to use for this event.
         If topic_format is None, then the value of 'topic' in the event
         meta data will be used.
         Otherwise the event wil be formatted with topic_format.
@@ -205,6 +212,11 @@
         return not self.has_meta_subobject()
 
     def set_scid(self, scid):
+        """
+        Sets the scid for this event.
+        - schema_uri if meta subject style
+        - schema and revision if EventCapsule style
+        """
         if self.has_meta_subobject():
             # Set schema uri to a simple name/revision uri.
             self['meta']['schema_uri'] = scid[0] + '/' + str(scid[1])
@@ -212,6 +224,16 @@
             self['schema'] = scid[0]
             self['revision'] = scid[1]
 
+    def set_topic(self, topic):
+        """
+        Sets the topic for this event.
+        """
+        if self.has_meta_subobject():
+            # Set schema uri to a simple name/revision uri.
+            self['meta']['topic'] = topic
+        else:
+            self['topic'] = topic
+
 
 def create_event_error(
     raw_event,
diff --git a/eventlogging/service.py b/eventlogging/service.py
index 50c49eb..fd1b1c7 100644
--- a/eventlogging/service.py
+++ b/eventlogging/service.py
@@ -79,7 +79,7 @@
     until your event is ACKed by Kafka.
     """
 
-    def __init__(self, writer_uris, error_writer_uri=None):
+    def __init__(self, writer_uris, error_writer_uri=None, topic_prefix=None):
         """
         Note: you should call init_schemas_and_topic_config()
         before you instantiate an EventLoggingService.
@@ -89,6 +89,9 @@
 
         :param error_writer_uri: If configured, EventErrors will be written
         to this writer.
+
+        :param topic_prefix: If given, each event's topic will be prefixed
+        with this before being sent to configured writers.
         """
 
         routes = [
@@ -101,6 +104,8 @@
             # GET /?spec
             (r'[/]?', SpecHandler),
         ]
+
+        self.topic_prefix = topic_prefix
 
         super(EventLoggingService, self).__init__(routes)
 
@@ -168,6 +173,11 @@
                 )
 
         validate(event, encapsulate=event.should_encapsulate())
+
+        # If topic_prefix is configured, then prefix the event's topic now.
+        if self.topic_prefix:
+            event.set_topic(self.topic_prefix + event.topic())
+
         # Send this processed event to all configured writers
         # This will block until each writer finishes writing
         # this event.
diff --git a/tests/test_event.py b/tests/test_event.py
index 868f37b..e2ffdf2 100644
--- a/tests/test_event.py
+++ b/tests/test_event.py
@@ -8,20 +8,70 @@
 from __future__ import unicode_literals
 
 import copy
+import json
+import tempfile
 import unittest
 
 import eventlogging
 from eventlogging import get_schema
-
-from .fixtures import SchemaTestMixin
+from eventlogging.event import Event
+from .fixtures import SchemaTestMixin, _event_with_meta
 
 
 class EventTestCase(SchemaTestMixin, unittest.TestCase):
     def setUp(self):
         super(EventTestCase, self).setUp()
 
-    def test_meta(self):
 
+    def test_factory_dict(self):
+        """Test Event.factory() with a dict."""
+        self.assertEqual(
+            Event.factory(_event_with_meta),
+            _event_with_meta
+        )
+
+    def test_factory_list(self):
+        """Test Event.factory() with a list of dicts."""
+        l = [_event_with_meta, _event_with_meta]
+        self.assertEqual(
+            Event.factory(l),
+            l
+        )
+
+    def test_factory_string(self):
+        """Test Event.factory() with a JSON string."""
+        s = json.dumps(_event_with_meta)
+        self.assertEqual(
+            Event.factory(s),
+            _event_with_meta
+        )
+
+    def test_factory_string_list(self):
+        """Test Event.factory() with a list JSON string."""
+        l = [_event_with_meta, _event_with_meta]
+        s = json.dumps(l)
+        self.assertEqual(
+            Event.factory(s),
+            l
+        )
+
+    def test_factory_file(self):
+        """Test Event.factory() with a file object."""
+        l = [_event_with_meta, _event_with_meta]
+        s = json.dumps(l)
+
+        file = tempfile.TemporaryFile(prefix='eventlogging-event-test')
+        # Write the string to the temp file.
+        file.write(s)
+        # Seek to the beginning of file so Event.factory() can read it.
+        file.seek(0)
+        self.assertEqual(
+            Event.factory(file),
+            l
+        )
+
+    def test_meta(self):
+        """Test that meta() returns proper data for both styles of events."""
         # EventCapsule fields only
         event_capsule = copy.deepcopy(self.event)
         del event_capsule['event']
@@ -164,6 +214,24 @@
         self.event_with_meta.set_scid(fake_scid)
         self.assertEqual(self.event_with_meta['meta']['schema_uri'], 'A/1')
 
+    def test_set_topic(self):
+        """
+        Test that set_topic() sets topic for capsule events,
+        and meta.topic for meta subobject events.
+        """
+        fake_scid = ('A', 1)
+        self.event.set_topic('fake_topic')
+        self.assertEqual(
+            'fake_topic',
+            self.event['topic']
+        )
+
+        self.event_with_meta.set_topic('fake_topic')
+        self.assertEqual(
+            'fake_topic',
+            self.event_with_meta['meta']['topic']
+        )
+
 
 class create_event_errorTestCase(object):
     """
diff --git a/tests/test_service.py b/tests/test_service.py
index b9957a4..b5cf76e 100644
--- a/tests/test_service.py
+++ b/tests/test_service.py
@@ -14,8 +14,12 @@
 )
 from eventlogging.schema import is_schema_cached
 from eventlogging.topic import schema_name_for_topic, TopicNotConfigured
+from eventlogging.event import Event
+from eventlogging.factory import get_writer
 
 import json
+import os
+import tempfile
 from .fixtures import SchemaTestMixin
 
 
@@ -207,3 +211,76 @@
 
         # but with overwrite True, all should be fine.
         append_spec_test_topic_and_schema(overwrite=True)
+
+
+class TestEventLoggingServiceWithFileWriter(SchemaTestMixin, 
AsyncHTTPTestCase):
+    """
+    Testing of EventLogging REST produce API actually writing to a temp file.
+    A new temp file will be used for each test, and deleted in tearDown().
+    """
+    def setUp(self):
+        super(TestEventLoggingServiceWithFileWriter, self).setUp()
+
+    def tearDown(self):
+        os.remove(self.temp_file_path)
+
+    def get_app(self):
+        (_, self.temp_file_path) = tempfile.mkstemp(
+            prefix='eventlogging-service-test',
+            text=True,
+        )
+        writers = ['file://' + self.temp_file_path]
+        self.application = EventLoggingService(
+            writers,
+        )
+        return self.application
+
+    def event_from_temp_file(self):
+        """
+        Read the event(s) from the temp_file.
+        """
+        with open(self.temp_file_path, 'r') as f:
+            event = Event.factory(f)
+        return event
+
+    def test_produce_valid_event_configured_topic(self):
+        """
+        Posting a valid event to a configured topic returns 201
+        and is fully produced.
+        """
+        headers = {'Content-type': 'application/json'}
+        body = json.dumps(self.event_with_meta)
+        self.http_client.fetch(self.get_url('/v1/events'),
+                               self.stop, method="POST",
+                               body=body, headers=headers)
+        response = self.wait()
+        self.assertEqual(201, response.code)
+
+        produced_event = self.event_from_temp_file()
+        self.assertEqual(
+            self.event_with_meta,
+            produced_event
+        )
+
+    def test_produce_valid_event_configured_topic_with_topic_prefix(self):
+        """
+        Posting a valid event with a configured topic_prefix has its
+        meta.topic properly prefixed.
+        """
+        topic_prefix = 'TESTPREFIX.'
+        # Set topic_prefix on our EventloggingService
+        self.application.topic_prefix = topic_prefix
+
+        headers = {'Content-type': 'application/json'}
+        body = json.dumps(self.event_with_meta)
+        self.http_client.fetch(self.get_url('/v1/events'),
+                               self.stop, method="POST",
+                               body=body, headers=headers)
+        response = self.wait()
+        self.assertEqual(201, response.code)
+
+        produced_event = self.event_from_temp_file()
+        self.assertEqual(
+            topic_prefix + self.event_with_meta.topic(),
+            produced_event.topic()
+        )

-- 
To view, visit https://gerrit.wikimedia.org/r/278979
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I682fda2abf93733d5ce20a79effd3af153bceb34
Gerrit-PatchSet: 1
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to