I got it working by converting the stream to JSON strings and reading each
batch with sqlContext.read.json. This is what I had to do to make it work:

      val messages = KafkaUtils.createDirectStream[Object, Object,
        KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
      // Each decoded Avro record's toString() gives its JSON form,
      // which sqlContext.read.json can parse back into a DataFrame.
      val lines = messages.map(_._2.toString)
      lines.foreachRDD(jsonRDD => {
        val sqlContext = SQLContextSingleton.getInstance(jsonRDD.sparkContext)
        val data = sqlContext.read.json(jsonRDD)
        data.printSchema()
        data.show()
        data.select("COL_NAME").show()
        data.groupBy("COL_NAME").count().show()
      })
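
For reference, SQLContextSingleton above is just the lazily initialized
singleton from the "DataFrame and SQL Operations" section of the Spark
Streaming programming guide; something along these lines:

      import org.apache.spark.SparkContext
      import org.apache.spark.sql.SQLContext

      // Lazily instantiated singleton so foreachRDD reuses one SQLContext
      // instead of creating a new one for every micro-batch.
      object SQLContextSingleton {
        @transient private var instance: SQLContext = _

        def getInstance(sparkContext: SparkContext): SQLContext = {
          if (instance == null) {
            instance = new SQLContext(sparkContext)
          }
          instance
        }
      }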

Not sure though if it's the best way to achieve this.
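
If I go with the DStream.map route you suggested, I guess I could also pull
fields straight off the decoded records, something like this (just a rough
sketch, assuming the decoder hands back Avro GenericRecords; "someField" is
only a placeholder field name):

      import org.apache.avro.generic.GenericRecord

      // Rough sketch: cast the decoded value to a GenericRecord and read a
      // field by name. "someField" is a placeholder, not a real column.
      val records = messages.map(_._2.asInstanceOf[GenericRecord])
      val fieldValues = records.map(record => record.get("someField").toString)
      fieldValues.print()

That would avoid the JSON round trip, but I'd presumably still have to build
a schema by hand to get a DataFrame out of it.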



Tariq, Mohammad
about.me/mti


On Fri, Feb 26, 2016 at 5:21 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> You can use `DStream.map` to transform objects to anything you want.
>
> On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com> wrote:
>
>> Hi group,
>>
>> I have just started working with the Confluent platform and Spark
>> Streaming, and was wondering if it is possible to access individual
>> fields of an Avro object read from a Kafka topic through Spark
>> Streaming. By default, *KafkaUtils.createDirectStream[Object, Object,
>> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)*
>> returns a *DStream[(Object, Object)]*, and there is no schema
>> associated with *Object* (or I am unable to figure it out). This makes
>> it impossible to perform some operations on this DStream, for example
>> converting it to a Spark DataFrame.
>>
>> Since *KafkaAvroDecoder* doesn't allow us to use any class other than
>> *Object*, I think I am going in the wrong direction. Any
>> pointers/suggestions would be really helpful.
>>
>> *Versions used:*
>> confluent-1.0.1
>> spark-1.6.0-bin-hadoop2.4
>> Scala code runner version - 2.11.6
>>
>> And this is the small piece of code I am using:
>>
>> package org.myorg.scalaexamples
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.SparkContext
>> import org.apache.avro.mapred.AvroKey
>> import org.apache.spark.sql.SQLContext
>> //import org.apache.avro.mapred.AvroValue
>> import org.apache.spark.streaming.kafka._
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.avro.generic.GenericRecord
>> import org.apache.spark.streaming.dstream.DStream
>> import io.confluent.kafka.serializers.KafkaAvroDecoder
>> //import org.apache.hadoop.io.serializer.avro.AvroRecord
>> //import org.apache.spark.streaming.dstream.ForEachDStream
>> import org.apache.kafka.common.serialization.Deserializer
>>
>> object DirectKafkaWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 2) {
>>       System.err.println(s"""
>>         |Usage: DirectKafkaWordCount <brokers> <topics>
>>         |  <brokers> is a list of one or more Kafka brokers
>>         |  <topics> is a list of one or more Kafka topics to consume from
>>         |
>>         """.stripMargin)
>>       System.exit(1)
>>     }
>>     val Array(brokers, topics) = args
>>     val sparkConf = new SparkConf()
>>       .setAppName("DirectKafkaWordCount")
>>       .setMaster("local[1]")
>>     sparkConf.registerKryoClasses(
>>       Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String](
>>       "metadata.broker.list" -> brokers,
>>       "group.id" -> "consumer",
>>       "zookeeper.connect" -> "localhost:2181",
>>       "schema.registry.url" -> "http://localhost:8081")
>>     val messages = KafkaUtils.createDirectStream[Object, Object,
>>       KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>>     messages.print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> Thank you so much for your valuable time!
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>
>
