You could also do this with Datasets, which will probably be a little more efficient (since you are telling Spark you only care about one column):
    ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)

On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:
> Hello everyone,
> The following code works ...
>
> def main(args: Array[String]) {
>
>   val spark = SparkSession.builder
>     .master("local")
>     .appName("spark session example")
>     .getOrCreate()
>
>   import spark.implicits._
>
>   val ds1 = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "student").load()
>
>   val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
>
>   val query = ds2.writeStream
>     .outputMode("append")
>     .format("console")
>     .start()
>
>   query.awaitTermination()
> }
>
> On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <deshpandesh...@gmail.com> wrote:
>
>> val spark = SparkSession.builder
>>   .master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val dframe1 = spark.readStream.format("kafka")
>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>   .option("subscribe", "student").load()
>>
>> *How do I deserialize the value column from dframe1,*
>> *which is Array[Byte], to a Student object using Student.parseFrom...???*
>>
>> *Please help.*
>>
>> *Thanks.*
>>
>>
>> // Stream of votes from Kafka as bytes
>> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>>   ssc, LocationStrategies.PreferConsistent,
>>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>>
>> // Parse them into Vote case class.
>> val votes: DStream[Vote] = votesAsBytes.map {
>>   (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value())
>> }
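Putting the suggestion together with the code from the earlier mail, the whole job rewritten around the typed select might look roughly like the sketch below. This is only a sketch: it assumes Student is a protobuf-generated class whose companion provides parseFrom(Array[Byte]), and that an Encoder[Student] is available (here supplied via Encoders.kryo, since protobuf classes are not case classes and don't get an encoder from spark.implicits._):

    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

    object StudentStream {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .master("local")
          .appName("spark session example")
          .getOrCreate()

        import spark.implicits._

        // Student is assumed to be a protobuf-generated class; kryo gives
        // us an Encoder for it since it is not a case class.
        implicit val studentEncoder: Encoder[Student] = Encoders.kryo[Student]

        val ds1 = spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "student")
          .load()

        // Select only the value column as bytes, then parse each record.
        // Projecting the single column first lets Spark prune the rest.
        val students = ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)

        val query = students.writeStream
          .outputMode("append")
          .format("console")
          .start()

        query.awaitTermination()
      }
    }

The difference from the map-over-rows version is that the select pushes the column projection into the plan, so Spark never materializes the key, topic, offset, and timestamp columns it doesn't need.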