Hi Siva,

Does the topic have partitions? Which version of Spark are you using?
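
One way to check what each batch actually reads per partition is to print the offset ranges of the direct stream. This is only a rough sketch, assuming the Kafka 0.8 direct API (org.apache.spark.streaming.kafka) used in the code quoted below; the helper name logOffsetRanges is made up for illustration and is not part of Spark.

```scala
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Hypothetical helper: prints the Kafka offset range consumed from each
// topic partition in every batch. It must be called on the stream returned
// by KafkaUtils.createDirectStream, before any transformation, because only
// those RDDs implement HasOffsetRanges.
def logOffsetRanges(stream: InputDStream[(String, String)]): Unit = {
  stream.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach { r =>
      println(s"${r.topic} partition ${r.partition}: " +
        s"offsets ${r.fromOffset} until ${r.untilOffset}")
    }
  }
}
```

Calling it on the stream before ssc.start() should show one line per partition per batch, which tells you whether both partitions are being read and how many messages each batch covers.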

On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote:

> Hi,
>
> Here is a working example I did.
>
> HTH
>
> Regards,
>
> Sivakumaran S
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> // Alternative master URL: spark://localhost:7077
> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local")
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>     System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   // Process your DF as required from here on
> })
> // Start the streaming job and block until it is stopped
> ssc.start()
> ssc.awaitTermination()
>
>
>
> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
> Hi,
>
> I am reading JSON messages from Kafka. The topic has 2 partitions. When I
> run the streaming job with spark-submit, I can see that * val
> dataFrame = sqlContext.read.json(rdd.map(_._2)) * executes indefinitely.
> Am I doing something wrong here? The code is below. This environment is the
> Cloudera sandbox. The same issue occurs on our Hadoop production cluster, but
> access there is restricted, which is why I tried to reproduce it in the
> Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>
> val kafkaParams = Map[String, String](
>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>   "group.id" -> "xyz",
>   "auto.offset.reset" -> "smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>
> kafkaStream.foreachRDD(
>   rdd => {
>     if (rdd.count > 0) {
>       // This is the call that executes indefinitely
>       val dataFrame = sqlContext.read.json(rdd.map(_._2))
>       dataFrame.printSchema()
>       // dataFrame.foreach(println)
>     }
>   }
> )
>
>
>
