Use DStream.foreachRDD to perform an operation on the final RDD of every batch.

    val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce { (a, b) =>
      (a._1 + b._1, a._2 + b._2)
    }
    sumandcount.foreachRDD { rdd =>
      // take(1) returns an Array holding at most one (sum, count) tuple
      val first: Array[(Double, Int)] = rdd.take(1)
      ...
    }

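As a minimal sketch of what could be done with the result of rdd.take(1), the helper below turns it into an optional average. BatchAverage and averageOf are hypothetical names introduced only for illustration, not part of Spark, and plain arrays stand in for the take(1) result so the snippet runs without a cluster.

```scala
// Hypothetical helper, not part of Spark: converts the result of
// rdd.take(1) on the reduced stream into an optional average.
object BatchAverage {
  // firstElems is expected to hold zero or one (sum, count) tuple.
  def averageOf(firstElems: Array[(Double, Int)]): Option[Double] =
    firstElems.headOption.collect {
      // Guard against a zero count so we never divide by zero.
      case (sum, count) if count > 0 => sum / count
    }

  def main(args: Array[String]): Unit = {
    // Stands in for a batch whose reduced RDD holds one tuple.
    println(averageOf(Array((6.0, 3))))              // prints Some(2.0)
    // Stands in for a batch with no data at all.
    println(averageOf(Array.empty[(Double, Int)]))   // prints None
  }
}
```

Inside foreachRDD this would presumably be called as BatchAverage.averageOf(rdd.take(1)), with the None case signalling that the batch had nothing to average.
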
DStream.reduce creates a DStream whose RDDs have just one tuple each. The rdd.take(1) above fetches that one tuple (take returns it wrapped in an array). Note, however, that there is a corner case in this approach: if a particular batch has no data, the RDD will have zero elements (no data, nothing to reduce). So you have to take that into account (maybe do an rdd.collect(), check the size, and then get the first / only element).

TD

On Wed, May 7, 2014 at 7:59 AM, Laeeq Ahmed <laeeqsp...@yahoo.com> wrote:
> Hi,
>
> I use the following code for calculating the average. The problem is that the
> reduce operation returns a DStream here and not a tuple as it normally does
> without Streaming. So how can we get the sum and the count from the
> DStream? Can we cast it to a tuple?
>
> val numbers = ssc.textFileStream(args(1))
> val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) =>
> (a._1 + b._1, a._2 + b._2) }
> sumandcount.print()
>
> Regards,
> Laeeq