Use DStream.foreachRDD to do an operation on the final RDD of every batch.

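val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) =>
  (a._1 + b._1, a._2 + b._2)
}
sumandcount.foreachRDD { rdd =>
  // take(1) returns an Array, so pull the single tuple out of it.
  // Note: .head throws if the batch was empty.
  val first: (Double, Int) = rdd.take(1).head
  ...
}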

DStream.reduce creates a DStream whose RDDs have just one tuple each. The
rdd.take(1) above gets that one tuple.
However, note that there is a corner case in this approach. If there is no
data in a particular batch, then the rdd will have zero elements (no data,
nothing to reduce). So you have to take that into account (maybe do an
rdd.collect(), check the size, and then get the first / only element).
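
One way to handle the empty-batch case is sketched below. This is only a minimal illustration, not something Spark provides: BatchAverage and average are hypothetical names, and the helper works on the plain Array that rdd.take(1) returns, so it can be tried without a running StreamingContext.

```scala
// Hypothetical helper (not part of Spark): turns the Array returned by
// rdd.take(1) into an average, or None when the batch had no data.
object BatchAverage {
  def average(firstOrEmpty: Array[(Double, Int)]): Option[Double] =
    firstOrEmpty.headOption.collect {
      // Guard against a zero count as well as an empty array.
      case (sum, count) if count > 0 => sum / count
    }

  def main(args: Array[String]): Unit = {
    // Stand-ins for what rdd.take(1) yields in a non-empty and an empty batch.
    println(average(Array((6.0, 3))))             // prints Some(2.0)
    println(average(Array.empty[(Double, Int)]))  // prints None
  }
}

// Assumed wiring inside the streaming job from this thread:
//   sumandcount.foreachRDD { rdd =>
//     BatchAverage.average(rdd.take(1)).foreach(avg => println(avg))
//   }
```

Returning an Option keeps the "no data in this batch" case explicit, so the foreachRDD body simply does nothing for empty batches instead of throwing.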

TD



On Wed, May 7, 2014 at 7:59 AM, Laeeq Ahmed <laeeqsp...@yahoo.com> wrote:

> Hi,
>
> I use the following code for calculating the average. The problem is that
> the reduce operation returns a DStream here and not a tuple as it normally
> does without Streaming. So how can we get the sum and the count from the
> DStream? Can we cast it to a tuple?
>
> val numbers = ssc.textFileStream(args(1))
> val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) =>
>   (a._1 + b._1, a._2 + b._2) }
> sumandcount.print()
>
> Regards,
> Laeeq
>
>
