Thanks Gerard, the code snippet you shared worked. But could you please explain (or point me to) the usage of *collect* here? How is it different from *filter* in terms of performance and readability?
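For reference, here is a minimal sketch of the two forms I am comparing, assuming rdd is an RDD[(String, String)] of (topic, data) pairs and topic is a String:

  // what I was using: filter on the topic key, then project out the payload
  val viaFilterMap = rdd.filter { case (t, _) => t == topic }.map(_._2)

  // what your snippet uses: RDD.collect with a partial function
  // (not the zero-argument rdd.collect() that pulls data back to the driver)
  val viaCollect   = rdd.collect { case (t, data) if t == topic => data }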
> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*

I am doing something like this. Please tell me if I can improve the *processing time* of this particular code:

kafkaStringStream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    val rdd_value = rdd.take(10).mkString("\n.....\n")
    Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      // rdd.filter(x => x._1 == topic).map(_._2)
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      CassandraHelper.saveDataToCassandra(topic, filteredRdd)
    }
    updateOffsetsinZk(rdd)
  }
}

(A rough sketch of the offset-range / foreachPartition variant from Cody's earlier mail is at the very end of this message, below my signature.)

On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Something like this?
>
> I'm making the assumption that your topic name equals your keyspace for
> this filtering example.
>
> dstream.foreachRDD { rdd =>
>   val topics = rdd.map(_._1).distinct.collect
>   topics.foreach { topic =>
>     // do not confuse this collect with rdd.collect(), which brings data to the driver
>     val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>     filteredRdd.saveToCassandra(topic, "table")
>   }
> }
>
> I'm wondering: would something like this
> (https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
> purposes?
>
> -kr, Gerard.
>
> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi Adrian,
>>
>> Can you please give an example of how to achieve this:
>>
>>> *I would also look at filtering by topic and saving as different
>>> DStreams in your code*
>>
>> I have managed to get a DStream[(String, String)], which is a
>> (*topic, my_data*) tuple. Let's call it kafkaStringStream.
>> Now if I do kafkaStringStream.groupByKey() I get a
>> DStream[(String, Iterable[String])].
>> But I want a DStream instead of an Iterable in order to apply
>> saveToCassandra for storing it.
>>
>> Please help with how to transform the Iterable into a DStream, or suggest
>> any other workaround for achieving the same.
>>
>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>> operators for the processing.
>>>
>>> If you have a stable, domain-specific list of topics (e.g. 3-5 named
>>> topics) and the processing is *really* different, I would also look at
>>> filtering by topic and saving as different DStreams in your code.
>>>
>>> Either way you need to start with Cody's tip in order to extract the
>>> topic name.
>>>
>>> -adrian
>>>
>>> From: Cody Koeninger
>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>> To: Udit Mehta
>>> Cc: user
>>> Subject: Re: Kafka Direct Stream
>>>
>>> You can get the topic for a given partition from the offset range. You
>>> can either filter using that, or just have a single RDD and match on
>>> topic when doing mapPartitions or foreachPartition (which I think is a
>>> better idea).
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>
>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using the Spark direct stream to consume from multiple topics in
>>>> Kafka. I am able to consume fine, but I am stuck at how to separate the
>>>> data for each topic, since I need to process data differently depending
>>>> on the topic.
>>>> I basically want to split the RDD consisting of N topics into N RDDs,
>>>> each having 1 topic.
>>>>
>>>> Any help would be appreciated.
>>>>
>>>> Thanks in advance,
>>>> Udit
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*

--
*VARUN SHARMA*
*Flipkart*
*Bangalore*
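P.S. To make sure I understood Cody's offset-range tip correctly, here is a rough sketch of the foreachPartition variant. It assumes kafkaStringStream comes straight from createDirectStream (the HasOffsetRanges cast only works on the RDD the direct stream hands you, before any transformation), and saveRecords is only a placeholder for however a partition's records would be written to Cassandra:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

kafkaStringStream.foreachRDD { rdd =>
  // one OffsetRange per Kafka partition; its index matches the RDD partition id
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    val topic = offsetRanges(TaskContext.get.partitionId).topic
    // every record in this partition already belongs to `topic`,
    // so no per-topic filter/collect pass over the whole RDD is needed
    saveRecords(topic, iter.map(_._2))  // placeholder for a per-partition Cassandra write
  }
  updateOffsetsinZk(rdd)
}

Would that be expected to beat the per-topic collect loop above for processing time, since it makes a single pass instead of one pass per topic?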