Hi Folks,

Below is the code I have for a Spark-based Kafka producer, written to take advantage of multiple executors reading files in parallel on my cluster, but I am stuck: the program is not making any progress.
Below is my scrubbed code:

    import java.util.Properties

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName(applicationName)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Broadcast the sink so each executor lazily creates its own producer.
    val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))

    // Monitor the input directory for new files.
    val zipFileDStreams = ssc.textFileStream(inputFiles)
    zipFileDStreams.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach { logLineText =>
          println(logLineText)
          producerObj.value.send(topics, logLineText)
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

The code for KafkaSink is as follows:

    import java.util.Properties

    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Serializable wrapper: the producer is created lazily on each executor
    // rather than being serialized and shipped from the driver.
    // (LogParser is an internal class, omitted from this scrubbed snippet.)
    class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
      lazy val producer = createProducer()
      val logParser = new LogParser()

      def send(topic: String, value: String): Unit = {
        val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
      }
    }

    object KafkaSink {
      def apply(config: Properties): KafkaSink = {
        val f = () => {
          // Serializers come from the config, hence the two nulls.
          val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
          sys.addShutdownHook {
            producer.close()
          }
          producer
        }
        new KafkaSink(f)
      }
    }

Disclaimer: this is based on code inspired by http://allegro.tech/spark-kafka-integration.html.

The job just sits there; I cannot see any job stages being created in the UI. One thing I want to mention: I am trying to read gzipped files from HDFS. Could it be that the streaming context is not able to read *.gz files?

I am not sure what more details I can provide to help explain my problem.

--
Regards,
Atul Kulkarni
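P.S. As a sanity check on the *.gz question, here is a minimal batch-mode test I plan to run (the path is just a placeholder for one of my real input files). My understanding is that Hadoop's TextInputFormat decompresses gzip transparently, so if this prints lines, plain gzip reading over HDFS is not the problem:

    import org.apache.spark.{SparkConf, SparkContext}

    // Standalone check, separate from the streaming job: read one gzipped
    // file in batch mode and print its first few lines.
    val sc = new SparkContext(new SparkConf().setAppName("gz-read-check"))
    sc.textFile("hdfs:///path/to/one/input/file.gz").take(5).foreach(println)
    sc.stop()

If that works, my next suspicion would be file detection rather than decoding: as far as I understand, textFileStream only picks up files that appear in the monitored directory after the streaming context starts, so files already sitting there would produce empty batches and no visible stages.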