You can use `DStream.map` to transform the decoded objects into whatever you need. For record schemas, the values `KafkaAvroDecoder` hands back are Avro `GenericRecord` instances at runtime (the decoder just declares them as `Object`), so you can cast them and read individual fields by name.
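A minimal, untested sketch, reusing the `ssc`, `kafkaParams` and `topicsSet` from your code below. The field names `user_id` and `amount` are hypothetical; substitute whatever your Avro schema actually defines:

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.kafka.KafkaUtils
import io.confluent.kafka.serializers.KafkaAvroDecoder

// Same stream you already create; the decoder is declared to return
// Object, but for record schemas each value is really a GenericRecord.
val messages = KafkaUtils.createDirectStream[Object, Object,
  KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)

// Cast and pull fields out by name. "user_id" and "amount" are made-up
// field names; use the ones from your own schema.
val fields = messages.map { case (_, value) =>
  val record = value.asInstanceOf[GenericRecord]
  (record.get("user_id").toString, record.get("amount").toString)
}

fields.print()
```

From there you can filter, aggregate, or feed the tuples into Spark SQL. See a sketch of the DataFrame conversion below the quoted message.

On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com> wrote: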
> Hi group,
>
> I have just started working with the Confluent platform and Spark
> Streaming, and was wondering if it is possible to access individual
> fields of an Avro object read from a Kafka topic through Spark
> Streaming. By default, *KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)*
> returns a *DStream[(Object, Object)]*, and *Object* doesn't have any
> schema associated with it (or I am unable to figure it out). This makes
> it impossible to perform some operations on the DStream, for example
> converting it to a Spark DataFrame.
>
> Since *KafkaAvroDecoder* doesn't allow any key/value class other than
> *Object*, I think I am going in the wrong direction. Any
> pointers/suggestions would be really helpful.
>
> *Versions used:*
> confluent-1.0.1
> spark-1.6.0-bin-hadoop2.4
> Scala code runner version - 2.11.6
>
> And this is the small piece of code I am using:
>
> package org.myorg.scalaexamples
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
> import org.apache.avro.generic.GenericRecord
> import io.confluent.kafka.serializers.KafkaAvroDecoder
>
> object DirectKafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 2) {
>       System.err.println(s"""
>         |Usage: DirectKafkaWordCount <brokers> <topics>
>         |  <brokers> is a list of one or more Kafka brokers
>         |  <topics> is a list of one or more Kafka topics to consume from
>         |""".stripMargin)
>       System.exit(1)
>     }
>     val Array(brokers, topics) = args
>     val sparkConf = new SparkConf()
>       .setAppName("DirectKafkaWordCount")
>       .setMaster("local[1]")
>     sparkConf.registerKryoClasses(
>       Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String](
>       "metadata.broker.list" -> brokers,
>       "group.id" -> "consumer",
>       "zookeeper.connect" -> "localhost:2181",
>       "schema.registry.url" -> "http://localhost:8081")
>     val messages = KafkaUtils.createDirectStream[Object, Object,
>       KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>     messages.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Thank you so much for your valuable time!
>
> Tariq, Mohammad
> about.me/mti
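As for the DataFrame part of your question: once `map` has turned the records into plain Scala values, you can hand each micro-batch to a `SQLContext` inside `foreachRDD`. Another untested sketch, building on the `messages` stream above, with a made-up `Payment` case class (define it at the top level rather than inside `main`, so the implicits can derive an encoder for it):

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SQLContext

// Hypothetical schema for illustration; mirror your actual Avro fields.
case class Payment(userId: String, amount: Double)

val payments = messages.map { case (_, value) =>
  val r = value.asInstanceOf[GenericRecord]
  Payment(r.get("user_id").toString, r.get("amount").asInstanceOf[Double])
}

payments.foreachRDD { rdd =>
  // getOrCreate reuses a single SQLContext across micro-batches.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.toDF()
  df.registerTempTable("payments") // now queryable, or df.show() to inspect
}
```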