Hi Cody,

Without the conditional, it works fine. But any processing inside the conditional just keeps waiting (it appears to hang). I am seeing this with partitioned topics. I need the conditional so that processing is skipped when a batch is empty.

kafkaStream.foreachRDD( rdd => {
  val dataFrame = sqlContext.read.json(rdd.map(_._2))
  /*if (dataFrame.count() > 0) {
    dataFrame.foreach(println)
  } else {
    println("Empty DStream ")
  }*/
})

On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> wrote:
> Take out the conditional and the sqlContext and just do
>
> rdd => {
>   rdd.foreach(println)
> }
>
> as a baseline to see if you're reading the data you expect.
>
> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi,
> >
> > I am reading JSON messages from Kafka. The topic has 2 partitions. When
> > running the streaming job with spark-submit, I can see that
> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes indefinitely.
> > Am I doing something wrong here? Below is the code. This environment is
> > the Cloudera sandbox. The same issue occurs in cluster mode on the Hadoop
> > production cluster, but access there is restricted, which is why I tried
> > to reproduce the issue in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
> >
> > val kafkaParams =
> >   Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
> >   "group.id" -> "xyz","auto.offset.reset"->"smallest")
> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> > val ssc = new StreamingContext(conf, Seconds(1))
> >
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >
> > val topics = Set("gpp.minf")
> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> >   StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> >
> > kafkaStream.foreachRDD(
> >   rdd => {
> >     if (rdd.count > 0){
> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >       dataFrame.printSchema()
> >       //dataFrame.foreach(println)
> >     }
> >   }
> > )
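For reference, here is a minimal sketch of the skip-empty-batches pattern. It assumes the Spark 1.4 `spark-streaming-kafka` artifact (Kafka 0.8 consumer API) used in the code above and reuses its broker list, group id and topic. It is not a confirmed fix for the hang. It replaces `rdd.count > 0` with `RDD.isEmpty()` (available since Spark 1.3), which on a general RDD stops as soon as it finds one element instead of counting every partition. It also logs each Kafka partition's offset range per batch, so a partition that never advances, or a step that stalls after the offsets are logged, is easier to spot. The object name `TopicJsonStream` and the helper `processBatch` are hypothetical.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical driver object; names are illustrative only.
object TopicJsonStream {

  // Hypothetical helper: returns None for an empty batch, so no JSON
  // schema-inference job is started; otherwise parses the batch into a DataFrame.
  def processBatch(json: RDD[String], sqlContext: SQLContext): Option[DataFrame] =
    if (json.isEmpty()) None // isEmpty() uses take(1) rather than a full count
    else Some(sqlContext.read.json(json))

  def main(args: Array[String]): Unit = {
    // Same settings as in the original message.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9093,localhost:9092",
      "group.id" -> "xyz",
      "auto.offset.reset" -> "smallest")
    val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sqlContext = new SQLContext(ssc.sparkContext)

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("gpp.minf"))

    kafkaStream.foreachRDD { rdd =>
      // Print the offset range each partition contributes to this batch
      // (fromOffset == untilOffset means that partition had no new data).
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
        println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
      }
      processBatch(rdd.map(_._2), sqlContext) match {
        case Some(dataFrame) => dataFrame.printSchema()
        case None            => println("Empty batch, skipping")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The `HasOffsetRanges` cast only works on the RDD that `createDirectStream` hands to `foreachRDD`, before any transformation. That is why the offsets are read before `rdd.map(_._2)`. Keeping the emptiness check in a separate function also makes it testable with a plain local `SparkContext`, without a Kafka broker.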