Hi Folks,

Below is the code I have for a Spark-based Kafka producer, intended to take advantage of
multiple executors reading files in parallel on my cluster, but I am stuck:
the program is not making any progress.

Below is my scrubbed code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName(applicationName)
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Broadcast the sink wrapper so each executor builds its own producer
// lazily instead of serializing a live producer from the driver.
val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))

// Monitor the input directory and forward every line of every new file to Kafka.
val zipFileDStreams = ssc.textFileStream(inputFiles)
zipFileDStreams.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { logLineText =>
      println(logLineText)
      producerObj.value.send(topics, logLineText)
    }
  }
}

ssc.start()
ssc.awaitTermination()

ssc.stop()

The code for KafkaSink is as follows.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// LogParser and Bytes are project-specific / utility helpers (scrubbed).
class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {

  // Created lazily, so the producer is instantiated on the executor, not the driver.
  lazy val producer = createProducer()
  val logParser = new LogParser()

  def send(topic: String, value: String): Unit = {
    // Parse the tab-separated line into an Avro event and ship its bytes to Kafka.
    val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
  }
}

import java.util.Properties

object KafkaSink {
  def apply(config: Properties): KafkaSink = {

    // Only this function is serialized; the producer itself is constructed
    // on first use of the lazy val inside KafkaSink.
    val f = () => {
      val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)

      sys.addShutdownHook {
        producer.close()
      }
      producer
    }

    new KafkaSink(f)
  }
}
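
To rule out the Kafka side of things, one check I could run is to exercise KafkaSink directly,
outside Spark. In the sketch below the broker address, topic name and sample line are
placeholders for my real values, and the line has to be one that LogParser actually understands:

val props = new Properties()
props.put("bootstrap.servers", "broker-1:9092")  // placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val sink = KafkaSink(props)
sink.send("my-topic", "fieldA\tfieldB\tfieldC")  // placeholder topic and tab-separated line
sink.producer.close()  // close() flushes any buffered records before exiting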

Disclaimer: this is based on the code from
http://allegro.tech/spark-kafka-integration.html.

The job just sits there; I cannot see any job stages being created.
One thing I want to mention: I am trying to read gzipped files from HDFS
- could it be that the streaming context is not able to read *.gz files?
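
To rule out the compression angle, I plan to try a plain batch read of the same directory
(same inputFiles path); if this prints lines, the *.gz files themselves are readable by Spark:

// Batch-mode sanity check: textFile goes through TextInputFormat, which
// decompresses gzip transparently, so output here would mean the files are
// fine and the problem is somewhere else.
val sample = ssc.sparkContext.textFile(inputFiles)
sample.take(5).foreach(println)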


I am not sure what more details I can provide to help explain my problem.


-- 
Regards,
Atul Kulkarni
