Re: How to restrict foreach on a streaming RDD only once upon receiver completion
Thanks. I’ll look into it. But the JSON string I push via the receiver goes through a series of transformations before it ends up in the final RDD, so I need to make sure that this magic value propagates all the way down to the last RDD that I’m iterating on.

Currently, I’m calling "stop" from the receiver once it’s done fetching all the records, and I have a StreamingListener that acts on it via the "onReceiverStopped" hook, through which I stop the StreamingContext. This seems to work, except that I see this message:

2015-04-06 16:41:48,002 WARN [StreamingListenerBus] org.apache.spark.Logging$class.logWarning - All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,XYZ,))

Is this approach not advisable? BTW, I’m running in local mode.

> On Apr 7, 2015, at 1:43 AM, Michael Malak wrote:
>
> You could have your receiver send a "magic value" when it is done. I discuss this Spark Streaming pattern in my presentation "Spark Gotchas and Anti-Patterns". In the PDF version, it's slides 34-36.
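Hari's approach (stopping the StreamingContext from a StreamingListener's onReceiverStopped hook and only then reading the results) boils down to "signal completion, then report once". The sketch below models that control flow in plain Java with no Spark dependency, under stated assumptions: a java.util.concurrent.CountDownLatch stands in for the onReceiverStopped callback, a background thread stands in for the receiver plus per-batch processing, and the class name and batch data are hypothetical. It illustrates the idea; it is not Spark's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class ReceiverCompletionSketch {

    /**
     * Processes the (hypothetical) batches on a background thread and returns the
     * most frequent value, read exactly once after completion has been signaled.
     */
    static String runAndReportOnce(List<List<String>> batches) {
        // Running counts; only touched by the worker thread until the latch opens.
        Map<String, Integer> counts = new HashMap<>();
        // Stands in for StreamingListener.onReceiverStopped: opened once, when all data is in.
        CountDownLatch receiverStopped = new CountDownLatch(1);

        Thread receiver = new Thread(() -> {
            for (List<String> batch : batches) {
                for (String value : batch) {
                    counts.merge(value, 1, Integer::sum); // per-batch work, no reporting
                }
            }
            receiverStopped.countDown(); // "the receiver is done"
        });
        receiver.start();

        // The driver blocks here; countDown() happens-before await() returns,
        // so the counts written by the worker thread are visible afterwards.
        try {
            receiverStopped.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the receiver", e);
        }

        // The single, final report.
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        Map.Entry<String, Integer> top = sorted.get(0);
        return top.getKey() + "=" + top.getValue();
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("c", "a"),
                Arrays.asList("b"));
        System.out.println(runAndReportOnce(batches)); // prints a=3
    }
}
```

A latch is used rather than polling a shared flag because await() both blocks until completion and guarantees that the worker's writes are visible to the reading thread.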
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
You could have your receiver send a "magic value" when it is done. I discuss this Spark Streaming pattern in my presentation "Spark Gotchas and Anti-Patterns". In the PDF version, it's slides 34-36.

http://www.datascienceassn.org/content/2014-11-05-spark-gotchas-and-anti-patterns-julia-language

YouTube version cued to that place: http://www.youtube.com/watch?v=W5Uece_JmNs&t=23m18s

From: Hari Polisetty
To: Tathagata Das
Cc: user
Sent: Monday, April 6, 2015 2:02 PM
Subject: Re: How to restrict foreach on a streaming RDD only once upon receiver completion

Yes, I’m using updateStateByKey and it works. But then I need to perform further computation on this stateful RDD (see the code snippet below). I perform foreach on the final RDD and get the top 10 records. I just don’t want the foreach to be performed every time a new batch is received, only when the receiver is done fetching all the records.

My requirements are to programmatically invoke the ES query (it varies by use case), get all the records, apply certain transformations, and bring the top 10 results (based on certain criteria) back into the driver program for further processing. I’m able to apply the transformations on the batches of records fetched from ES using streaming, so I don’t need to wait for all the records to be fetched. The RDD transformations happen continuously and the top-k results keep getting updated until the receiver has fetched all the records. Is there any drawback to this approach?

Can you give more pointers on what you mean by creating a custom RDD that reads from ElasticSearch?

Here is the relevant portion of my Spark Streaming code:

    import java.util.List;

    import com.google.common.base.Optional; // Spark 1.x's Java API uses Guava's Optional
    import com.jayway.jsonpath.JsonPath;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import scala.Tuple2;
    import scala.Tuple3;

    // Create a custom streaming receiver to query for relevant data from ES
    JavaReceiverInputDStream<String> jsonStrings =
        ssc.receiverStream(new ElasticSearchResponseReceiver(query…….));

    // Apply JSON paths to extract specific value(s) from each record
    JavaDStream<String> fieldVariations = jsonStrings.flatMap(
        new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 465237345751948L;

            @Override
            public Iterable<String> call(String jsonString) {
                List<String> r = JsonPath.read(jsonString, attributeDetail.getJsonPath());
                return r;
            }
        });

    // Perform a stateful map/reduce on each variation
    // (updateStateByKey requires a checkpoint directory to be set on ssc)
    JavaPairDStream<String, Integer> fieldVariationCounts = fieldVariations.mapToPair(
        new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = -1241276515559408238L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            private static final long serialVersionUID = 7598681835161199865L;

            @Override
            public Optional<Integer> call(List<Integer> nums, Optional<Integer> current) {
                // Every value is 1, so this batch adds nums.size() to the running count
                Integer sum = current.or(0);
                return Optional.of(sum + nums.size());
            }
        }).reduceByKey(
        // The state stream already holds one entry per key, so this merges nothing
        new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = -5906059838295609562L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

    // Swap (String, Integer) pairs to (Integer, String) so that we can sort on frequency
    JavaPairDStream<Integer, String> swappedPair = fieldVariationCounts.mapToPair(
        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = -5889774695187619957L;

            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
                return item.swap();
            }
        });

    // Sort on the key, i.e. the frequency
    JavaPairDStream<Integer, String> sortedCounts = swappedPair.transformToPair(
        new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
            private static final long serialVersionUID = -4172702039963232779L;

            @Override
            public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> in) throws Exception {
                // false = sort in descending order
                return in.sortByKey(false);
            }
        });

    // Take the top N values from the sorted pairs and write them to the results list.
    // This function runs on the driver once per batch interval.
    // (foreach is the deprecated older name of foreachRDD.)
    sortedCounts.foreachRDD(
        new Function<JavaPairRDD<Integer, String>, Void>() {
            private static final long serialVersionUID = 2186144129973051920L;

            @Override
            public Void call(JavaPairRDD<Integer, String> rdd) {
                resultList.clear();
                for (Tuple2<Integer, String> t : rdd.take(MainDriver.NUMBER_OF_TOP_VARIATIONS)) {
                    resultList.add(new Tuple3<String, Integer, Double>(
                        t._2(), t._1(), (double) (100 * t._1()) / totalProcessed.value()));
                }
                return null;
            }
        });

> On Apr 7, 2015, at 1:14 AM, Tathagata Das wrote:
> So you want to sort based on the total count of all the records received through the receiver?
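Michael's "magic value" suggestion can be illustrated without Spark: the receiver appends a sentinel record after the last real one, each per-batch step lets it through untouched, and the output step only acts on the batch that contains it. In the plain-Java sketch below, the sentinel string MAGIC_END, the class name, and the batch data are hypothetical. In a real Spark Streaming job, every transformation (for example the JsonPath flatMap in Hari's code) would have to pass the sentinel along, which is the propagation concern Hari raises.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MagicValueSketch {

    // Hypothetical sentinel: any value that can never occur in the real data will do.
    static final String MAGIC_END = "__END_OF_DATA__";

    /**
     * Runs the per-batch logic over the given micro-batches and returns every report
     * the output step emitted. With one sentinel at the end, exactly one report appears.
     */
    static List<List<String>> process(List<List<String>> batches, int n) {
        Map<String, Integer> state = new HashMap<>(); // running counts across batches
        List<List<String>> reports = new ArrayList<>();
        for (List<String> batch : batches) {
            boolean sawEnd = false;
            for (String record : batch) {
                if (MAGIC_END.equals(record)) {
                    sawEnd = true; // the sentinel is detected but never counted
                } else {
                    state.merge(record, 1, Integer::sum);
                }
            }
            // The output step still runs every batch, but only acts on the final one.
            if (sawEnd) {
                reports.add(topN(state, n));
            }
        }
        return reports;
    }

    /** Keys of the n highest counts, most frequent first. */
    static List<String> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(n, entries.size()))) {
            keys.add(e.getKey());
        }
        return keys;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("x", "y", "x"),
                Arrays.asList("z", "x"),
                Arrays.asList("y", MAGIC_END)); // the receiver's last push carries the sentinel
        System.out.println(process(batches, 2)); // prints [[x, y]]
    }
}
```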
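What Hari's pipeline computes at each batch interval (running counts per value, ordered by frequency in descending order, then the first N rows with each value's share of all processed records) can be traced in plain Java without a cluster. In this sketch the class and method names are hypothetical, a HashMap plays the role of the updateStateByKey state, totalProcessed is passed in instead of being read from an accumulator, and entries are sorted by value directly instead of being swapped into (count, value) pairs first, which yields the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopVariationsSketch {

    /** One result row, mirroring the Tuple3(value, count, percent) built in the Spark code. */
    static final class Row {
        final String value;
        final int count;
        final double percent;

        Row(String value, int count, double percent) {
            this.value = value;
            this.count = count;
            this.percent = percent;
        }

        @Override
        public String toString() {
            return "(" + value + ", " + count + ", " + percent + ")";
        }
    }

    /** Adds one batch to the running counts, like updateStateByKey with a "+1 per record" update. */
    static void updateState(Map<String, Integer> state, List<String> batch) {
        for (String value : batch) {
            state.merge(value, 1, Integer::sum);
        }
    }

    /** Orders by frequency (descending) and keeps the first n rows with their percentage. */
    static List<Row> topN(Map<String, Integer> state, int n, long totalProcessed) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(state.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        List<Row> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(n, entries.size()))) {
            rows.add(new Row(e.getKey(), e.getValue(), 100.0 * e.getValue() / totalProcessed));
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "a", "b"),
                Arrays.asList("c", "a"));
        long totalProcessed = 0;
        for (List<String> batch : batches) {
            updateState(state, batch);
            totalProcessed += batch.size();
            // In the streaming job this report is recomputed after every batch.
            System.out.println(topN(state, 1, totalProcessed));
        }
    }
}
```

Running main shows the behavior the thread is about: a fresh top-N report appears after every batch, not only after the last one.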
Re: How to restrict foreach on a streaming RDD only once upon receiver completion
So you want to sort based on the total count of all the records received through the receiver? In that case, you have to combine all the counts using updateStateByKey (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).

But stepping back, if you want to get the final results at the end of receiving all the data (as opposed to continuously), why are you even using streaming? You could create a custom RDD that reads from ElasticSearch and then use it in a Spark program. I think that's more natural, as your application is more batch-like than streaming-like since you are not using the results in real time.

TD

On Mon, Apr 6, 2015 at 12:31 PM, Hari Polisetty wrote:
> I have created a Custom Receiver to fetch records pertaining to a specific query from Elastic Search and have implemented Streaming RDD transformations to process the data generated by the receiver.
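The update function in the StatefulNetworkWordCount example TD links (as written for Spark 1.x) adds the values seen in the current batch to the previous count, treating a missing state as 0. The plain-Java sketch below mirrors that logic using java.util.Optional (Spark 1.x's Java API used Guava's Optional for updateStateByKey); the class and method names are hypothetical. It also illustrates TD's point: folding the update over every batch gives the same totals as one batch-style count over all records, which is all an end-of-input report needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StatefulCountSketch {

    /** updateStateByKey-style update: previous count (0 if absent) plus this batch's values. */
    static Optional<Integer> update(List<Integer> newValues, Optional<Integer> state) {
        int currentCount = 0;
        for (int v : newValues) {
            currentCount += v;
        }
        return Optional.of(state.orElse(0) + currentCount);
    }

    /** Streaming style: apply the update once per batch for every key seen in that batch. */
    static Map<String, Integer> streamingCounts(List<List<String>> batches) {
        Map<String, Integer> state = new HashMap<>();
        for (List<String> batch : batches) {
            // Group this batch's (key, 1) pairs by key before calling update.
            Map<String, List<Integer>> grouped = new HashMap<>();
            for (String key : batch) {
                grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
            }
            // Keys absent from this batch keep their state, same as update(emptyList, state).
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                Optional<Integer> previous = Optional.ofNullable(state.get(e.getKey()));
                state.put(e.getKey(), update(e.getValue(), previous).get());
            }
        }
        return state;
    }

    /** Batch style: count everything in one pass once all records are available. */
    static Map<String, Integer> batchCounts(List<List<String>> batches) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<String> batch : batches) {
            for (String key : batch) {
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "c"));
        System.out.println(streamingCounts(batches).equals(batchCounts(batches))); // prints true
    }
}
```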
How to restrict foreach on a streaming RDD only once upon receiver completion
I have created a custom receiver to fetch records pertaining to a specific query from Elastic Search and have implemented streaming RDD transformations to process the data generated by the receiver.

The final RDD is a sorted list of name-value pairs, and I want to read the top 20 results programmatically rather than write them to an external file. I use "foreach" on the RDD and take the top 20 values into a list. I see that foreach is executed every time there is a new micro-batch from the receiver.

However, I want the foreach computation to be done only once, when the receiver has finished fetching all the records from Elastic Search and before the streaming context is stopped, so that I can populate the results into a list and process it in my driver program.

Appreciate any guidance in this regard.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org