Ottomata has submitted this change and it was merged.

Change subject: Use confluent-kafka python instead of pykafka for consumption
......................................................................


Use confluent-kafka python instead of pykafka for consumption

This does not change the producer; it still uses kafka-python 0.9.4.
I have tested confluent-kafka synchronous producer throughput, and it seems
comparable to kafka-python 0.9.4.  We could unify the client libs and swap the
producer client in another change later.

Bug: T133779
Change-Id: I5df1e5dacd94903c05e1c485124a75e898945102
---
M eventlogging/factory.py
M eventlogging/handlers.py
M requirements.txt
M tests/test_factory.py
4 files changed, 152 insertions(+), 45 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/eventlogging/factory.py b/eventlogging/factory.py
index 43d7ac7..f169834 100644
--- a/eventlogging/factory.py
+++ b/eventlogging/factory.py
@@ -6,6 +6,7 @@
   This module implements a factory-like map of URI scheme handlers.
 
 """
+import ast
 import contextlib
 import inspect
 
@@ -34,7 +35,7 @@
         'false': False
     }.get(v.lower(), v)
 
-    # Else try to convert v to an int or float
+    # Else try to convert v to an int or float or array or dict.
     if type(v) is not bool:
         try:
             v = int(v)
@@ -42,7 +43,14 @@
             try:
                 v = float(v)
             except ValueError:
-                pass
+                # If this looks like it could be an array or a dict, then try
+                # to parse it with ast.literal_eval.
+                if v.startswith('[') or v.startswith('{'):
+                    try:
+                        v = ast.literal_eval(v)
+                    except ValueError:
+                        pass
+
     return v
 
 
diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index c9e5354..cd9564d 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -446,67 +446,158 @@
 @reads('kafka')
 def kafka_reader(
     path,
-    topic='eventlogging',
-    identity='',
+    topics=None,
+    topic=None,  # deprecated
+    identity=None,
     raw=False,
-    **kafka_consumer_args
+    poll_timeout=1.0,
+    **kwargs
 ):
     """
     Reads events from Kafka.
 
     Kafka URIs look like:
-    kafka:///b1:9092,b2:9092?topic=topic_name&identity=consumer_group_name&
-    auto_commit_enable=True&auto_commit_interval_ms=1000...
+    kafka:///b1:9092,b2:9092?topics=topic1,topic2&identity=consumer_group&
+    auto.commit.interval.ms=1000...
 
-    This reader uses the pykafka BalancedConsumer.  You may pass
-    any configs that BalancedConsumer takes as keyword arguments via
-    the kafka URI query params.
+    This uses the Consumer from the librdkafka-backed confluent-kafka
+    python library.  You may pass any configs that the librdkafka Consumer
+    takes as keyword arguments via URI query params.
 
-    The auto_commit_interval_ms is by default 60 seconds. This is pretty high
-    and may lead to more duplicate message consumption (Kafka has at atleast
-    once message delivery guarantee). Lowering this(to 1 second?) makes sure
-    that there aren't as many duplicates, but incurs the overhead of committing
-    offsets to zookeeper more often.
+    auto.commit.interval.ms is by default 5 seconds.
 
-    If auto_commit_enable is True, then messages will be marked as done based
-    on the auto_commit_interval_ms time period.
+    If enable.auto.commit is True (the default), then messages will be marked
+    as done based on the auto.commit.interval.ms time period.
     This has the downside of committing message offsets before
     work might be actually complete.  E.g. if inserting into MySQL, and
     the process dies somewhere along the way, it is possible
     that message offsets will be committed to Kafka for messages
     that have not been inserted into MySQL.  Future work
     will have to fix this problem somehow.  Perhaps a callback?
