I went through the story and, as I understand it, it is for saving data to multiple keyspaces at once. How will it work for saving data to multiple tables in the same keyspace? I think tableName: String should also be tableName: T => String. Let me know if I have understood incorrectly.
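To make the suggestion concrete, here is a minimal sketch of the signature I have in mind. This is hypothetical API in the spirit of SPARKC-257, not the current connector; saveRow is a placeholder for whatever single-row write is actually available:

    import org.apache.spark.rdd.RDD

    // Hypothetical variant of the SPARKC-257 idea: one keyspace, but the
    // target table is computed per record via tableName: T => String.
    def saveToTables[T](rdd: RDD[T],
                        keyspace: String,
                        tableName: T => String)
                       (saveRow: (String, String, T) => Unit): Unit = {
      rdd.foreachPartition { rows =>
        // saveRow(keyspace, table, row) stands in for the actual write call
        rows.foreach(row => saveRow(keyspace, tableName(row), row))
      }
    }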
On Sat, Oct 3, 2015 at 9:55 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi,
>
> collect(partialFunction) is equivalent to filter(x =>
> partialFunction.isDefinedAt(x)).map(partialFunction), so it's functionally
> equivalent to your expression. I favor collect for its more compact form,
> but that's a personal preference. Use what you feel reads best.
>
> Regarding performance, there will be some overhead of submitting a task
> for every filtered RDD that gets materialized to Cassandra. That's the
> reason I proposed the ticket linked above. Have a look at whether that
> would improve your particular use case and vote for it if so :-)
>
> -kr, Gerard.
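A quick way to check the equivalence Gerard describes, using plain Scala collections, which behave the same as RDDs here (the topic names and data are made up for illustration):

    val xs = Seq(("topicA", "a1"), ("topicB", "b1"), ("topicA", "a2"))
    val pf: PartialFunction[(String, String), String] = {
      case (t, data) if t == "topicA" => data
    }

    // Both yield Seq("a1", "a2")
    val viaCollect   = xs.collect(pf)
    val viaFilterMap = xs.filter(pf.isDefinedAt).map(pf)
    assert(viaCollect == viaFilterMap)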
> On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com> wrote:
>
>> Thanks Gerard, the code snippet you shared worked, but can you please
>> explain/point me to the usage of *collect* here. How is it
>> different (performance/readability) from *filter*?
>>
>>> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*
>>
>> I am doing something like this. Please tell me if I can improve the
>> *processing time* of this particular code:
>>
>> kafkaStringStream.foreachRDD { rdd =>
>>   val topics = rdd.map(_._1).distinct().collect()
>>   if (topics.length > 0) {
>>     val rdd_value = rdd.take(10).mkString("\n.....\n")
>>     Log.slogger(Log.FILE.DEFAULT, INFO,
>>       BaseSLog(s"Printing all feeds\n$rdd_value"))
>>
>>     topics.foreach { topic =>
>>       // rdd.filter(x => x._1 == topic).map(_._2)
>>       val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>       CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>>     }
>>     updateOffsetsinZk(rdd)
>>   }
>> }
>>
>> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>
>>> Something like this?
>>>
>>> I'm making the assumption that your topic name equals your keyspace for
>>> this filtering example.
>>>
>>> dstream.foreachRDD { rdd =>
>>>   val topics = rdd.map(_._1).distinct.collect
>>>   topics.foreach { topic =>
>>>     val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>>     filteredRdd.saveToCassandra(topic, "table") // do not confuse this
>>>     // collect with rdd.collect(), which brings data to the driver
>>>   }
>>> }
>>>
>>> I'm wondering: would something like this
>>> (https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>>> purposes?
>>>
>>> -kr, Gerard.
>>>
>>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com> wrote:
>>>
>>>> Hi Adrian,
>>>>
>>>> Can you please give an example of how to achieve this:
>>>>
>>>>> *I would also look at filtering by topic and saving as different
>>>>> DStreams in your code*
>>>>
>>>> I have managed to get a DStream[(String, String)], which is a
>>>> (*topic, my_data*) tuple. Let's call it kafkaStringStream.
>>>> Now if I do kafkaStringStream.groupByKey(), I get a
>>>> DStream[(String, Iterable[String])].
>>>> But I want a DStream instead of an Iterable in order to apply
>>>> saveToCassandra for storing it.
>>>>
>>>> Please help with how to transform the Iterable to a DStream, or any
>>>> other workaround that achieves the same.
>>>>
>>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>>>
>>>>> On top of that, you could make the topic part of the key (e.g. keyBy
>>>>> in .transform or manually emitting a tuple) and use one of the
>>>>> .xxxByKey operators for the processing.
>>>>>
>>>>> If you have a stable, domain-specific list of topics (e.g. 3-5 named
>>>>> topics) and the processing is *really* different, I would also look
>>>>> at filtering by topic and saving as different DStreams in your code.
>>>>>
>>>>> Either way, you need to start with Cody's tip in order to extract the
>>>>> topic name.
>>>>>
>>>>> -adrian
>>>>>
>>>>> From: Cody Koeninger
>>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>>> To: Udit Mehta
>>>>> Cc: user
>>>>> Subject: Re: Kafka Direct Stream
>>>>>
>>>>> You can get the topic for a given partition from the offset range.
>>>>> You can either filter using that, or just have a single RDD and match
>>>>> on topic when doing mapPartitions or foreachPartition (which I think
>>>>> is a better idea).
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>>
>>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using the Spark direct stream to consume from multiple topics
>>>>>> in Kafka. I am able to consume fine, but I am stuck on how to
>>>>>> separate the data for each topic, since I need to process the data
>>>>>> differently depending on the topic.
>>>>>> I basically want to split the RDD consisting of N topics into N
>>>>>> RDDs, each having 1 topic.
>>>>>>
>>>>>> Any help would be appreciated.
>>>>>>
>>>>>> Thanks in advance,
>>>>>> Udit

--
*VARUN SHARMA*
*Flipkart*
*Bangalore*
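For completeness, Cody's tip about reading the topic from the offset ranges looks roughly like this with the Spark 1.x direct stream API. This is a sketch under the assumption that ssc, kafkaParams and topicSet already exist; the per-message handling is only indicated by a comment:

    import kafka.serializer.StringDecoder
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)

    stream.foreachRDD { rdd =>
      // The direct stream's RDD partitions line up one-to-one with Kafka
      // partitions, and each OffsetRange carries its topic name.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        iter.foreach { case (key, message) =>
          // dispatch on o.topic here, e.g. choose a Cassandra table per topic
        }
      }
    }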