To clarify the confusion here: dstream.count() generates a DStream[Long] which
contains one RDD[Long] per batch. Each of these RDDs has only one element in
it, which is the count you are interested in. So the following are equivalent.

dstream.foreachRDD { rdd => val count = rdd.count() }

AND

dstream.count().foreachRDD { rdd => val count = rdd.first() }
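
For the Java API the same thing looks roughly like this. This is a minimal,
untested sketch against the 1.x Java streaming API, where "lines" stands for
whatever stream you have:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;

    JavaDStream<String> lines = ...; // your input stream

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) {
            // count() runs a job on the workers and returns a plain long
            // to the driver
            long count = rdd.count();
            System.out.println("lines in this batch: " + count);
            return null;
        }
    });

The println runs in the driver, so this is also where you could write each
batch's count to standard out for something like the python pipe mentioned
downthread.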



On Wed, Oct 1, 2014 at 11:30 AM, Sean Owen <so...@cloudera.com> wrote:

> Hm, I think this is the same thing though. The input is conceptually a
> stream of RDDs. You want the count of elements of each of the RDDs, so
> you get a stream of counts. In the foreachRDD example, you're just
> computing the count() of each RDD directly and printing it, so you
> print a stream of counts. But you could just as well print the stream
> of counts that DStream.count() gives you. I suppose they are two
> means to the same end, but DStream.count() sounds more direct. But if
> you really do want to do something with each count, like just print
> it, sure, foreachRDD could be fine too.
>
> On Wed, Oct 1, 2014 at 6:38 PM, Andy Davidson
> <a...@santacruzintegration.com> wrote:
> > Hi Sean
> >
> > I guess I am missing something.
> >
> > JavaDStream<String> foo = …
> > JavaDStream<Long> c = foo.count()
> >
> > This is circular. I need to get the count as an actual scalar value, not
> > a JavaDStream. Someone else posted pseudo code that used foreachRDD().
> > This seems to work for me.
> >
> > Thanks
> >
> > Andy
> >
> >
> > From: Sean Owen <so...@cloudera.com>
> > Date: Wednesday, October 1, 2014 at 2:32 AM
> > To: Andrew Davidson <a...@santacruzintegration.com>
> > Cc: "user@spark.apache.org" <user@spark.apache.org>
> > Subject: Re: how to get actual count as a long from JavaDStream?
> >
> > It's much easier than all this. Spark Streaming gives you a DStream of
> > RDDs. You want the count for each RDD. DStream.count() gives you
> > exactly that: a DStream of Longs which are the counts of events in
> > each mini batch.
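> >
> > For example, something like this (untested, from memory; "msg" is
> > whatever stream you have):
> >
> >     JavaDStream<Long> counts = msg.count();
> >     counts.print(); // prints the single count in each mini batch
> >
> > Each RDD in counts has exactly one element: that batch's count.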
> >
> > On Tue, Sep 30, 2014 at 8:42 PM, Andy Davidson
> > <a...@santacruzintegration.com> wrote:
> >
> > Hi
> >
> > I have a simple streaming app. All I want to do is figure out how many
> > lines I have received in the current mini batch. If numLines were a
> > JavaRDD I could simply call count(). How do you do something similar in
> > Streaming?
> >
> >
> > Here is my pseudo code
> >
> >
> >
> > JavaDStream<String> msg = logs.filter(selectINFO);
> >
> > JavaDStream<Long> numLines = msg.count();
> >
> >
> > Long totalCount = numLines ???
> >
> >
> >
> > Here is what I am really trying to do. I have a python script that
> > generates a graph of totalCount vs time. Python does not support
> > streaming. As a work around I have a java program that does the
> > streaming. I want to pass the data back to the python script. It has
> > been suggested I can use rdd.pipe().
> >
> >
> > In python I call rdd.pipe(scriptToStartJavaStream.sh)
> >
> >
> > All I need to do is, for each mini batch, get the count of the current
> > mini batch and write it to standard out. Seems like this should be
> > simple.
> >
> >
> > Maybe streams do not work the way I think? In a spark core app, I am
> > able to get values like count in my driver and do whatever I want with
> > the local value. With streams I know I am getting mini batches because
> > print() displays the first 10 lines of my stream. I assume that print
> > is somehow executed in my driver, so data was sent from the workers
> > back to the driver.
> >
> >
> > Any comments or suggestions would be greatly appreciated.
> >
> >
> > Andy
> >
> >
> > P.S. Should I be asking a different question?
