I went through the story (SPARKC-257), and as I understand it, it is for saving
data to multiple keyspaces at once.
How would it work for saving data to multiple tables in the same keyspace?
I think tableName: String should also be tableName: T => String.
Let me know if I have understood it incorrectly.
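
For illustration, a rough sketch of what a per-record table mapping could look
like (a hypothetical helper, not the actual connector API; the names below are
made up):

import org.apache.spark.rdd.RDD

// Hypothetical sketch only -- not the real spark-cassandra-connector API.
// Idea: derive the target table from each record instead of fixing one table name.
def saveToTables[T](rdd: RDD[T], keyspace: String, tableName: T => String): Unit = {
  rdd.foreachPartition { partition =>
    // Group this partition's records by their target table, then write each group.
    partition.toSeq.groupBy(tableName).foreach { case (table, rows) =>
      // a real implementation would delegate to the connector's writer for (keyspace, table)
      println(s"would write ${rows.size} rows to $keyspace.$table")
    }
  }
}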


On Sat, Oct 3, 2015 at 9:55 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi,
>
> collect(partialFunction) is equivalent to filter(x =>
> partialFunction.isDefinedAt(x)).map(partialFunction), so it's functionally
> equivalent to your expression. I favor collect for its more compact form,
> but that's a personal preference. Use whichever you feel reads best.
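
To make that equivalence concrete, a minimal sketch (assuming rdd is an
RDD[(String, String)] of (topic, data) pairs, with "my_topic" as a placeholder):

val pf: PartialFunction[(String, String), String] = {
  case (topic, data) if topic == "my_topic" => data
}

val viaCollect   = rdd.collect(pf)                      // the RDD transformation collect(pf), not rdd.collect()
val viaFilterMap = rdd.filter(pf.isDefinedAt).map(pf)   // same elements, expressed as filter + map
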
>
> Regarding performance, there will be some overhead from submitting a
> separate task for every filtered RDD that gets materialized to Cassandra.
> That's the reason I proposed the ticket linked below (SPARKC-257). Have a
> look at whether it would improve your particular use case and vote for it if
> so :-)
>
> -kr, Gerard.
>
> On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Thanks Gerard, the code snippet you shared worked, but can you please
>> explain (or point me to) the usage of *collect* here? How is it
>> different (performance/readability) from *filter*?
>>
>>> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*
>>
>>
>> I am doing something like this. Please tell me if I can improve the *processing
>> time* of this particular code:
>>
>> kafkaStringStream.foreachRDD{rdd =>
>>   val topics = rdd.map(_._1).distinct().collect()
>>   if (topics.length > 0) {
>>     val rdd_value = rdd.take(10).mkString("\n.....\n")
>>     Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))
>>
>>     topics.foreach { topic =>
>>       //rdd.filter(x=> x._1 == topic).map(_._2)
>>       val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>       CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>>     }
>>     updateOffsetsinZk(rdd)
>>   }
>>
>> }
>>
>> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Something like this?
>>>
>>> I'm making the assumption that your topic name equals your keyspace for
>>> this filtering example.
>>>
>>> dstream.foreachRDD{rdd =>
>>>   val topics = rdd.map(_._1).distinct.collect
>>>   topics.foreach{topic =>
>>>     // this collect is the RDD transformation collect(pf), not rdd.collect(), which brings data to the driver
>>>     val filteredRdd = rdd.collect{ case (t, data) if t == topic => data }
>>>     filteredRdd.saveToCassandra(topic, "table")
>>>   }
>>> }
>>>
>>>
>>> I'm wondering: would something like this (
>>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>>> purposes?
>>>
>>> -kr, Gerard.
>>>
>>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
>>> wrote:
>>>
>>>> Hi Adrian,
>>>>
>>>> Can you please give an example of how to achieve this:
>>>>
>>>>> *I would also look at filtering by topic and saving as different
>>>>> DStreams in your code*
>>>>
>>>> I have managed to get a DStream[(String, String)], which is a
>>>> (*topic, my_data*) tuple. Let's call it kafkaStringStream.
>>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>>> DStream[(String, Iterable[String])].
>>>> But I want a DStream instead of an Iterable in order to apply
>>>> saveToCassandra for storing it.
>>>>
>>>> Please help with how to transform the Iterable into a DStream, or suggest
>>>> any other workaround for achieving the same.
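
One way to keep DStreams (rather than the Iterable you get from groupByKey) is
to filter per topic at the DStream level, if the topic list is known up front.
A minimal sketch, with hypothetical topic names:

import org.apache.spark.streaming.dstream.DStream

val topics = Seq("topic_a", "topic_b")   // hypothetical, assumes a known topic list

val perTopicStreams: Map[String, DStream[String]] = topics.map { topic =>
  topic -> kafkaStringStream.filter { case (t, _) => t == topic }.map { case (_, data) => data }
}.toMap

// Each per-topic DStream can then be saved independently, e.g. with
// saveToCassandra via the spark-cassandra-connector implicits.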
>>>>
>>>>
>>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com>
>>>> wrote:
>>>>
>>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>>> operators for the processing.
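
A minimal sketch of that idea, assuming stream is a DStream[(String, String)]
of (topic, message) pairs (the per-topic count is just a placeholder for the
real processing):

val countsPerTopic = stream
  .map { case (topic, _) => (topic, 1L) }   // make the topic the key
  .reduceByKey(_ + _)                       // any .xxxByKey operator fits here

countsPerTopic.print()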
>>>>>
>>>>> If you have a stable, domain-specific list of topics (e.g. 3-5 named
>>>>> topics) and the processing is *really* different, I would also look
>>>>> at filtering by topic and saving as different DStreams in your code.
>>>>>
>>>>> Either way you need to start with Cody’s tip in order to extract the
>>>>> topic name.
>>>>>
>>>>> -adrian
>>>>>
>>>>> From: Cody Koeninger
>>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>>> To: Udit Mehta
>>>>> Cc: user
>>>>> Subject: Re: Kafka Direct Stream
>>>>>
>>>>> You can get the topic for a given partition from the offset range.
>>>>> You can either filter using that, or just have a single RDD and match on
>>>>> the topic when doing mapPartitions or foreachPartition (which I think is a
>>>>> better idea).
>>>>>
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
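
A sketch along the lines of the pattern in that page, assuming the stream was
created with KafkaUtils.createDirectStream:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // One OffsetRange per Kafka partition, aligned with the RDD's partitions.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    val range: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    val topic = range.topic   // every record in this partition belongs to this topic
    iter.foreach { record =>
      println(s"$topic -> $record")   // placeholder: branch on `topic` to handle each topic differently
    }
  }
}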
>>>>>
>>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using the Spark direct stream to consume from multiple topics in
>>>>>> Kafka. I am able to consume fine, but I am stuck on how to separate the
>>>>>> data for each topic, since I need to process the data differently
>>>>>> depending on the topic.
>>>>>> I basically want to split the RDD consisting of N topics into N RDDs,
>>>>>> each having 1 topic.
>>>>>>
>>>>>> Any help would be appreciated.
>>>>>>
>>>>>> Thanks in advance,
>>>>>> Udit
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> *VARUN SHARMA*
>>>> *Flipkart*
>>>> *Bangalore*
>>>>
>>>
>>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*
