Thanks Gerard, the code snippet you shared worked. But can you please
explain (or point me to) the usage of *collect* here? How is it different
from *filter* in terms of performance and readability?

> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*
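
For reference, here is how I read the two variants side by side (a minimal
sketch, with illustrative names, assuming rdd is the (topic, data) pair RDD
from the direct stream):

// Variant 1: filter on the topic key, then project out the payload
val viaFilter = rdd.filter { case (t, _) => t == topic }.map(_._2)

// Variant 2: collect with a partial function does the filter + map in one expression
// (this is the RDD transformation collect(PartialFunction), not the rdd.collect()
// action that brings data to the driver)
val viaCollect = rdd.collect { case (t, data) if t == topic => data }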


I am doing something like this. Please tell me if I can improve the
*processing time* of this particular code:

kafkaStringStream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    val rdd_value = rdd.take(10).mkString("\n.....\n")
    Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      //rdd.filter(x=> x._1 == topic).map(_._2)
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      CassandraHelper.saveDataToCassandra(topic, filteredRdd)
    }
    updateOffsetsinZk(rdd)
  }
}
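
One variation I am considering for the processing time (just a sketch, not
benchmarked; it assumes the batch is small enough to cache): cache the RDD up
front, since it is scanned once for distinct() and once more per topic:

kafkaStringStream.foreachRDD { rdd =>
  rdd.cache()  // avoid recomputing the Kafka batch for distinct() plus one pass per topic
  val topics = rdd.map(_._1).distinct().collect()
  topics.foreach { topic =>
    val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
    CassandraHelper.saveDataToCassandra(topic, filteredRdd)
  }
  updateOffsetsinZk(rdd)
  rdd.unpersist()  // release the cached blocks once the batch is written
}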

On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Something like this?
>
> I'm making the assumption that your topic name equals your keyspace for
> this filtering example.
>
> dstream.foreachRDD{rdd =>
>   val topics = rdd.map(_._1).distinct.collect
>   topics.foreach{topic =>
>     val filteredRdd = rdd.collect{case (t, data) if t == topic => data}
>     filteredRdd.saveToCassandra(topic, "table")  // do not confuse this collect with rdd.collect() that brings data to the driver
>   }
> }
>
>
> I'm wondering: would something like this (
> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
> purposes?
>
> -kr, Gerard.
>
> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi Adrian,
>>
>> Can you please give an example of how to achieve this:
>>
>>> *I would also look at filtering by topic and saving as different
>>> Dstreams in your code*
>>
>> I have managed to get a DStream[(String, String)], which is a
>> *(topic, my_data)* tuple. Let's call it kafkaStringStream.
>> Now if I do kafkaStringStream.groupByKey(), I would get a
>> DStream[(String, Iterable[String])].
>> But I want a DStream instead of an Iterable in order to apply
>> saveToCassandra for storing it.
>>
>> Please help with how to transform the Iterable into a DStream, or suggest
>> any other workaround for achieving the same.
>>
>>
>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>> operators for the processing.
>>>
>>> If you have a stable, domain-specific list of topics (e.g. 3-5 named
>>> topics) and the processing is *really* different, I would also look at
>>> filtering by topic and saving as different DStreams in your code.
>>>
>>> Either way you need to start with Cody’s tip in order to extract the
>>> topic name.
>>>
>>> -adrian
>>>
>>> From: Cody Koeninger
>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>> To: Udit Mehta
>>> Cc: user
>>> Subject: Re: Kafka Direct Stream
>>>
>>> You can get the topic for a given partition from the offset range. You
>>> can either filter using that, or just have a single RDD and match on topic
>>> when doing mapPartitions or foreachPartition (which I think is a better
>>> idea).
>>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>
>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using the Spark direct stream to consume from multiple topics in
>>>> Kafka. I am able to consume fine, but I am stuck on how to separate the data
>>>> for each topic, since I need to process the data differently depending on the
>>>> topic.
>>>> I basically want to split the RDD consisting of N topics into N RDDs,
>>>> each having 1 topic.
>>>>
>>>> Any help would be appreciated.
>>>>
>>>> Thanks in advance,
>>>> Udit
>>>>
>>>
>>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*
