DCausse has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/367394 )
Change subject: Fix some issues when collecting with kafka
......................................................................

Fix some issues when collecting with kafka

On the client, only fetch start offsets if the topic exists.

On the daemon, force a seek to the end: auto_offset_reset does not seem
to work properly, and seek_to_end appears to be needed, otherwise we
always return the start offsets.

Change-Id: I504c6045224582fd8a9c27baf03a6bf38acf5e36
---
M docs/running-in-analytics.rst
M mjolnir/kafka/client.py
M mjolnir/kafka/daemon.py
3 files changed, 27 insertions(+), 5 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/94/367394/1

diff --git a/docs/running-in-analytics.rst b/docs/running-in-analytics.rst
index 98d0f29..bea481b 100644
--- a/docs/running-in-analytics.rst
+++ b/docs/running-in-analytics.rst
@@ -51,6 +51,15 @@
 
 * MjoLniR requires a jar built in the /jvm directory of this repository. This is
   a fat jar containing other dependencies, such as xgboost and kafka streaming.
+* A conflict with dependencies installed in the analytics cluster may cause kafka streaming
+  to fail with `kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker`.
+  This is probably due to the `kafka 0.9` client jar being imported with flume. The workaround
+  is to copy the wmf spark conf:
+  `cp -r /etc/spark/conf.analytics-hadoop my_spark_conf`
+  then comment out the flume dependency in spark-env.sh:
+  `#SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/flume-ng/lib/*"`
+  and run spark with SPARK_CONF_DIR set to your custom folder.
+
 Spark gotchas
 =============
 
diff --git a/mjolnir/kafka/client.py b/mjolnir/kafka/client.py
index 83ed886..cb803fb 100644
--- a/mjolnir/kafka/client.py
+++ b/mjolnir/kafka/client.py
@@ -89,7 +89,10 @@
     list of int
     """
     consumer = kafka.KafkaConsumer(bootstrap_servers=brokers)
-    partitions = [kafka.TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
+    parts = consumer.partitions_for_topic(topic)
+    if parts is None:
+        return None
+    partitions = [kafka.TopicPartition(topic, p) for p in parts]
     consumer.assign(partitions)
     return [consumer.position(p) for p in partitions]
 
@@ -124,6 +127,10 @@
         # sure we don't miss one start at the beginning.
         auto_offset_reset='earliest',
         value_deserializer=json.loads)
+    parts = consumer.partitions_for_topic(topic=mjolnir.kafka.TOPIC_COMPLETE)
+    if parts is None:
+        raise RuntimeError("topic %s missing" % topic)
+
     partitions = [kafka.TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
     consumer.assign(partitions)
     # Tracks the maximum reported offset in the response topic
@@ -151,7 +158,7 @@
     Parameters
     ----------
     sc : pyspark.SparkContext
-    brokers : list of str
+    brokers : string
     receive_record : callable
         Callable receiving a json decoded record from kafka. It must return
         either an empty list on error, or a 3 item tuple containing
@@ -170,11 +177,16 @@
     """
     offset_ranges = []
-    # If this ends up being too much data from kafka, blowing up memory in the
-    # spark executors, we could chunk the offsets and union together multiple RDD's.
+    if offsets_start is None:
+        offsets_start = get_offset_start(brokers, mjolnir.kafka.TOPIC_RESULT)
+
+    if offsets_start is None:
+        raise RuntimeError("Cannot fetch offset_start, topic %s should have been created"
+                           % mjolnir.kafka.TOPIC_RESULT)
     for partition, (start, end) in enumerate(zip(offsets_start, offsets_end)):
         offset_ranges.append(OffsetRange(mjolnir.kafka.TOPIC_RESULT, partition, start, end))
-    kafka_params = {"metadata.broker.list": ','.join(brokers)}
+    kafka_params = {"metadata.broker.list": brokers}
+    # If this ends up being too much data from kafka, blowing up memory in the
+    # spark executors, we could chunk the offsets and union together multiple RDD's.
     return (
         KafkaUtils.createRDD(sc, kafka_params, offset_ranges)
         .map(lambda (k, v): json.loads(v))
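Background on the client-side guard above: kafka-python's
KafkaConsumer.partitions_for_topic() returns None rather than raising when
the topic does not exist yet, so building TopicPartition objects straight
from its result crashes with a TypeError while iterating over None. A
minimal sketch of the guarded pattern; the broker address, topic name, and
the fetch_positions helper are illustrative, not part of this change::

    import kafka

    def fetch_positions(brokers, topic):
        """Return the current position per partition, or None if the topic is absent."""
        consumer = kafka.KafkaConsumer(bootstrap_servers=brokers)
        try:
            parts = consumer.partitions_for_topic(topic)
            if parts is None:
                # Topic not created yet; report that to the caller rather
                # than crashing while iterating over None.
                return None
            partitions = [kafka.TopicPartition(topic, p) for p in parts]
            consumer.assign(partitions)
            return [consumer.position(p) for p in partitions]
        finally:
            consumer.close()

    # Illustrative usage against a hypothetical local broker:
    # fetch_positions('localhost:9092', 'mjolnir.result')

Returning None instead of raising lets each caller decide whether a missing
topic is a fatal error or simply means no work has been produced yet.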
diff --git a/mjolnir/kafka/daemon.py b/mjolnir/kafka/daemon.py
index 0beaca6..2256bca 100644
--- a/mjolnir/kafka/daemon.py
+++ b/mjolnir/kafka/daemon.py
@@ -129,6 +129,7 @@
         partitions = [kafka.TopicPartition(self.topic_result, p)
                       for p in consumer.partitions_for_topic(self.topic_result)]
         consumer.assign(partitions)
+        consumer.seek_to_end(*partitions)
         offsets = [consumer.position(tp) for tp in partitions]
         consumer.close()
         return offsets

-- 
To view, visit https://gerrit.wikimedia.org/r/367394
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I504c6045224582fd8a9c27baf03a6bf38acf5e36
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: DCausse <dcau...@wikimedia.org>
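Background on the daemon-side fix: per the commit message, auto_offset_reset
alone did not behave as expected, and position() kept reporting the start
offsets, so the patch seeks explicitly to the log end before reading
positions. Note that kafka-python's KafkaConsumer.seek_to_end() takes
TopicPartition arguments as varargs, and with no arguments it seeks every
assigned partition. A minimal sketch of fetching end offsets this way; the
broker address, topic name, and the fetch_end_offsets helper are
illustrative, not part of this change::

    import kafka

    def fetch_end_offsets(brokers, topic):
        """Return the log-end offset per partition of `topic`, or None if absent."""
        consumer = kafka.KafkaConsumer(bootstrap_servers=brokers)
        try:
            parts = consumer.partitions_for_topic(topic)
            if parts is None:
                # Topic does not exist yet.
                return None
            partitions = [kafka.TopicPartition(topic, p) for p in parts]
            consumer.assign(partitions)
            # Without an explicit seek, position() may reflect the auto-reset
            # (start) offset. With no arguments, seek_to_end() seeks every
            # assigned partition to the end of the log.
            consumer.seek_to_end()
            return [consumer.position(tp) for tp in partitions]
        finally:
            consumer.close()

    # Illustrative usage against a hypothetical local broker:
    # fetch_end_offsets('localhost:9092', 'mjolnir.result')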