Ottomata has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/302372 )

Change subject: Make EventloggingService more properly async, handle 
kafka-python futures
......................................................................


Make EventloggingService more properly async, handle kafka-python futures

Change-Id: Ifad0d08c7b9c666c7b0ef9f946d5a264079e08f4
---
M eventlogging/service.py
1 file changed, 81 insertions(+), 18 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/eventlogging/service.py b/eventlogging/service.py
index 4fc8b92..269b64c 100644
--- a/eventlogging/service.py
+++ b/eventlogging/service.py
@@ -17,6 +17,10 @@
 # For UnicodeError
 from _codecs import *  # noqa
 import logging
+
+# Used to convert kafka-python futures to tornado futures
+import kafka
+
 import os
 import socket
 import yaml
@@ -151,11 +155,23 @@
             self.error_writer = None
 
     def send(self, event):
-        """Send the event to configured eventlogging writers."""
+        """
+        Send the event to configured eventlogging writers. If a writer
+        returns a future like object, this method will convert it into
+        a tornado future.  A list of these futures will be returned.
+
+        The non tornado futures returned by the eventlogging writer handlers
+        MUST each have a convert_yielded future function
+        registered with @tornado.gen.convert_yielded.register.
+        """
+        futures = []
         for uri in self.writers.keys():
             w = self.writers[uri]
             try:
-                w.send(event)
+                future = w.send(event)
+                if future:
+                    futures.append(future)
+
            # If the writer coroutine has stopped (likely due to
             # an error during the previous send()), attempt to
             # recreate the writer now.
@@ -166,8 +182,14 @@
                 )
                 w = get_writer(uri)
                 self.writers[uri] = w
-                w.send(event)
+                future = w.send(event)
+                if future:
+                    futures.append(future)
 
+        # Return the list of tornado futures.
+        return futures
+
+    @tornado.gen.coroutine
     def process_event(self, event):
         """
        Validate the event using the schema configured for its topic.
@@ -213,13 +235,13 @@
 
         validate(event, encapsulate=event.should_encapsulate())
 
-        # Send this processed event to all configured writers
-        # This will block until each writer finishes writing
-        # this event.
-        self.send(event)
-        return True
+        # Send this processed event to all configured writers.  This will
+        # return a list of futures, the result of which will be yielded to the
+        # caller.
+        yield self.send(event)
 
-    def handle_events(self, events, callback=None):
+    @tornado.gen.coroutine
+    def handle_events(self, events):
         """
         Calls process_event on each of the events.  Any
         errors thrown by process_event will be caught, and EventError
@@ -229,11 +251,15 @@
         :param events: list of event dicts
         """
         event_errors = []
+
+        # TODO: This loop is serial, and will wait for each
+        # event to be processed before attempting the next one.
+        # Use a tornado.gen.WaitIterator to get around this.
         for event in events:
             error_message = None
 
             try:
-                self.process_event(event)
+                yield self.process_event(event)
 
             except TopicNotConfigured as e:
                 error_message = str(e)
@@ -284,10 +310,9 @@
                     if self.error_writer:
                         self.error_writer.send(event_error)
 
-        if callback:
-            callback(event_errors)
-        else:
-            return event_errors
+        # raise Return will cause the gen.coroutine
+        # to return a future filled with event_errors
+        raise tornado.gen.Return(event_errors)
 
     def start(self):
         """
@@ -337,10 +362,8 @@
                     if isinstance(events, dict):
                         events = [events]
 
-                    # Process and validate all events.
-                    event_errors = yield tornado.gen.Task(
-                        self.application.handle_events, events
-                    )
+                    event_errors = yield self.application.handle_events(events)
+
                     events_count = len(events)
                     event_errors_count = len(event_errors)
 
@@ -388,6 +411,12 @@
         self.set_status(response_code, response_text)
         if response_body:
             self.write(response_body)
+
+    def on_connection_close(self):
+        logging.warn(
+            'Client closed connection before sending events finished. Body: ',
+            self.request.body
+        )
 
 
 class TopicConfigHandler(
@@ -499,6 +528,40 @@
         return 'application/json; charset=UTF-8'
 
 
[email protected]_yielded.register(
+    kafka.producer.future.FutureRecordMetadata
+)
+def convert_kafka_future(kafka_future):
+    """
+    Given a kafka-python FutureRecordMetadata, this will return
+    a tornado Future that will resolve once the kafka_future resolves.
+
+    Registering this with tornado allows gen.coroutines to yield
+    kafka FutureRecordMetadata without worrying about converting types.
+    """
+
+    def kafka_delivery_callback(tf, v):
+        """
+        This callback is fired on produce response from Kafka.
+        tf should be a tornado Future to set either
+        result or exception appropriately.
+        """
+        if isinstance(v, BaseException):
+            tf.set_exception(v)
+            # TODO: This seems correct, but causes
+            # an exception to be thrown from the Future object.
+            # tf.set_exc_info(sys.exc_info())
+        else:
+            tf.set_result(v)
+            logging.debug('Delivered to kafka: %s', v)
+
+    tornado_future = tornado.concurrent.Future()
+    # When kafka's future calls this delivery callback, the
+    # tornado_future will be resolved.
+    kafka_future.add_both(kafka_delivery_callback, tornado_future)
+    return tornado_future
+
+
 def append_spec_test_topic_and_schema(overwrite=False):
     """
     Augments the topic config and schema cache with a test

-- 
To view, visit https://gerrit.wikimedia.org/r/302372
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ifad0d08c7b9c666c7b0ef9f946d5a264079e08f4
Gerrit-PatchSet: 3
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to