Something like this? I'm assuming your topic name equals your Cassandra keyspace in this filtering example.
dstream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach { topic =>
    // do not confuse this collect (a partial function that filters and
    // maps the RDD) with rdd.collect(), which brings data to the driver
    val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
    filteredRdd.saveToCassandra(topic, "table")
  }
}

I'm wondering: would something like this
(https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
purposes?

-kr, Gerard.

On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
wrote:

> Hi Adrian,
>
> Can you please give an example of how to achieve this:
>
>> *I would also look at filtering by topic and saving as different
>> DStreams in your code*
>
> I have managed to get a DStream[(String, String)], which is a (*topic,
> my_data*) tuple. Let's call it kafkaStringStream.
> Now if I do kafkaStringStream.groupByKey() I get a
> DStream[(String, Iterable[String])].
> But I want a DStream instead of an Iterable in order to apply
> saveToCassandra for storing it.
>
> Please help with how to transform the Iterable into a DStream, or with
> any other workaround that achieves the same.
>
>
> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> On top of that, you could make the topic part of the key (e.g. keyBy in
>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>> operators for the processing.
>>
>> If you have a stable, domain-specific list of topics (e.g. 3-5 named
>> topics) and the processing is *really* different, I would also look at
>> filtering by topic and saving as different DStreams in your code.
>>
>> Either way, you need to start with Cody's tip in order to extract the
>> topic name.
>>
>> -adrian
>>
>> From: Cody Koeninger
>> Date: Thursday, October 1, 2015 at 5:06 PM
>> To: Udit Mehta
>> Cc: user
>> Subject: Re: Kafka Direct Stream
>>
>> You can get the topic for a given partition from the offset range. You
>> can either filter using that, or just have a single RDD and match on
>> topic when doing mapPartitions or foreachPartition (which I think is a
>> better idea).
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>
>>> Hi,
>>>
>>> I am using the Spark direct stream to consume from multiple topics in
>>> Kafka. I am able to consume fine, but I am stuck on how to separate the
>>> data for each topic, since I need to process the data differently
>>> depending on the topic.
>>> I basically want to split the RDD consisting of N topics into N RDDs,
>>> each having 1 topic.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks in advance,
>>> Udit
>>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
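
P.S. In case it helps with Varun's question above: a minimal sketch of
Adrian's filter-by-topic suggestion, which yields one DStream per topic
instead of an Iterable. It assumes a fixed list of topic names and a
hypothetical single "data" column in each table (both placeholders, not
from the thread); kafkaStringStream is the DStream[(String, String)] of
(topic, payload) pairs from Varun's mail.

  import com.datastax.spark.connector._   // saveToCassandra on RDDs
  import org.apache.spark.streaming.dstream.DStream

  // hypothetical, stable list of topic names (Adrian's 3-5 named topics)
  val topics = Seq("topic_a", "topic_b")

  topics.foreach { topic =>
    // one DStream per topic: keep this topic's records, drop the topic key
    val perTopic: DStream[String] =
      kafkaStringStream.filter { case (t, _) => t == topic }.map(_._2)

    perTopic.foreachRDD { rdd =>
      rdd.map(Tuple1(_))   // wrap each payload as a one-column row
         .saveToCassandra(topic, "table", SomeColumns("data"))
    }
  }

Each filtered DStream can then get its own processing chain before the
save; the difference from the foreachRDD version above is that the topic
list is fixed when the streams are wired up, rather than discovered per
batch.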