Hi Tathagata,

Thanks for the solution. Actually, I will use the number of unique integers
in the current batch instead of the cumulative number of unique integers.
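For reference, a minimal sketch of that per-batch variant, assuming Spark 1.x
Scala Streaming; the helper name perBatchRatio is illustrative, and
dstreamOfIntegers is borrowed from your example below:

import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

// Ratio of each integer's appearances to the number of unique integers,
// computed independently for every batch (no state kept across batches).
def perBatchRatio(dstreamOfIntegers: DStream[Int]): DStream[(Int, Double)] =
  dstreamOfIntegers.transform { batchRDD =>
    val uniqueCount = batchRDD.distinct().count()   // uniques in this batch only
    batchRDD
      .map(x => (x, 1L))
      .reduceByKey(_ + _)                           // appearances per integer
      .map { case (x, n) => (x, n.toDouble / uniqueCount) }
  }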
I do have two questions about your code:

1. Why do we need uniqueValuesRDD? Why do we need to call
   uniqueValuesRDD.checkpoint()?
2. Where is distinctValues defined?

Bill

On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das
<tathagata.das1...@gmail.com> wrote:

> Do you want to continuously maintain the set of unique integers seen
> since the beginning of the stream?
>
> var uniqueValuesRDD: RDD[Int] = ...
>
> dstreamOfIntegers.transform(newDataRDD => {
>   val newUniqueValuesRDD = newDataRDD.union(distinctValues).distinct
>   uniqueValuesRDD = newUniqueValuesRDD
>
>   // periodically call uniqueValuesRDD.checkpoint()
>
>   val uniqueCount = uniqueValuesRDD.count()
>   newDataRDD.map(x => x / count)
> })
>
> On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am working on a pipeline that needs to join two Spark streams. The
>> input is a stream of integers, and the output is the number of times
>> each integer appears divided by the total number of unique integers.
>> Suppose the input is:
>>
>> 1
>> 2
>> 3
>> 1
>> 2
>> 2
>>
>> There are 3 unique integers, and 1 appears twice. Therefore, the
>> output for the integer 1 will be:
>>
>> 1 0.67
>>
>> Since the input is from a stream, it seems we need to first join the
>> appearance counts of the integers with the total number of unique
>> integers, and then do the calculation using map. I am thinking of
>> adding a dummy key to both streams and using join. However, a
>> Cartesian product matches the application here better. How can I do
>> this effectively? Thanks!
>>
>> Bill
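A minimal sketch of the dummy-key join described in the quoted question,
assuming Spark 1.x Scala Streaming; the key literal "k" and the name
ratioViaJoin are illustrative, and dstreamOfIntegers is from the thread above:

import org.apache.spark.SparkContext._                // pair-RDD implicits (Spark 1.x)
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits (Spark 1.x)
import org.apache.spark.streaming.dstream.DStream

def ratioViaJoin(dstreamOfIntegers: DStream[Int]): DStream[(Int, Double)] = {
  // Per-batch appearance count of each integer, keyed by a dummy key "k".
  val keyedCounts = dstreamOfIntegers
    .map(x => (x, 1L))
    .reduceByKey(_ + _)
    .map { case (x, n) => ("k", (x, n)) }

  // Per-batch number of unique integers, one value per batch, same dummy key.
  val keyedUniques = dstreamOfIntegers
    .transform(_.distinct())
    .count()                                          // DStream[Long]
    .map(u => ("k", u))

  // Joining on the single dummy key pairs every (integer, count) with the
  // unique total, which is effectively the Cartesian product mentioned above.
  keyedCounts.join(keyedUniques)
    .map { case (_, ((x, n), u)) => (x, n.toDouble / u) }
}

Because every record shares one key, the join funnels each batch through a
single reducer; that is usually acceptable here, since the left side has
already been reduced to one record per distinct integer.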