Hello Yakubovich,
I have been looking into a similar problem. @Lars, please note that he wants
to maintain the top N products over a sliding window, whereas the
CountMinSketch algorithm is useful if we want to maintain a global top N
products list. Please correct me if I am wrong here.
I tried using CountMinSketch and realized that it doesn't suit my use case,
as I also wanted to maintain the top N over the last H hours. CountMinSketch
has no notion of time, so in my understanding you cannot use it on its own.
Yakubovich, you can try doing something like this:
val stream = <DStream from your source>
// I am assuming that each entry is a comma separated list of product ids
// and product ID is a string (doesn't really matter though)
stream
  .flatMap(record => record.split(","))
  .map(pid => (pid, 1L))
  // note: the invertible (_ - _) variant requires checkpointing to be enabled
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
  .foreachRDD { rdd =>
    // `rdd` has type RDD[(String, Long)], i.e. (pid, count), with the
    // frequency of each PID over a sliding window of S1 seconds which
    // moves by S2 seconds every time.
    // compare() must return Int (not Boolean); Ordering.by on the count
    // is the simplest correct ordering for top().
    implicit val order: Ordering[(String, Long)] = Ordering.by(_._2)
    val topNPidTuples = rdd.top(N)
    // do whatever you want here.
  }
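For a quick standalone check of the ranking step, here is the same top-N
selection on a plain Scala collection, without Spark (the sample (pid, count)
pairs are made up; rdd.top(N) picks elements using the implicit ordering in
the same way):

```scala
// Made-up (pid, count) pairs, standing in for one window's RDD contents.
val counts = Seq(("p1", 5L), ("p2", 9L), ("p3", 2L), ("p4", 7L))

// Ordering.by(_._2) ranks tuples by their count; Spark's rdd.top(N)
// returns the N largest elements under the implicit ordering in scope.
implicit val byCount: Ordering[(String, Long)] = Ordering.by(_._2)

// Plain-collection equivalent of rdd.top(2): sort descending, take 2.
val top2 = counts.sorted(byCount.reverse).take(2)
println(top2)  // List((p2,9), (p4,7))
```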
--
Thanks
Jatin
On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra <[email protected]>
wrote:
> Hi Alexy,
> We are also trying to solve similar problems using approximation. Would
> like to hear more about your usage. We can discuss this offline without
> boring others. :)
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson <[email protected]>
> wrote:
>
>> Hi,
>>
>> If you can accept approximate top N results, there is a neat solution
>> for this problem: Use an approximate Map<K, V> structure called
>> Count-Min Sketch, in combination with a list of the M top items, where
>> M > N. When you encounter an item not in the top M, you look up its
>> count in the Count-Min Sketch to determine whether it qualifies.
>>
>> You will need to break down your event stream into time windows with a
>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>> Sketch for each unit. The CMSs can be added, so you aggregate them to
>> form your sliding windows. You also keep a top M (aka "heavy hitters")
>> list for each window.
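>>
>> The structure described above can be sketched roughly as follows (a
>> minimal toy implementation, not Algebird's API; the class name, hash
>> choice, and sizes are my own assumptions, and the top-M list is kept
>> separately):

```scala
import scala.util.hashing.MurmurHash3

// Minimal Count-Min Sketch; `depth` and `width` are illustrative sizes,
// and MurmurHash3 seeded by the row index stands in for a proper family
// of pairwise-independent hash functions.
class CMS(val depth: Int, val width: Int) {
  private val table = Array.ofDim[Long](depth, width)

  private def bucket(item: String, row: Int): Int = {
    val h = MurmurHash3.stringHash(item, row) % width
    if (h < 0) h + width else h
  }

  def add(item: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(item, row)) += count

  // The estimate is the minimum over the rows: it can overestimate
  // because of hash collisions but never underestimates.
  def estimate(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min

  // Sketches over disjoint time units add element-wise; summing the
  // per-minute (or per-hour) sketches yields the sliding window's sketch.
  def merge(other: CMS): CMS = {
    require(depth == other.depth && width == other.width)
    val out = new CMS(depth, width)
    for (r <- 0 until depth; c <- 0 until width)
      out.table(r)(c) = table(r)(c) + other.table(r)(c)
    out
  }
}

// One sketch per time unit, merged to form the sliding window.
val minute1 = new CMS(depth = 4, width = 1024)
Seq("p1", "p2", "p1").foreach(p => minute1.add(p))
val minute2 = new CMS(depth = 4, width = 1024)
Seq("p1", "p3").foreach(p => minute2.add(p))
val window = minute1.merge(minute2)
println(window.estimate("p1"))  // at least 3 ("p1" was seen 3 times)
```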
>>
>> The data structures required are surprisingly small, and will likely
>> fit in memory on a single machine, if it can handle the traffic
>> volume, so you might not need Spark at all. If you choose to use Spark
>> in order to benefit from windowing, be aware that Spark lumps events
>> in micro batches based on processing time, not event time.
>>
>> I made a presentation on approximate counting a couple of years ago.
>> Slides and video here:
>>
>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
>> .
>> You can also search for presentations by Ted Dunning and Mikio Braun,
>> who have held good presentations on the subject.
>>
>> There are AFAIK two open source implementations of Count-Min Sketch,
>> one of them in Algebird.
>>
>> Let me know if anything is unclear.
>>
>> Good luck, and let us know how it goes.
>>
>> Regards,
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> +46 70 7687109
>>
>>
>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>> <[email protected]> wrote:
>> > Good day,
>> >
>> > I have the following task: a stream of “page views” coming to a kafka
>> > topic. Each view contains a list of product ids from the visited page.
>> > The task: to have the top N products in “real time”.
>> >
>> > I am interested in a solution that requires minimal intermediate
>> > writes … So I need to build a sliding window for the top N products,
>> > where the product counters change dynamically and the window presents
>> > the top products for the specified period of time.
>> >
>> > I believe there is no way to avoid maintaining all product counters
>> > in memory/storage. But at least I would like to do all the logic and
>> > calculation on the fly, in memory, without spilling multiple RDDs from
>> > memory to disk.
>> >
>> > So I believe I see one way of doing it:
>> > Take each msg from kafka and line up the elementary actions (increase
>> > by 1 the counter for the product PID).
>> > Each action will be implemented as a call to HTable.increment() // or
>> > easier, with incrementColumnValue()…
>> > After each increment I can apply my own “offer” operation that would
>> > ensure that only the top N products with counters are kept in another
>> > HBase table (also with atomic operations).
>> > But there is another stream of events: decreasing product counters
>> > when a view expires past the length of the sliding window….
>> >
>> > So my question: does anybody know of, and can share, a piece of code
>> > or know-how for implementing a “sliding top N window” better?
>> > If nothing is offered, I will share what I do myself.
>> >
>> > Thank you
>> > Alexey
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
>>
>