I got it working by converting the stream to JSON strings and reading them with sqlContext.read.json. This is what I had to do in order to make it work:
val messages = KafkaUtils.createDirectStream[Object, Object,
  KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2.toString)
lines.foreachRDD(jsonRDD => {
  val sqlContext = SQLContextSingleton.getInstance(jsonRDD.sparkContext)
  val data = sqlContext.read.json(jsonRDD)
  data.printSchema()
  data.show()
  data.select("COL_NAME").show()
  data.groupBy("COL_NAME").count().show()
})

Not sure though if it's the best way to achieve this.
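In case it helps anyone, SQLContextSingleton above is just the lazily-initialized singleton pattern from the Spark Streaming programming guide. A minimal sketch of the helper, assuming Spark 1.6's SQLContext:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SQLContextSingleton {
  // Transient so the instance is not serialized along with closures
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}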
Tariq, Mohammad
about.me/mti

On Fri, Feb 26, 2016 at 5:21 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> You can use `DStream.map` to transform objects to anything you want.
>
> On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com>
> wrote:
>
>> Hi group,
>>
>> I have just started working with the Confluent platform and Spark
>> Streaming, and was wondering if it is possible to access individual
>> fields of an Avro object read from a Kafka topic through Spark
>> Streaming. As per its default behaviour,
>> *KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder,
>> KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* returns a
>> *DStream[(Object, Object)]*, which doesn't have any schema associated
>> with *Object* (or I am unable to figure it out). This makes it
>> impossible to perform some operations on this DStream, for example
>> converting it to a Spark DataFrame.
>>
>> Since *KafkaAvroDecoder* doesn't allow us to use any class other than
>> *Object*, I think I am going in the wrong direction. Any
>> pointers/suggestions would be really helpful.
>>
>> *Versions used:*
>> confluent-1.0.1
>> spark-1.6.0-bin-hadoop2.4
>> Scala code runner version - 2.11.6
>>
>> And this is the small piece of code I am using:
>>
>> package org.myorg.scalaexamples
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.SparkContext
>> import org.apache.avro.mapred.AvroKey
>> import org.apache.spark.sql.SQLContext
>> //import org.apache.avro.mapred.AvroValue
>> import org.apache.spark.streaming.kafka._
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.avro.generic.GenericRecord
>> import org.apache.spark.streaming.dstream.DStream
>> import io.confluent.kafka.serializers.KafkaAvroDecoder
>> //import org.apache.hadoop.io.serializer.avro.AvroRecord
>> //import org.apache.spark.streaming.dstream.ForEachDStream
>> import org.apache.kafka.common.serialization.Deserializer
>>
>> object DirectKafkaWordCount {
>>   def main(args: Array[String]) {
>>     if (args.length < 2) {
>>       System.err.println(s"""
>>         |Usage: DirectKafkaWordCount <brokers> <topics>
>>         |  <brokers> is a list of one or more Kafka brokers
>>         |  <topics> is a list of one or more kafka topics to consume from
>>         |
>>         """.stripMargin)
>>       System.exit(1)
>>     }
>>     val Array(brokers, topics) = args
>>     val sparkConf = new SparkConf()
>>       .setAppName("DirectKafkaWordCount").setMaster("local[1]")
>>     sparkConf.registerKryoClasses(
>>       Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val topicsSet = topics.split(",").toSet
>>     val kafkaParams = Map[String, String](
>>       "metadata.broker.list" -> brokers,
>>       "group.id" -> "consumer",
>>       "zookeeper.connect" -> "localhost:2181",
>>       "schema.registry.url" -> "http://localhost:8081")
>>     val messages = KafkaUtils.createDirectStream[Object, Object,
>>       KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>>     messages.print()
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> Thank you so much for your valuable time!
>>
>> Tariq, Mohammad
>> about.me/mti
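Following Ryan's `DStream.map` suggestion above, it should also be possible to skip the JSON round-trip entirely, since KafkaAvroDecoder hands back Avro GenericRecords at runtime for record-typed topics. A rough, untested sketch ("COL_NAME" is a placeholder for a real field in your schema):

import org.apache.avro.generic.GenericRecord

// Cast the decoded message values and pull a field straight out of each record
val records = messages.map(_._2.asInstanceOf[GenericRecord])
val colValues = records.map(record => String.valueOf(record.get("COL_NAME")))
colValues.print()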