Hi Jon

Thanks, foreachRDD seems to work. I am running on a 4-machine cluster. It seems like the function executed by foreachRDD runs on my driver; I used logging to check. This is exactly what I want, since I need to write my final results back to stdout so that RDD.pipe() will work. That said, I do not have any evidence that anything ever ran on any of the workers. I wonder if things are only working because I do not have a lot of data?
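In case it is useful to anyone else, here is roughly what I am running (a sketch, not my exact code; msg is the filtered JavaDStream<String> from my original mail, and I am on the Spark 1.1 Java API, where foreachRDD takes a Function<JavaRDD<T>, Void>):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    // msg is the JavaDStream<String> produced by logs.filter(selectINFO)
    msg.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) {
            // count() runs as a distributed job on the workers, but the
            // resulting long is returned to the driver, where this
            // function body executes.
            long count = rdd.count();
            // Printed on the driver's stdout, which is what the Python
            // side will read through rdd.pipe().
            System.out.println(count);
            return null;
        }
    });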
I need to do some more testing.

Andy

From: Jon Gregg <jonrgr...@gmail.com>
Date: Tuesday, September 30, 2014 at 1:22 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: how to get actual count as long from JavaDStream?

> Hi Andy
>
> I'm new to Spark and have been working with Scala, not Java, but I see there's a
> dstream() method to convert from JavaDStream to DStream. Within DStream
> <http://people.apache.org/~pwendell/spark-1.1.0-rc4-docs/api/java/org/apache/spark/streaming/dstream/DStream.html>
> there is a foreachRDD() method that allows you to do things like:
>
>     msgConvertedToDStream.foreachRDD(rdd => println("The count is: " + rdd.count().toInt))
>
> The syntax for the casting should be changed for Java, and the function
> argument syntax is probably wrong too, but hopefully there's enough there to
> help.
>
> Jon
>
> On Tue, Sep 30, 2014 at 3:42 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>> Hi
>>
>> I have a simple streaming app. All I want to do is figure out how many lines
>> I have received in the current mini batch. If numLines was a JavaRDD, I could
>> simply call count(). How do you do something similar in Streaming?
>>
>> Here is my pseudocode:
>>
>>     JavaDStream<String> msg = logs.filter(selectINFO);
>>     JavaDStream<Long> numLines = msg.count();
>>     Long totalCount = numLines ???
>>
>> Here is what I am really trying to do. I have a Python script that generates
>> a graph of totalCount vs. time. Python does not support streaming. As a
>> workaround I have a Java program that does the streaming, and I want to pass
>> the data back to the Python script. It has been suggested I can use rdd.pipe().
>>
>> In Python I call rdd.pipe(scriptToStartJavaStream.sh).
>>
>> All I need to do for each mini batch is figure out how to get the count of
>> the current mini batch and write it to standard out. It seems like this
>> should be simple.
>>
>> Maybe streams do not work the way I think? In a Spark Core app, I am able to
>> get values like count in my driver and do whatever I want with the local
>> value. With streams I know I am getting mini batches, because print() displays
>> the first 10 lines of my stream. I assume that print is somehow executed in
>> my driver, so somehow data was sent from the workers back to the driver.
>>
>> Any comments or suggestions would be greatly appreciated.
>>
>> Andy
>>
>> P.S. Should I be asking a different question?
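To close the loop on the pseudocode above: msg.count() returns a JavaDStream<Long> whose RDD for each batch holds a single element (the count), so the missing step was to pull that element out inside foreachRDD. A rough sketch, under the same Spark 1.1 Java API assumption as above:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;

    JavaDStream<Long> numLines = msg.count();
    numLines.foreachRDD(new Function<JavaRDD<Long>, Void>() {
        @Override
        public Void call(JavaRDD<Long> rdd) {
            // Each batch's RDD contains exactly one element: the count
            // (it should be 0 for an empty batch), so first() is safe.
            Long totalCount = rdd.first();
            System.out.println("lines in this mini batch: " + totalCount);
            return null;
        }
    });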