Ottomata has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/302372 )
Change subject: Make EventloggingService more properly async, handle
kafka-python futures
......................................................................
Make EventloggingService more properly async, handle kafka-python futures
Change-Id: Ifad0d08c7b9c666c7b0ef9f946d5a264079e08f4
---
M eventlogging/service.py
1 file changed, 81 insertions(+), 18 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
diff --git a/eventlogging/service.py b/eventlogging/service.py
index 4fc8b92..269b64c 100644
--- a/eventlogging/service.py
+++ b/eventlogging/service.py
@@ -17,6 +17,10 @@
# For UnicodeError
from _codecs import * # noqa
import logging
+
+# Used to convert kafka-python futures to tornado futures
+import kafka
+
import os
import socket
import yaml
@@ -151,11 +155,23 @@
self.error_writer = None
def send(self, event):
- """Send the event to configured eventlogging writers."""
+ """
+ Send the event to configured eventlogging writers. If a writer
+ returns a future like object, this method will convert it into
+ a tornado future. A list of these futures will be returned.
+
+    The non-tornado futures returned by the eventlogging writer handlers
+    MUST each have a converter function
+    registered with @tornado.gen.convert_yielded.register.
+ """
+ futures = []
for uri in self.writers.keys():
w = self.writers[uri]
try:
- w.send(event)
+ future = w.send(event)
+ if future:
+ futures.append(future)
+
# If the writer coroutine has stopped (likely due to
# an error during the previous send()), attempt to
# recreate the writer now.
@@ -166,8 +182,14 @@
)
w = get_writer(uri)
self.writers[uri] = w
- w.send(event)
+ future = w.send(event)
+ if future:
+ futures.append(future)
+ # Return the list of tornado futures.
+ return futures
+
+ @tornado.gen.coroutine
def process_event(self, event):
"""
Validate the event using the schema configured for its topic.
@@ -213,13 +235,13 @@
validate(event, encapsulate=event.should_encapsulate())
- # Send this processed event to all configured writers
- # This will block until each writer finishes writing
- # this event.
- self.send(event)
- return True
+ # Send this processed event to all configured writers. This will
+ # return a list of futures, the result of which will be yielded to the
+ # caller.
+ yield self.send(event)
- def handle_events(self, events, callback=None):
+ @tornado.gen.coroutine
+ def handle_events(self, events):
"""
Calls process_event on each of the events. Any
errors thrown by process_event will be caught, and EventError
@@ -229,11 +251,15 @@
:param events: list of event dicts
"""
event_errors = []
+
+ # TODO: This loop is serial, and will wait for each
+ # event to be processed before attempting the next one.
+ # Use a tornado.gen.WaitIterator to get around this.
for event in events:
error_message = None
try:
- self.process_event(event)
+ yield self.process_event(event)
except TopicNotConfigured as e:
error_message = str(e)
@@ -284,10 +310,9 @@
if self.error_writer:
self.error_writer.send(event_error)
- if callback:
- callback(event_errors)
- else:
- return event_errors
+ # Raising Return will cause the gen.coroutine
+ # to return a future filled with event_errors
+ raise tornado.gen.Return(event_errors)
def start(self):
"""
@@ -337,10 +362,8 @@
if isinstance(events, dict):
events = [events]
- # Process and validate all events.
- event_errors = yield tornado.gen.Task(
- self.application.handle_events, events
- )
+ event_errors = yield self.application.handle_events(events)
+
events_count = len(events)
event_errors_count = len(event_errors)
@@ -388,6 +411,12 @@
self.set_status(response_code, response_text)
if response_body:
self.write(response_body)
+
+ def on_connection_close(self):
+ logging.warn(
+ 'Client closed connection before sending events finished. Body: ',
+ self.request.body
+ )
class TopicConfigHandler(
@@ -499,6 +528,40 @@
return 'application/json; charset=UTF-8'
[email protected]_yielded.register(
+ kafka.producer.future.FutureRecordMetadata
+)
+def convert_kafka_future(kafka_future):
+ """
+ Given a kafka-python FutureRecordMetadata, this will return
+ a tornado Future that will resolve once the kafka_future resolves.
+
+ Registering this with tornado allows gen.coroutines to yield
+ kafka FutureRecordMetadata without worrying about converting types.
+ """
+
+ def kafka_delivery_callback(tf, v):
+ """
+ This callback is fired on produce response from Kafka.
+ tf should be a tornado Future to set either
+ result or exception appropriately.
+ """
+ if isinstance(v, BaseException):
+ tf.set_exception(v)
+ # TODO: This seems correct, but causes
+ # an exception to be thrown from the Future object.
+ # tf.set_exc_info(sys.exc_info())
+ else:
+ tf.set_result(v)
+ logging.debug('Delivered to kafka: %s', v)
+
+ tornado_future = tornado.concurrent.Future()
+ # When kafka's future calls this delivery callback, the
+ # tornado_future will be resolved.
+ kafka_future.add_both(kafka_delivery_callback, tornado_future)
+ return tornado_future
+
+
def append_spec_test_topic_and_schema(overwrite=False):
"""
Augments the topic config and schema cache with a test
--
To view, visit https://gerrit.wikimedia.org/r/302372
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ifad0d08c7b9c666c7b0ef9f946d5a264079e08f4
Gerrit-PatchSet: 3
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits