DCausse has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/367394 )

Change subject: Fix some issues when collecting with kafka
......................................................................

Fix some issues when collecting with kafka

On the client only fetch start offsets if the topic exists.
On the daemon, force a seek to the end: auto_offset_reset does not seem to work
properly, and seek_to_end seems to be needed, otherwise we always return start
offsets.

Change-Id: I504c6045224582fd8a9c27baf03a6bf38acf5e36
---
M docs/running-in-analytics.rst
M mjolnir/kafka/client.py
M mjolnir/kafka/daemon.py
3 files changed, 27 insertions(+), 5 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/94/367394/1

diff --git a/docs/running-in-analytics.rst b/docs/running-in-analytics.rst
index 98d0f29..bea481b 100644
--- a/docs/running-in-analytics.rst
+++ b/docs/running-in-analytics.rst
@@ -51,6 +51,15 @@
 * MjoLniR requires a jar built in the /jvm directory of this repository. This 
is a fat jar
   containing other dependencies, such as xgboost and kafka streaming.
 
+* Some conflict with dependencies installed in the analytics cluster may cause 
kafka streaming
+  to fail with `kafka.cluster.BrokerEndPoint cannot be cast to 
kafka.cluster.Broker`.
+  This is probably due to the `kafka 0.9` client jar being imported with flume. 
The workaround
+  is to copy wmf spark conf:
+  `cp -r /etc/spark/conf.analytics-hadoop my_spark_conf`
+  then comment the flume dependency in spark-env.sh:
+  `#SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/flume-ng/lib/*"`
+  and run spark with SPARK_CONF_DIR set to your custom folder.
+
 Spark gotchas
 =============
 
diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 83ed886..cb803fb 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -89,7 +89,10 @@
     list of int
     """
     consumer = kafka.KafkaConsumer(bootstrap_servers=brokers)
-    partitions = [kafka.TopicPartition(topic, p) for p in 
consumer.partitions_for_topic(topic)]
+    parts = consumer.partitions_for_topic(topic)
+    if parts is None:
+        return None
+    partitions = [kafka.TopicPartition(topic, p) for p in parts]
     consumer.assign(partitions)
     return [consumer.position(p) for p in partitions]
 
@@ -124,6 +127,10 @@
                                    # sure we don't miss one start at the 
beginning.
                                    auto_offset_reset='earliest',
                                    value_deserializer=json.loads)
+    parts = consumer.partitions_for_topic(topic=mjolnir.kafka.TOPIC_COMPLETE)
+    if parts is None:
+        raise RuntimeError("topic %s missing" % topic)
+
     partitions = [kafka.TopicPartition(topic, p) for p in 
consumer.partitions_for_topic(topic)]
     consumer.assign(partitions)
     # Tracks the maximum reported offset in the response topic
@@ -151,7 +158,7 @@
     Parameters
     ----------
     sc : pyspark.SparkContext
-    brokers : list of str
+    brokers : string
     receive_record : callable
         Callable receiving a json decoded record from kafka. It must return
         either an empty list on error, or a 3 item tuple containing
@@ -170,11 +177,16 @@
     """
 
     offset_ranges = []
-    # If this ends up being too much data from kafka, blowing up memory in the
-    # spark executors, we could chunk the offsets and union together multiple 
RDD's.
+    if offsets_start is None:
+        offsets_start = get_offset_start(brokers, mjolnir.kafka.TOPIC_RESULT)
+
+    if offsets_start is None:
+        raise RuntimeError( "Cannot fetch offset_start, topic %s should have 
been created" % topic )
     for partition, (start, end) in enumerate(zip(offsets_start, offsets_end)):
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, 
partition, start, end))
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {"metadata.broker.list": brokers}
+    # If this ends up being too much data from kafka, blowing up memory in the
+    # spark executors, we could chunk the offsets and union together multiple 
RDD's.
     return (
         KafkaUtils.createRDD(sc, kafka_params, offset_ranges)
         .map(lambda (k, v): json.loads(v))
diff --git a/mjolnir/kafka/daemon.py b/mjolnir/kafka/daemon.py
index 0beaca6..2256bca 100644
--- a/mjolnir/kafka/daemon.py
+++ b/mjolnir/kafka/daemon.py
@@ -129,6 +129,7 @@
         partitions = [kafka.TopicPartition(self.topic_result, p)
                       for p in 
consumer.partitions_for_topic(self.topic_result)]
         consumer.assign(partitions)
+        consumer.seek_to_end(partitions)
         offsets = [consumer.position(tp) for tp in partitions]
         consumer.close()
         return offsets

-- 
To view, visit https://gerrit.wikimedia.org/r/367394
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I504c6045224582fd8a9c27baf03a6bf38acf5e36
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <dcau...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to