+
+    The 'topic' parameter is provided for backwards compatibility.
+    It will be used if topics is not given.
+
+    Arguments:
+        *path (str): Comma separated list of broker hostname:ports.
+
+        *topics (list): List of topics to subscribe to.
+
+        *topic (str): Deprecated topic to subscribe to.  Use topics instead.
+            Ignored if topics is provided.
+
+        *identity (str): Used as the Kafka consumer group.id, and the prefix
+            of the Kafka client.id.  If not given, a new unique identity will
+            be created.
+
+        *raw (bool): If True, the generator returned will yield a stream of
+            strings, else a stream of Events.  Default: False.
+
+        *poll_timeout (float): Timeout in seconds to use for call to
+            consumer.poll().  poll will only block for this long
+            if there are no messages.  Default: 1.0.
     """
-    from pykafka import KafkaClient as PyKafkaClient
-    from pykafka import BalancedConsumer
+    if not topics and not topic:
+        raise ValueError(
+            'Cannot consume from Kafka without providing topics.'
+        )
 
-    # The identity param is used to define the consumer group name.
-    # If identity is empty create a default unique one. This ensures we don't
-    # accidentally put consumers to the same group. Explicitly specify identity
-    # to launch consumers in the same consumer group
-    identity = identity if identity else 'eventlogging-' + str(uuid.uuid1())
+    from confluent_kafka import Consumer, KafkaError
+    import signal
 
-    # Brokers should be in the uri path
-    # path.strip returns type 'unicode' and pykafka expects a string, so
-    # converting unicode to str
-    brokers = path.strip('/').encode('ascii', 'ignore')
+    # Use topics as an array if given, else just use topic
+    topics = topics.split(',') if topics else [topic]
 
-    # remove non KafkaConsumer args from kafka_consumer_args
-    kafka_consumer_args = {
-        k: v for k, v in items(kafka_consumer_args)
-        if k in inspect.getargspec(BalancedConsumer.__init__).args
-    }
+    # identity is used to define the consumer group.id and the prefix of
+    # the client.id. If identity is not given create a default unique
+    # one. This ensures we don't accidentally put consumers to the same group.
+    # Explicitly specify identity to launch consumers in the same consumer
+    # group.
+    group_id = identity if identity else 'eventlogging-' + str(uuid.uuid1())
+    client_id = '{0}-{1}.{2}'.format(group_id, socket.getfqdn(), os.getpid())
 
-    kafka_client = PyKafkaClient(hosts=brokers)
-    kafka_topic = kafka_client.topics[topic.encode('ascii', 'ignore')]
+    # Remove anything that we know is not going to be a valid
+    # Kafka Consumer parameter from kwargs and then set some required
+    # configs.
+    eventlogging_keys = ('port', 'hostname', 'uri')
+    kafka_args = {k: kwargs[k] for k in kwargs if k not in eventlogging_keys}
+    kafka_args['bootstrap.servers'] = path.strip('/')
+    kafka_args['group.id'] = group_id
+    kafka_args['client.id'] = client_id
 
-    consumer = kafka_topic.get_balanced_consumer(
-        consumer_group=identity.encode('ascii', 'ignore'),
-        **kafka_consumer_args)
+    kafka_consumer = Consumer(**kafka_args)
 
-    # Define a generator to read from the BalancedConsumer instance
-    def message_stream(consumer):
-        while True:
-            yield consumer.consume()
+    logging.info(
+        'Consuming topics %s from Kafka in group %s as %s',
+        topics, group_id, client_id
+    )
 
-    return stream((message.value for message in message_stream(consumer)), raw)
+    # Callback for logging during consumer rebalances
+    def log_assign(consumer, partitions):
+        logging.info('Partition assignment change for %s. Now consuming '
+                     'from %s partitions: %s',
+                     client_id, len(partitions), partitions)
+
+    # Subscribe to list of topics.
+    kafka_consumer.subscribe(topics, on_assign=log_assign)
+
+    # Define a generator to read from the Consumer instance.
+    def consume(consumer, timeout=1.0):
+        # Make sure we close the consumer on SIGTERM.
+        # SIGINT should be caught by the finally in consume().
+        def shutdown_handler(_signo, _stack_frame):
+            logging.info('Caught SIGTERM, closing KafkaConsumer %s '
+                         'to commit outstanding offsets.', client_id)
+            consumer.close()
+            sys.exit(0)
+        signal.signal(signal.SIGTERM, shutdown_handler)
+
+        # Wrap the poll loop in a try/finally.
+        try:
+            while True:
+                # Poll for messages
+                message = consumer.poll(timeout=timeout)
+
+                # If no message was found in timeout, poll again.
+                if not message:
+                    continue
+
+                # Else if we encountered a KafkaError, log and continue.
+                elif message.error():
+                    # _PARTITION_EOF is pretty normal, just log at debug
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        logging.debug(
+                            'KafkaConsumer %s consuming %s [%d] '
+                            'reached end at offset %d\n' % (
+                                client_id,
+                                message.topic(),
+                                message.partition(),
+                                message.offset()
+                            )
+                        )
+                    # Else this is a real KafkaError, log at error.
+                    else:
+                        logging.error(message.error())
+
+                # Else we got a proper message, yield it.
+                else:
+                    yield message.value()
+        except BaseException as e:
+            error_message = 'Exception while KafkaConsumer %s consuming' % (
+                client_id
+            )
+            # Add more info if message is defined.
+            if message:
+                error_message += ' from %s [%s] at offset %s' % (
+                    message.topic(), message.partition(), message.offset(),
+                )
+            logging.error(error_message)
+            if (type(e) != KeyboardInterrupt):
+                raise(e)
+        finally:
+            logging.info('Finally closing KafkaConsumer %s '
+                         'to commit outstanding offsets.', client_id)
+            consumer.close()
+
+    # Return a stream of message values.
+    return stream(consume(kafka_consumer, poll_timeout), raw)
diff --git a/requirements.txt b/requirements.txt
index 49ec4fd..3938324 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,9 @@
 python-dateutil>=1.5
 jsonschema>=0.7
-kafka-python>=0.9.4
+kafka-python==0.9.4
 mysqlclient>=1.3.7
 pygments>=1.5
-pykafka>=2.1.0
+confluent-kafka>=0.9.1.2
 PyYAML>=3.10
 pyzmq>=2.1
 sqlalchemy>=0.7
diff --git a/tests/test_factory.py b/tests/test_factory.py
index c9e9ce5..b78e4f0 100644
--- a/tests/test_factory.py
+++ b/tests/test_factory.py
@@ -84,3 +84,11 @@
             True,
             eventlogging.factory.cast_string("True")
         )
+        self.assertEqual(
+            ['string', 10],
+            eventlogging.factory.cast_string('["string", 10]')
+        )
+        self.assertEqual(
+            {'string': 10},
+            eventlogging.factory.cast_string('{"string": 10}')
+        )

-- 
To view, visit https://gerrit.wikimedia.org/r/292755
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I5df1e5dacd94903c05e1c485124a75e898945102
Gerrit-PatchSet: 8
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Madhuvishy <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to