Hi Tathagata,

Thanks for the solution. Actually, I will use the number of unique integers
in each batch instead of the cumulative number of unique integers seen since
the beginning of the stream.
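For reference, here is roughly what I have in mind for the per-batch version
(just a sketch, reusing the dstreamOfIntegers from your example; everything is
recomputed from scratch for every micro-batch):

dstreamOfIntegers.transform { batchRDD =>
  // number of distinct integers in this batch only
  val uniqueCount = batchRDD.distinct().count()
  // appearance count of each integer in the batch, divided by the unique count
  batchRDD.map(x => (x, 1L))
          .reduceByKey(_ + _)
          .map { case (value, cnt) => (value, cnt.toDouble / uniqueCount) }
}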

I do have two questions about your code:

1. Why do we need uniqueValuesRDD?  Why do we need to call
uniqueValuesRDD.checkpoint()?

2. Where is distinctValues defined?

Bill


On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Do you want to continuously maintain the set of unique integers seen since
> the beginning of stream?
>
> var uniqueValuesRDD: RDD[Int] = ...
>
> dstreamOfIntegers.transform(newDataRDD => {
>    val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
>    uniqueValuesRDD = newUniqueValuesRDD
>
>    // periodically call uniqueValuesRDD.checkpoint()
>
>    val uniqueCount = uniqueValuesRDD.count()
>    newDataRDD.map(x => x / count)
> })
>
>
>
>
>
> On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am working on a pipeline that needs to join two Spark streams. The
>> input is a stream of integers, and the output is, for each integer, the
>> number of times it appears divided by the total number of unique integers.
>> Suppose the input is:
>>
>> 1
>> 2
>> 3
>> 1
>> 2
>> 2
>>
>> There are 3 unique integers and 1 appears twice. Therefore, the output
>> for the integer 1 will be:
>> 1 0.67
>>
>> Since the input is from a stream, it seems we need to first join the
>> appearance counts of the integers with the total number of unique integers
>> and then do the calculation using map. I am thinking of adding a dummy key
>> to both streams and using join. However, a Cartesian product matches the
>> application here better. How can I do this effectively? Thanks!
>>
>> Bill
>>
>
>
