I am not aware of any open source examples. If you search for usages
of stream-lib or Algebird, you might get lucky. Twitter uses CMSs, so
they may have shared some code or presentations.

We created a proprietary prototype of the solution I described, but I
am not at liberty to share code.

We did not take it into production at scale, so there may be
practical issues that we did not discover. The algorithm
implementation was simple and straightforward. We used the stream-lib
CMS implementation. IIRC, the one in Algebird already includes support
for heavy hitters.
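
For what it is worth, here is a minimal sketch of the counting core,
assuming Algebird's TopNCMS API. The parameter values are illustrative,
not tuned, and the exact implicits required may vary with the Algebird
version:

import com.twitter.algebird.{TopCMS, TopNCMS}

object TopProducts {
  // eps and delta bound the CMS estimation error; the seed initialises
  // the hash functions; the last argument is how many heavy hitters
  // (the top M) to track alongside the sketch.
  val monoid = TopNCMS.monoid[String](0.001, 1e-6, 1, 100)

  def count(productIds: Seq[String]): TopCMS[String] =
    monoid.create(productIds)

  def main(args: Array[String]): Unit = {
    val cms = count(Seq("p1", "p2", "p1", "p3", "p1", "p2"))
    println(cms.heavyHitters)             // approximate top product ids
    println(cms.frequency("p1").estimate) // approximate count for one id
  }
}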

For our use case, the data structures ended up being small, on the
order of tens or hundreds of megabytes. It varies with use case, but
it is probably a path worth investigating if approximate results are
acceptable.

Regards,


Lars Albertsson
Data engineering consultant
www.mapflat.com
+46 70 7687109


On Wed, Mar 23, 2016 at 3:41 AM, Jatin Kumar <jku...@rocketfuelinc.com> wrote:
> Lars, can you please point to an example?
>
> On Mar 23, 2016 2:16 AM, "Lars Albertsson" <la...@mapflat.com> wrote:
>>
>> @Jatin, I touched on that case briefly in the linked presentation.
>>
>> You will have to decide on a time slot size, and then aggregate slots
>> to form windows. E.g. if you select a time slot of an hour, you build
>> a CMS and a heavy hitter list for the current hour slot, and start new
>> ones at 00 minutes. In order to form e.g. a 12-hour window, the
>> 12-hour CMS is calculated as the sum of the 12 hourly slot CMSs, and
>> the 12-hour heavy hitters as the union of the hourly heavy hitter
>> lists, re-ranked against the summed CMS, as sketched below.
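>>
>> Roughly, assuming the stream-lib CountMinSketch API (merge() and
>> estimateCount() exist there; the Slot type is just illustrative
>> bookkeeping):
>>
>> import com.clearspring.analytics.stream.frequency.CountMinSketch
>>
>> // One CMS and one heavy hitter candidate set per hourly slot.
>> case class Slot(cms: CountMinSketch, heavyHitters: Set[String])
>>
>> // Form a 12-hour window from 12 hourly slots: sum (merge) the
>> // sketches, union the heavy hitter candidates, then re-rank the
>> // candidates against the summed sketch to get the window's top N.
>> def windowTopN(slots: Seq[Slot], n: Int): Seq[(String, Long)] = {
>>   val windowCms = CountMinSketch.merge(slots.map(_.cms): _*)
>>   slots.flatMap(_.heavyHitters).distinct
>>     .map(pid => pid -> windowCms.estimateCount(pid))
>>     .sortBy(-_._2)
>>     .take(n)
>> }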
>>
>> Since the data structures are small, one can afford to use small time
>> slots. One can also keep a long history with different combinations of
>> time windows by pushing out CMSs and heavy hitters to e.g. Kafka, and
>> having different stream processors aggregate different time windows
>> and push results to Kafka or to lookup tables.
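>>
>> As a rough sketch of the publishing side, assuming stream-lib's
>> static CountMinSketch.serialize() helper; the topic name and broker
>> address are made up:
>>
>> import java.util.Properties
>> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>> import com.clearspring.analytics.stream.frequency.CountMinSketch
>>
>> val props = new Properties()
>> props.put("bootstrap.servers", "localhost:9092")
>> props.put("key.serializer",
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put("value.serializer",
>>   "org.apache.kafka.common.serialization.ByteArraySerializer")
>> val producer = new KafkaProducer[String, Array[Byte]](props)
>>
>> // Publish a finished hourly slot; downstream processors consume,
>> // deserialize, and merge slots into the windows they maintain.
>> def publishSlot(hourKey: String, cms: CountMinSketch): Unit =
>>   producer.send(new ProducerRecord("cms-slots", hourKey,
>>     CountMinSketch.serialize(cms)))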
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> +46 70 7687109
>>
>>
>> On Tue, Mar 22, 2016 at 1:23 PM, Jatin Kumar <jku...@rocketfuelinc.com>
>> wrote:
>> > Hello Yakubovich,
>> >
>> > I have been looking into a similar problem. @Lars please note that he
>> > wants to maintain the top N products over a sliding window, whereas
>> > the CountMinSketch algorithm is useful if we want to maintain a
>> > global top N products list. Please correct me if I am wrong here.
>> >
>> > I tried using CountMinSketch and realized that it doesn't suit my
>> > use case, as I also wanted to maintain the top N over the last H
>> > hours. CountMinSketch has no notion of time, so in my understanding
>> > you cannot use it for that.
>> >
>> > Yakubovich, you can try doing something like this:
>> >
>> > val stream = <DStream from your source>
>> > // Assuming each record is a comma-separated list of product IDs,
>> > // and a product ID is a string (doesn't really matter though).
>> > stream
>> >   .flatMap(record => record.split(","))
>> >   .map(pid => (pid, 1L))
>> >   // Sliding window of S1 seconds that advances every S2 seconds.
>> >   // NOTE: supplying an inverse reduce function (_ - _) requires
>> >   // checkpointing to be enabled on the streaming context.
>> >   .reduceByKeyAndWindow(_ + _, _ - _, Seconds(S1), Seconds(S2))
>> >   .foreachRDD(rdd => {
>> >     // `rdd` is of type (pid, count) and holds the frequency of each
>> >     // PID over the current window.
>> >
>> >     // Order the pairs by count. Note that Ordering.compare must
>> >     // return an Int, not a Boolean; Ordering.by does the right thing.
>> >     implicit val order: Ordering[(String, Long)] =
>> >       Ordering.by[(String, Long), Long](_._2)
>> >
>> >     val topNPidTuples = rdd.top(N)
>> >     // do whatever you want here
>> >   })
>> >
>> >
>> >
>> > --
>> > Thanks
>> > Jatin
>> >
>> > On Tue, Mar 22, 2016 at 12:11 PM, Rishi Mishra <rmis...@snappydata.io>
>> > wrote:
>> >>
>> >> Hi Alexey,
>> >> We are also trying to solve similar problems using approximation.
>> >> Would like to hear more about your usage. We can discuss this
>> >> offline without boring others. :)
>> >>
>> >> Regards,
>> >> Rishitesh Mishra,
>> >> SnappyData . (http://www.snappydata.io/)
>> >>
>> >> https://in.linkedin.com/in/rishiteshmishra
>> >>
>> >> On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson <la...@mapflat.com>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> If you can accept approximate top N results, there is a neat solution
>> >>> for this problem: use an approximate Map<K, V> structure called
>> >>> Count-Min Sketch, in combination with a list of the M top items, where
>> >>> M > N. When you encounter an item not in the top M, you look up its
>> >>> count in the Count-Min Sketch to determine whether it qualifies.
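>> >>>
>> >>> A minimal sketch of that bookkeeping, assuming the stream-lib
>> >>> CountMinSketch API (the TopM class itself is illustrative, not a
>> >>> library type; the constructor arguments are error, confidence, seed):
>> >>>
>> >>> import com.clearspring.analytics.stream.frequency.CountMinSketch
>> >>>
>> >>> class TopM(m: Int) {
>> >>>   private val cms = new CountMinSketch(0.001, 0.999, 1)
>> >>>   private var top = Map.empty[String, Long] // current top-M candidates
>> >>>
>> >>>   def offer(pid: String): Unit = {
>> >>>     cms.add(pid, 1L)
>> >>>     val count = cms.estimateCount(pid)
>> >>>     if (top.contains(pid) || top.size < m) {
>> >>>       top += pid -> count
>> >>>     } else {
>> >>>       // The item is outside the top M: check whether its estimated
>> >>>       // count now beats the smallest candidate, and evict if so.
>> >>>       val (minPid, minCount) = top.minBy(_._2)
>> >>>       if (count > minCount) top = top - minPid + (pid -> count)
>> >>>     }
>> >>>   }
>> >>>
>> >>>   def topN(n: Int): Seq[(String, Long)] = top.toSeq.sortBy(-_._2).take(n)
>> >>> }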
>> >>>
>> >>> You will need to break down your event stream into time windows with a
>> >>> certain time unit, e.g. minutes or hours, and keep one Count-Min
>> >>> Sketch for each unit. The CMSs can be added, so you aggregate them to
>> >>> form your sliding windows. You also keep a top M (aka "heavy hitters")
>> >>> list for each window.
>> >>>
>> >>> The data structures required are surprisingly small, and will likely
>> >>> fit in memory on a single machine, if it can handle the traffic
>> >>> volume, so you might not need Spark at all. If you choose to use Spark
>> >>> in order to benefit from windowing, be aware that Spark lumps events
>> >>> in micro batches based on processing time, not event time.
>> >>>
>> >>> I made a presentation on approximate counting a couple of years ago.
>> >>> Slides and video here:
>> >>>
>> >>> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
>> >>>
>> >>> You can also search for presentations by Ted Dunning and Mikio Braun,
>> >>> who have given good talks on the subject.
>> >>>
>> >>> There are AFAIK two open source implementations of Count-Min Sketch,
>> >>> one of them in Algebird.
>> >>>
>> >>> Let me know if anything is unclear.
>> >>>
>> >>> Good luck, and let us know how it goes.
>> >>>
>> >>> Regards,
>> >>>
>> >>>
>> >>>
>> >>> Lars Albertsson
>> >>> Data engineering consultant
>> >>> www.mapflat.com
>> >>> +46 70 7687109
>> >>>
>> >>>
>> >>> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey
>> >>> <alexey.yakubov...@searshc.com> wrote:
>> >>> > Good day,
>> >>> >
>> >>> > I have the following task: a stream of "page views" coming into a
>> >>> > Kafka topic. Each view contains a list of product IDs from a
>> >>> > visited page. The task: to have the top N products in "real time".
>> >>> >
>> >>> > I am interested in a solution that would require minimal
>> >>> > intermediate writes… So I need to build a sliding window for the
>> >>> > top N products, where the product counters change dynamically and
>> >>> > the window presents the top products for the specified period of
>> >>> > time.
>> >>> >
>> >>> > I believe there is no way to avoid maintaining all product
>> >>> > counters in memory/storage. But at least I would like to do all
>> >>> > the logic, all calculation, on the fly, in memory, without
>> >>> > spilling multiple RDDs from memory to disk.
>> >>> >
>> >>> > So I believe I see one way of doing it:
>> >>> >   Take each msg from Kafka and line up the elementary actions
>> >>> > (increase the counter for the product PID by 1).
>> >>> >   Each action will be implemented as a call to HTable.increment()
>> >>> > // or easier, with incrementColumnValue()…
>> >>> >   After each increment I can apply my own "offer" operation that
>> >>> > would ensure only the top N products with counters are kept in
>> >>> > another HBase table (also with atomic operations).
>> >>> >   But there is another stream of events: decreasing product
>> >>> > counters when a view expires past the length of the sliding
>> >>> > window….
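>> >>> >
>> >>> > For concreteness, the increment step might look like this
>> >>> > (assuming the current HBase client API; the table and column
>> >>> > names are made up):
>> >>> >
>> >>> > import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
>> >>> > import org.apache.hadoop.hbase.client.ConnectionFactory
>> >>> > import org.apache.hadoop.hbase.util.Bytes
>> >>> >
>> >>> > val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
>> >>> > val counters = conn.getTable(TableName.valueOf("product_counters"))
>> >>> >
>> >>> > // Atomically increase the counter for one product ID by 1.
>> >>> > def increment(pid: String): Long =
>> >>> >   counters.incrementColumnValue(
>> >>> >     Bytes.toBytes(pid),     // row key: the product ID
>> >>> >     Bytes.toBytes("c"),     // column family
>> >>> >     Bytes.toBytes("count"), // qualifier
>> >>> >     1L)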
>> >>> >
>> >>> > So my question: does anybody know of, or have and can share, a
>> >>> > piece of code or know-how on how to implement a "sliding top N
>> >>> > window" better?
>> >>> > If nothing is offered, I will share what I end up doing myself.
>> >>> >
>> >>> > Thank you
>> >>> > Alexey
>> >>>
>> >>>
>> >>
>> >

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
