Something like this?

I'm making the assumption that your topic name equals your keyspace for
this filtering example.

dstream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach{topic =>
    val filteredRdd =  rdd.collect{case (t, data) if t == topic => data}.
    filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
collect with rdd.collect() that brings data to the driver
  }
}


I'm wondering: would something like this (
https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
purposes?

-kr, Gerard.

On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
wrote:

> Hi Adrian,
>
> Can you please give an example of how to achieve this:
>
>> *I would also look at filtering by topic and saving as different Dstreams
>> in your code*
>
> I have managed to get DStream[(String, String)] which is (*topic,my_data)*
> tuple. Lets call it kafkaStringStream.
> Now if I do kafkaStringStream.groupByKey() then I would get a
> DStream[(String,Iterable[String])].
> But I want a DStream instead of Iterable in order to apply saveToCassandra
> for storing it.
>
> Please help in how to transform iterable to DStream or any other
> workaround for achieving same.
>
>
> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> On top of that you could make the topic part of the key (e.g. keyBy in
>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>> operators for the processing.
>>
>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>> topics) and the processing is *really* different, I would also look at
>> filtering by topic and saving as different Dstreams in your code.
>>
>> Either way you need to start with Cody’s tip in order to extract the
>> topic name.
>>
>> -adrian
>>
>> From: Cody Koeninger
>> Date: Thursday, October 1, 2015 at 5:06 PM
>> To: Udit Mehta
>> Cc: user
>> Subject: Re: Kafka Direct Stream
>>
>> You can get the topic for a given partition from the offset range.  You
>> can either filter using that; or just have a single rdd and match on topic
>> when doing mapPartitions or foreachPartition (which I think is a better
>> idea)
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>
>>> Hi,
>>>
>>> I am using spark direct stream to consume from multiple topics in Kafka.
>>> I am able to consume fine but I am stuck at how to separate the data for
>>> each topic since I need to process data differently depending on the topic.
>>> I basically want to split the RDD consisting on N topics into N RDD's
>>> each having 1 topic.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks in advance,
>>> Udit
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>

Reply via email to