Hi, I am reading JSON messages from Kafka. The topic has 2 partitions. When running the streaming job with spark-submit, I can see that `val dataFrame = sqlContext.read.json(rdd.map(_._2))` executes indefinitely. Am I doing something wrong here? The code is below. This environment is a Cloudera sandbox; I see the same issue in our Hadoop production cluster, but access there is restricted, which is why I tried to reproduce the issue in the sandbox. Kafka 0.10 and Spark 1.4.
```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "localhost:9093,localhost:9092",
  "group.id" -> "xyz",
  "auto.offset.reset" -> "smallest")

val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
val ssc = new StreamingContext(conf, Seconds(1))
val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)

val topics = Set("gpp.minf")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

kafkaStream.foreachRDD { rdd =>
  if (rdd.count > 0) {
    // This is the call that appears to run indefinitely
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.printSchema()
    //dataFrame.foreach(println)
  }
}

ssc.start()
ssc.awaitTermination()
```
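For what it's worth, one thing I have been looking at is schema inference: without an explicit schema, `read.json` triggers an extra Spark job that scans the entire RDD to infer the schema before building the DataFrame. Below is a minimal sketch of passing an explicit schema instead, which skips that inference pass; the field names here are hypothetical placeholders, not my actual message structure.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema -- replace these fields with the actual
// structure of the JSON messages on the topic.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("payload", StringType, nullable = true)))

kafkaStream.foreachRDD { rdd =>
  if (rdd.count > 0) {
    // With an explicit schema, read.json does not need a
    // separate pass over the RDD to infer field types.
    val dataFrame = sqlContext.read.schema(schema).json(rdd.map(_._2))
    dataFrame.printSchema()
  }
}
```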