Hi, I am trying to consume from Kafka topics following Approach 1 (createStream) in http://spark.apache.org/docs/latest/streaming-kafka-integration.html. However, I am not able to write the stream to a local text file using the saveAsTextFiles() function. Below is the code:

    import pyspark
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 1)
    zkQuorum, topic = 'localhost:9092', 'python-kafka'
    kafka_stream = KafkaUtils.createStream(ssc, zkQuorum, None, {topic: 1})
    lines = kafka_stream.map(lambda x: x[1])
    kafka_stream.saveAsTextFiles('file:///home/puneett/')

When I access the consumer, I get the following output:

    [puneett@gb-slo-svb-0255 ~]$ /nfs/science/shared/kafka/kafka/bin/kafka-console-consumer.sh --topic python-kafka --property schema.registry.url="http://localhost:9092" --zookeeper localhost:2182 --from-beginning
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/nfs/science/shared/kafka/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    There are 10000 more similar kafka test message produced
    There are 10000 more similar kafka test message produced
    There are 10000 more similar kafka test message produced
    There are 10000 more similar kafka test message produced
    There are 10000 more similar kafka test message produced
    There are 10000 more similar kafka test message produced
    There are 10000 more similar kafka test message produced
    There

Can someone please suggest what I am doing wrong?

Regards,
Puneet
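A few things in the snippet above probably explain why no files appear. The sketch below is a minimal corrected version, assuming the receiver-based Kafka 0.8 integration (pyspark.streaming.kafka, available in Spark 1.x/2.x and removed in Spark 3.0) and a local run:

- the StreamingContext is never started, so no batch ever runs and saveAsTextFiles() has nothing to write; ssc.start() and ssc.awaitTermination() are needed;
- saveAsTextFiles() is called on kafka_stream (the (key, value) pairs) rather than on lines;
- createStream() expects a ZooKeeper quorum, but localhost:9092 looks like the broker port; the console-consumer command above talks to ZooKeeper at localhost:2182;
- groupId is passed as None, while createStream() uses it as the consumer group.id;
- in local mode the receiver occupies one core, so the master should be at least local[2].

The consumer group name and the output prefix are hypothetical, and batch_output_path() is a hypothetical helper that only reproduces the documented "prefix-TIME_IN_MS[.suffix]" naming of saveAsTextFiles(), so it is clear where output lands. With the original prefix 'file:///home/puneett/', each batch would go to a directory named /home/puneett/-<TIME_IN_MS>.

```python
def batch_output_path(prefix, time_ms, suffix=None):
    """Hypothetical helper: mirrors the documented saveAsTextFiles naming,
    where each batch is written to a directory "prefix-TIME_IN_MS[.suffix]"."""
    path = "%s-%d" % (prefix, time_ms)
    if suffix is not None:
        path += "." + suffix
    return path


def main():
    # PySpark imports live inside main() so the helper above can be used
    # without a Spark installation.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    # The receiver uses one core, so local mode needs at least two:
    # one for the receiver and one to process the batches.
    # (In the pyspark shell, reuse the existing `sc` instead.)
    conf = SparkConf().setMaster("local[2]").setAppName("kafka-to-text")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)  # 1-second batches, as in the original post

    # createStream connects to ZooKeeper, not to the broker on port 9092.
    # localhost:2182 is the ZooKeeper address from the console-consumer
    # command in the post; adjust it to your setup.
    zk_quorum = "localhost:2182"
    group_id = "spark-text-writer"  # hypothetical consumer group name
    topic = "python-kafka"

    kafka_stream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})
    lines = kafka_stream.map(lambda kv: kv[1])  # keep only the message value

    # Print the first few records of every batch on the driver console.
    lines.pprint()
    # Hypothetical prefix: batches land in directories such as
    # /home/puneett/kafka-out/batch-<TIME_IN_MS>/
    lines.saveAsTextFiles("file:///home/puneett/kafka-out/batch")

    # Nothing is consumed or written until the context is started.
    ssc.start()
    ssc.awaitTermination()
```

main() is deliberately not called at the bottom; to run it, append a main() call (or the usual `if __name__ == "__main__":` guard) and submit the script with spark-submit, adding the Kafka 0.8 integration package that matches your Spark and Scala versions via --packages (for example org.apache.spark:spark-streaming-kafka-0-8_2.11:<your Spark version> on Spark 2.x).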