To clarify the confusion here: when you do dstream.count(), it generates a DStream[Long] which contains an RDD[Long] for each batch. Each of these RDDs has only one element in it, which is the count you are interested in. So the following are equivalent:
dstream.foreachRDD { rdd => val count = rdd.count() }

AND

dstream.count().foreachRDD { rdd => val count = rdd.first() }

On Wed, Oct 1, 2014 at 11:30 AM, Sean Owen <so...@cloudera.com> wrote:
> Hm, I think this is the same thing though. The input is conceptually a
> stream of RDDs. You want the count of elements of each of the RDDs, so
> you get a stream of counts. In the foreachRDD example, you're just
> computing the count() of each RDD directly and printing it, so you
> print a stream of counts. But you could just as well print the stream
> of counts that DStream.count() gives you. I suppose it's just two
> means to the same end, but DStream.count() sounds more direct. But if
> you really mean you want to do something else with each count, like
> just print it, sure, foreachRDD could be fine too.
>
> On Wed, Oct 1, 2014 at 6:38 PM, Andy Davidson
> <a...@santacruzintegration.com> wrote:
> > Hi Sean
> >
> > I guess I am missing something.
> >
> > JavaDStream<String> foo = …
> > JavaDStream<Long> c = foo.count()
> >
> > This is circular. I need to get the count as an actual scalar value, not
> > a JavaDStream. Someone else posted pseudo code that used foreachRDD().
> > This seems to work for me.
> >
> > Thanks
> >
> > Andy
> >
> > From: Sean Owen <so...@cloudera.com>
> > Date: Wednesday, October 1, 2014 at 2:32 AM
> > To: Andrew Davidson <a...@santacruzintegration.com>
> > Cc: "user@spark.apache.org" <user@spark.apache.org>
> > Subject: Re: how to get actual count from as long from JavaDStream ?
> >
> > It's much easier than all this. Spark Streaming gives you a DStream of
> > RDDs. You want the count for each RDD. DStream.count() gives you
> > exactly that: a DStream of Longs which are the counts of events in
> > each mini batch.
> >
> > On Tue, Sep 30, 2014 at 8:42 PM, Andy Davidson
> > <a...@santacruzintegration.com> wrote:
> >
> > Hi
> >
> > I have a simple streaming app. All I want to do is figure out how many
> > lines I have received in the current mini batch. If numLines was a
> > JavaRDD I could simply call count(). How do you do something similar
> > in Streaming?
> >
> > Here is my pseudo code:
> >
> > JavaDStream<String> msg = logs.filter(selectINFO);
> > JavaDStream<Long> numLines = msg.count()
> > Long totalCount = numLines ???
> >
> > Here is what I am really trying to do. I have a python script that
> > generates a graph of totalCount vs time. Python does not support
> > streaming. As a workaround I have a java program that does the
> > streaming. I want to pass the data back to the python script. It has
> > been suggested I can use rdd.pipe().
> >
> > In python I call rdd.pipe(scriptToStartJavaSteam.sh)
> >
> > All I need to do is, for each mini batch, figure out how to get the
> > count of the current mini batch and write it to standard out. Seems
> > like this should be simple.
> >
> > Maybe streams do not work the way I think? In a Spark core app, I am
> > able to get values like count in my driver and do whatever I want with
> > the local value. With streams I know I am getting mini batches because
> > print() displays the first 10 lines of my stream. I assume that print
> > is somehow executed in my driver, so data was sent from the workers
> > back to the driver.
> >
> > Any comments or suggestions would be greatly appreciated.
> >
> > Andy
> >
> > P.S. Should I be asking a different question?
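Putting the whole thread together, here is a minimal self-contained Scala sketch of the pattern described above. The socket source, host/port, batch interval, and the INFO filter are assumptions standing in for Andy's logs and selectINFO; any DStream[String] behaves the same way.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountPerBatch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CountPerBatch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Assumed stand-in for Andy's log source; any DStream[String] works.
    val logs = ssc.socketTextStream("localhost", 9999)
    val msg  = logs.filter(_.contains("INFO"))   // stand-in for selectINFO

    // Form 1: count each batch's RDD directly. The body of foreachRDD
    // runs in the driver, so `count` here is an ordinary local Long.
    msg.foreachRDD { rdd =>
      val count: Long = rdd.count()
      println(s"form 1: lines in this batch = $count")
    }

    // Form 2: DStream.count() yields a one-element RDD[Long] per batch;
    // first() pulls that single element back to the driver. Equivalent.
    msg.count().foreachRDD { rdd =>
      val count: Long = rdd.first()
      println(s"form 2: lines in this batch = $count")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Both forms print one number per batch interval. Because DStream.count() always produces exactly one element per batch (zero for an empty batch), the first() call is safe.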
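On the rdd.pipe() / Python question: since the foreachRDD body executes in the driver, a plain println there writes one count per line to the driver's stdout, which is what a parent process reading the job's output needs. A sketch of just that output step, replacing the two println forms in the sketch above (the flush is defensive, not required by Spark):

// One count per line on the driver's stdout, so a parent process
// (e.g. the Python script that launched this job) can read it line by line.
msg.count().foreachRDD { rdd =>
  println(rdd.first())
  System.out.flush()   // make each batch's count visible immediately
}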