Hi,

I am reading JSON messages from Kafka. The topic has 2 partitions. When
running the streaming job with spark-submit, I can see that

    val dataFrame = sqlContext.read.json(rdd.map(_._2))

executes indefinitely. Am I doing something wrong here? The code is below.
The environment is the Cloudera sandbox; the same issue occurs in cluster
mode on our Hadoop production cluster, but access there is restricted, which
is why I tried to reproduce it in the sandbox. Kafka 0.10 and Spark 1.4.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "localhost:9093,localhost:9092",
  "group.id" -> "xyz",
  "auto.offset.reset" -> "smallest")

val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
val ssc = new StreamingContext(conf, Seconds(1))

val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)

val topics = Set("gpp.minf")
val kafkaStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topics)

kafkaStream.foreachRDD { rdd =>
  if (rdd.count > 0) {
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.printSchema()
    // dataFrame.foreach(println)
  }
}

// Start the streaming context (elided above, but present in the actual job).
ssc.start()
ssc.awaitTermination()
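
In case it helps narrow things down: without an explicit schema, read.json
runs a separate job over the whole RDD just to infer one. Below is a minimal
sketch of the same loop with a fixed schema; the field names ("id",
"payload") are placeholders, not my real message layout.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Placeholder schema -- the real fields would come from the actual messages.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("payload", StringType, nullable = true)))

kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {  // cheaper than rdd.count > 0 just to test for data
    // With a schema supplied up front, read.json skips the inference pass.
    val dataFrame = sqlContext.read.schema(schema).json(rdd.map(_._2))
    dataFrame.printSchema()
  }
}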
