You can use `DStream.map` to transform objects to anything you want.

On Thu, Feb 25, 2016 at 11:06 AM, Mohammad Tariq <donta...@gmail.com> wrote:

> Hi group,
>
> I have just started working with confluent platform and spark streaming,
> and was wondering if it is possible to access individual fields from an
> Avro object read from a kafka topic through spark streaming. As per its
> default behaviour *KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* return
> a *DStream[Object, Object]*, and don't have any schema associated with
> *Object*(or I am unable to figure it out). This makes it impossible to
> perform some operations on this DStream, for example, converting it to a
> Spark DataFrame.
>
> Since *KafkaAvroDecoder *doesn't allow us to have any other Class but
> *Object *I think I am going in the wrong direction. Any
> pointers/suggestions would be really helpful.
>
> *Versions used :*
> confluent-1.0.1
> spark-1.6.0-bin-hadoop2.4
> Scala code runner version - 2.11.6
>
> And this is the small piece of code I am using :
>
> package org.myorg.scalaexamples
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.SparkContext
> import org.apache.avro.mapred.AvroKey
> import org.apache.spark.sql.SQLContext
> //import org.apache.avro.mapred.AvroValue
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.storage.StorageLevel
> import org.apache.avro.generic.GenericRecord
> import org.apache.spark.streaming.dstream.DStream
> import io.confluent.kafka.serializers.KafkaAvroDecoder
> //import org.apache.hadoop.io.serializer.avro.AvroRecord
> //import org.apache.spark.streaming.dstream.ForEachDStream
> import org.apache.spark.sql.SQLContext
> import org.apache.kafka.common.serialization.Deserializer
>
> object DirectKafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 2) {
>       System.err.println(s"""
>         |Usage: DirectKafkaWordCount <brokers> <topics>
>         |  <brokers> is a list of one or more Kafka brokers
>         |  <topics> is a list of one or more kafka topics to consume from
>         |
>         """.stripMargin)
>       System.exit(1)
>     }
>     val Array(brokers, topics) = args
>     val sparkConf = new
> SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
>
> sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String]("metadata.broker.list" ->
> brokers, "group.id" -> "consumer",
>       "zookeeper.connect" -> "localhost:2181", "schema.registry.url" -> "
> http://localhost:8081";)
>     val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>     messages.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Thank you so much for your valuable time!
>
>
> [image: http://]
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
> <http://about.me/mti>
>
>

Reply via email to