kisalay,

Are you talking about storing all of my data in a non-aggregated format and just
aggregating as needed? If so, do you have any idea what kind of performance I
should expect when scanning over 15 million rows to summarize the specific cubes
I need (using bitfields to estimate the unique users in each region and merging
them together later)?

Or are you suggesting something else?

--Tom
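For reference, here is a rough, self-contained sketch of the kind of mergeable
bitfield Tom describes: a Flajolet-Martin style bitmap in which each user id sets
one bit, per-region bitmaps merge with a bitwise OR, and the estimate comes from
the lowest unset bit. The class and method names are invented for illustration
and are not from this thread.

    /**
     * Minimal Flajolet-Martin style estimator. add() sets the bit whose index is
     * the number of trailing zero bits in the hash of the user id; merging two
     * bitmaps is a bitwise OR; the estimate is 2^R / 0.77351, where R is the
     * index of the lowest bit still unset.
     */
    public class FmBitmap {
        private static final double PHI = 0.77351;  // standard FM correction factor
        private long bits;                          // 64-bit bitmap

        public void add(byte[] userId) {
            long h = hash(userId);
            int r = (h == 0) ? 63 : Long.numberOfTrailingZeros(h);
            bits |= 1L << r;
        }

        /** Merge a bitmap produced elsewhere (e.g. by another region): bitwise OR. */
        public void merge(long otherBits) {
            bits |= otherBits;
        }

        public long rawBits() {
            return bits;
        }

        /** Estimated number of distinct user ids observed so far. */
        public long estimate() {
            int r = Long.numberOfTrailingZeros(~bits);  // lowest unset bit position
            return Math.round(Math.pow(2, r) / PHI);
        }

        private static long hash(byte[] b) {
            // FNV-1a, only to keep the sketch self-contained; use a stronger hash in practice.
            long h = 0xcbf29ce484222325L;
            for (byte x : b) {
                h ^= (x & 0xff);
                h *= 0x100000001b3L;
            }
            return h;
        }
    }

A single 64-bit bitmap has high variance; real implementations keep many such
bitmaps (or use LogLog/HyperLogLog, as in the article linked at the end of this
thread) and average the estimates, but the merge-by-OR property that matters for
the per-region approach is the same.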
On Tue, Apr 10, 2012 at 11:59 PM, kisalay <[email protected]> wrote:
> Tom,
>
> I was way too curious to resist a reply here.
> If you want to store a byte array estimating the unique count for a
> particular OLAP cell, won't you see a lot of updates to the same cell
> and create a hotspot?
>
> Another option comes to mind. I assume that you receive all of the
> user activities. Now consider for a moment that you store each of
> these activities in your HBase table, with the row key being
> udid-timestamp and the value being some blob representing the
> activity detail. Now if I have to do a unique count of users for an
> OLAP cell, I would create endpoint coprocessors that execute per
> region, scan it, and prepare a bitset representing the unique count
> for the OLAP cell of interest within that region.
>
> If you are implementing probabilistic counting, you can then OR the
> bitsets returned from each region to get a final bitset that gives
> you the overall unique count across all the regions together.
>
> You would not only save on network transfers, since you are doing the
> counts per region in the coprocessor and returning only one bitset
> per region; you would also be able to resolve the query in roughly
> the time taken by one coprocessor to scan one region.
>
> I have a rudimentary implementation of probabilistic counting which I
> once used as a bolt in Storm (Storm was developed at BackType and
> open-sourced by Twitter) to count unique users for a similar use
> case. Let me know if you would like to look at the implementation of
> the algorithm.
>
> ~Kisalay
>
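To make the per-region endpoint kisalay describes concrete, here is a loose
sketch against the 0.92-era coprocessor API (CoprocessorProtocol,
BaseEndpointCoprocessor, HTable.coprocessorExec). Everything here is
illustrative: the protocol and method names are invented, the udid parsing is an
assumption about the udid-timestamp row key, FmBitmap is the hypothetical
estimator sketched earlier, and the exact signatures should be treated as
approximate rather than authoritative.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;

    /** RPC contract: each region returns its own 64-bit estimator bitmap. */
    interface UniqueCountProtocol extends CoprocessorProtocol {
        long regionBitmap(byte[] family, byte[] qualifier) throws IOException;
    }

    /** Server side: scan only this region and fold every udid into a local bitmap. */
    public class UniqueCountEndpoint extends BaseEndpointCoprocessor
            implements UniqueCountProtocol {

        @Override
        public long regionBitmap(byte[] family, byte[] qualifier) throws IOException {
            Scan scan = new Scan();
            scan.addColumn(family, qualifier);
            InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
                    .getRegion().getScanner(scan);
            FmBitmap bitmap = new FmBitmap();   // hypothetical estimator from the earlier sketch
            try {
                List<KeyValue> kvs = new ArrayList<KeyValue>();
                boolean more;
                do {
                    kvs.clear();
                    more = scanner.next(kvs);
                    for (KeyValue kv : kvs) {
                        bitmap.add(extractUdid(kv.getRow()));
                    }
                } while (more);
            } finally {
                scanner.close();
            }
            return bitmap.rawBits();
        }

        /** Assumes a "udid-timestamp" row key; everything before the first '-' is the udid. */
        private static byte[] extractUdid(byte[] rowKey) {
            for (int i = 0; i < rowKey.length; i++) {
                if (rowKey[i] == '-') {
                    byte[] udid = new byte[i];
                    System.arraycopy(rowKey, 0, udid, 0, i);
                    return udid;
                }
            }
            return rowKey;
        }
    }

    /** Client side: one call per region, then the probabilistic-counting merge is a bitwise OR. */
    class UniqueCountClient {
        static long mergedBitmap(HTable table, final byte[] family, final byte[] qualifier)
                throws Throwable {   // coprocessorExec declares Throwable in this API generation
            Map<byte[], Long> perRegion = table.coprocessorExec(
                    UniqueCountProtocol.class,
                    HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,   // all regions
                    new Batch.Call<UniqueCountProtocol, Long>() {
                        public Long call(UniqueCountProtocol instance) throws IOException {
                            return instance.regionBitmap(family, qualifier);
                        }
                    });
            long merged = 0;
            for (Long bits : perRegion.values()) {
                merged |= bits;
            }
            return merged;
        }
    }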
> On Wed, Apr 11, 2012 at 5:23 AM, Andrew Purtell <[email protected]> wrote:
>>> Even my implementation of an atomic increment
>>> (using a coprocessor) is two orders of magnitude slower than the
>>> provided implementation. Are there properties inherent to
>>> coprocessors or Incrementors that would force this kind of performance
>>> difference?
>>
>> No.
>>
>> You may be seeing a performance difference if you are packing multiple
>> Increments into one round trip but not doing a similar kind of batching
>> when calling a custom endpoint. Each Endpoint invocation is a round trip
>> unless you do something like:
>>
>>   List<Row> actions = new ArrayList<Row>();
>>   actions.add(new Exec(conf, row, protocol, method, ...));
>>   actions.add(new Exec(conf, row, protocol, method, ...));
>>   actions.add(new Exec(conf, row, protocol, method, ...));
>>   Object[] results = table.batch(actions);
>>   ...
>>
>> I've not personally tried that particular API combination but don't see
>> why it would not be possible.
>>
>> Beyond that, I'd suggest running a regionserver with your coprocessor
>> installed under a profiler to see if you have monitor contention, a
>> hotspot, or something similar. It could be something unexpected.
>>
>>> Can you think of an efficient way to implement an atomic bitfield
>>> (other than adding it as a separate feature like atomic increments)?
>>
>> I think the idea of an atomic bitfield operation as part of the core API
>> is intriguing. It has applicability to your estimator use case, and I can
>> think of a couple of things I could use it for. If there is more support
>> for this idea, this may be something to consider.
>>
>> Best regards,
>>
>>   - Andy
>>
>> Problems worthy of attack prove their worth by hitting back. - Piet Hein
>> (via Tom White)
>>
>> ----- Original Message -----
>>> From: Tom Brown <[email protected]>
>>> To: [email protected]; Andrew Purtell <[email protected]>
>>> Cc:
>>> Sent: Tuesday, April 10, 2012 3:53 PM
>>> Subject: Re: Add client complexity or use a coprocessor?
>>>
>>> Andy,
>>>
>>> I have attempted to use coprocessors to achieve passable performance
>>> but have failed so far. Even my implementation of an atomic increment
>>> (using a coprocessor) is two orders of magnitude slower than the
>>> provided implementation. Are there properties inherent to
>>> coprocessors or Incrementors that would force this kind of performance
>>> difference?
>>>
>>> Can you think of an efficient way to implement an atomic bitfield
>>> (other than adding it as a separate feature like atomic increments)?
>>>
>>> Thanks!
>>>
>>> --Tom
>>>
>>> On Tue, Apr 10, 2012 at 12:01 PM, Andrew Purtell <[email protected]> wrote:
>>>> Tom,
>>>>
>>>>> I am a big fan of the Increment class. Unfortunately, I'm not doing
>>>>> simple increments for the viewer count. I will be receiving duplicate
>>>>> messages from a particular client for a specific cube cell, and don't
>>>>> want them to be counted twice
>>>>
>>>> Gotcha.
>>>>
>>>>> I created an RPC endpoint coprocessor to perform this function but
>>>>> performance suffered heavily under load (it appears that the endpoint
>>>>> performs all functions in serial).
>>>>
>>>> Did you serialize access to your data structure(s)?
>>>>
>>>>> When I tried implementing it as a region observer, I was unsure of how
>>>>> to correctly replace the provided "put" with my own. When I issued a
>>>>> put from within "prePut", the server blocked the new put (waiting for
>>>>> the "prePut" to finish). Should I be attempting to modify the WALEdit
>>>>> object?
>>>>
>>>> You can add KVs to the WALEdit. Or, you can get a reference to the Put's
>>>> familyMap:
>>>>
>>>>   Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
>>>>
>>>> and if you modify the map, you'll change what gets committed.
>>>>
>>>>> Is there a way to extend the functionality of "Increment" to provide
>>>>> arbitrary bitwise operations on the contents of a field?
>>>>
>>>> As a matter of design, this should be a new operation. It does sound
>>>> interesting and useful, some sort of atomic bitfield.
>>>>
>>>> Best regards,
>>>>
>>>>   - Andy
>>>>
>>>> Problems worthy of attack prove their worth by hitting back. - Piet Hein
>>>> (via Tom White)
>>>>
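A rough sketch of the familyMap approach Andy describes above, using the
0.92-era RegionObserver hook signature: the prePut hook rewrites the incoming
value in place (for example, replacing a raw user id with a computed value)
instead of issuing a second put, so nothing blocks. The class name and the
computeValue transformation are invented for illustration; this is not code from
the thread.

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    /**
     * Rewrites the value of each incoming KeyValue before it is committed, by
     * editing the Put's familyMap from prePut (no second put is issued).
     */
    public class UserIdRewriteObserver extends BaseRegionObserver {

        @Override
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> ctx,
                           Put put, WALEdit edit, boolean writeToWAL) throws IOException {
            Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
            for (List<KeyValue> kvs : familyMap.values()) {
                for (int i = 0; i < kvs.size(); i++) {
                    KeyValue kv = kvs.get(i);
                    byte[] computed = computeValue(kv.getValue());  // e.g. derived from the raw user id
                    kvs.set(i, new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(),
                                            kv.getTimestamp(), computed));
                }
            }
        }

        /** Placeholder transformation; the thread's use case would derive an estimator update here. */
        private static byte[] computeValue(byte[] rawUserId) {
            return Bytes.toBytes(rawUserId.length);  // illustrative only
        }
    }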
>>>> ----- Original Message -----
>>>>> From: Tom Brown <[email protected]>
>>>>> To: [email protected]
>>>>> Cc:
>>>>> Sent: Monday, April 9, 2012 10:14 PM
>>>>> Subject: Re: Add client complexity or use a coprocessor?
>>>>>
>>>>> Andy,
>>>>>
>>>>> I am a big fan of the Increment class. Unfortunately, I'm not doing
>>>>> simple increments for the viewer count. I will be receiving duplicate
>>>>> messages from a particular client for a specific cube cell, and don't
>>>>> want them to be counted twice (my stats don't have to be 100%
>>>>> accurate, but the expected rate of duplicates will be higher than the
>>>>> allowable error rate).
>>>>>
>>>>> I created an RPC endpoint coprocessor to perform this function but
>>>>> performance suffered heavily under load (it appears that the endpoint
>>>>> performs all functions in serial).
>>>>>
>>>>> When I tried implementing it as a region observer, I was unsure of how
>>>>> to correctly replace the provided "put" with my own. When I issued a
>>>>> put from within "prePut", the server blocked the new put (waiting for
>>>>> the "prePut" to finish). Should I be attempting to modify the WALEdit
>>>>> object?
>>>>>
>>>>> Is there a way to extend the functionality of "Increment" to provide
>>>>> arbitrary bitwise operations on the contents of a field?
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> --Tom
>>>>>
>>>>>> If it helps, yes this is possible:
>>>>>>
>>>>>>> Can I observe updates to a
>>>>>>> particular table and replace the provided data with my own? (The
>>>>>>> client calls "put" with the actual user ID, my co-processor replaces
>>>>>>> it with a computed value, so the actual user ID never gets stored in
>>>>>>> HBase).
>>>>>>
>>>>>> Since your option #2 requires atomic updates to the data structure,
>>>>>> have you considered native atomic increments? See
>>>>>>
>>>>>> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#incrementColumnValue%28byte[],%20byte[],%20byte[],%20long,%20boolean%29
>>>>>>
>>>>>> or
>>>>>>
>>>>>> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Increment.html
>>>>>>
>>>>>> The former is a round trip for each value update. The latter allows
>>>>>> you to pack multiple updates into a single round trip. This would
>>>>>> give you accurate counts even with concurrent writers.
>>>>>>
>>>>>> It should be possible for you to do partial aggregation on the client
>>>>>> side too, whenever parallel requests colocate multiple updates to the
>>>>>> same cube within some small window of time.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>>   - Andy
>>>>>>
>>>>>> Problems worthy of attack prove their worth by hitting back. - Piet Hein
>>>>>> (via Tom White)
>>>>>>
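A small sketch of the client-side partial aggregation Andy suggests: deltas are
accumulated locally for a short window and then flushed as one Increment per
cube row, so many updates share a single round trip. The table handle, column
family, and metric names are invented for illustration.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.util.Bytes;

    /**
     * Buffers counter deltas per (cube row, metric column) and flushes them
     * periodically as one Increment per row.
     */
    public class CubeIncrementBuffer {
        private static final byte[] FAMILY = Bytes.toBytes("m");  // illustrative column family

        private final HTable table;
        private final Map<String, Map<String, Long>> pending =
                new HashMap<String, Map<String, Long>>();

        public CubeIncrementBuffer(HTable table) {
            this.table = table;
        }

        /** Accumulate a delta locally; nothing goes over the wire yet. */
        public synchronized void add(String cubeRow, String metric, long delta) {
            Map<String, Long> cols = pending.get(cubeRow);
            if (cols == null) {
                cols = new HashMap<String, Long>();
                pending.put(cubeRow, cols);
            }
            Long current = cols.get(metric);
            cols.put(metric, (current == null ? 0L : current) + delta);
        }

        /** Flush: one Increment (one round trip) per cube row, carrying all of its columns. */
        public synchronized void flush() throws IOException {
            for (Map.Entry<String, Map<String, Long>> row : pending.entrySet()) {
                Increment inc = new Increment(Bytes.toBytes(row.getKey()));
                for (Map.Entry<String, Long> col : row.getValue().entrySet()) {
                    inc.addColumn(FAMILY, Bytes.toBytes(col.getKey()), col.getValue());
                }
                table.increment(inc);
            }
            pending.clear();
        }
    }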
>>>>>> ----- Original Message -----
>>>>>>> From: Tom Brown <[email protected]>
>>>>>>> To: [email protected]
>>>>>>> Cc:
>>>>>>> Sent: Monday, April 9, 2012 9:48 AM
>>>>>>> Subject: Add client complexity or use a coprocessor?
>>>>>>>
>>>>>>> To whom it may concern,
>>>>>>>
>>>>>>> Ignoring the complexities of gathering the data, assume that I will
>>>>>>> be tracking millions of unique viewers. Updates from each of our
>>>>>>> millions of clients are gathered in a centralized platform and spread
>>>>>>> among a group of machines for processing and inserting into HBase
>>>>>>> (assume that this group can be scaled horizontally). The data is
>>>>>>> stored in an OLAP cube format, and one of the metrics I'm tracking
>>>>>>> across various attributes is viewership (how many people from Y are
>>>>>>> watching X).
>>>>>>>
>>>>>>> I'm writing this to ask for your thoughts as to the most appropriate
>>>>>>> way to structure my data so I can count unique TV viewers (assume a
>>>>>>> service like Netflix or Hulu).
>>>>>>>
>>>>>>> Here are the solutions I'm considering:
>>>>>>>
>>>>>>> 1. Store each unique user ID as the cell name within the cube(s) in
>>>>>>> which it occurs. This has the advantage of 100% accuracy, but the
>>>>>>> downside is the enormous space required to store each unique cell.
>>>>>>> Consuming this data is also problematic, as the only way to provide a
>>>>>>> viewership count is by counting each cell. To save the overhead of
>>>>>>> sending each cell over the network, counting them could be done by a
>>>>>>> coprocessor on the region server, but that still doesn't avoid the
>>>>>>> overhead of reading each cell from disk. I'm also not sure what
>>>>>>> happens if a single row grows larger than an entire region (48 bytes
>>>>>>> per user ID * 10,000,000 users is roughly 480 MB per row).
>>>>>>>
>>>>>>> 2. Store a byte array that allows estimating unique viewers (with a
>>>>>>> small margin of error*). Add a coprocessor for updating this column
>>>>>>> so I can guarantee that updates to a specific OLAP cell will be
>>>>>>> atomic. The main benefit of this path is that the nodes that update
>>>>>>> HBase can be less complex. Another benefit I see is that I can just
>>>>>>> add more HBase regions as scale requires. However, I'm not sure if I
>>>>>>> can use a coprocessor the way I want; can I observe updates to a
>>>>>>> particular table and replace the provided data with my own? (The
>>>>>>> client calls "put" with the actual user ID, my coprocessor replaces
>>>>>>> it with a computed value, so the actual user ID never gets stored in
>>>>>>> HBase.)
>>>>>>>
>>>>>>> 3. Store a byte array that allows estimating unique viewers (with a
>>>>>>> small margin of error*). Re-arrange my architecture so that each OLAP
>>>>>>> cell is only updated by a single node. The main benefit of this would
>>>>>>> be that I don't need to worry about atomic operations in HBase, since
>>>>>>> all updates to a single cell will be atomic and in serial. The
>>>>>>> biggest downside is that I believe it will add significant complexity
>>>>>>> to my overall architecture.
>>>>>>>
>>>>>>> Thanks for your time, and I look forward to hearing your thoughts.
>>>>>>>
>>>>>>> Sincerely,
>>>>>>> Tom Brown
>>>>>>>
>>>>>>> *(For information about the byte array mentioned in #2 and #3, see:
>>>>>>> http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html)
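One way to approximate the atomic update that option #2 needs, without a new
core operation or a custom coprocessor, is an optimistic read-merge-write loop
built on HTable.checkAndPut: read the current estimator bytes, OR in the new
observation, and write back only if the cell has not changed in the meantime,
retrying otherwise. A rough sketch only; the family and qualifier names and the
merge helper are invented for illustration, and under heavy contention on a
single cell this will retry often (the hotspot concern kisalay raises above).

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    /** Optimistic read-merge-write of an estimator byte array stored in one cell. */
    public class EstimatorCas {
        private static final byte[] FAMILY = Bytes.toBytes("m");        // illustrative
        private static final byte[] QUALIFIER = Bytes.toBytes("uniq");  // illustrative

        /** OR a new observation into the stored bitfield, retrying on concurrent updates. */
        public static void orIntoCell(HTable table, byte[] cubeRow, byte[] observationBits)
                throws IOException {
            while (true) {
                Result current = table.get(new Get(cubeRow).addColumn(FAMILY, QUALIFIER));
                byte[] oldValue = current.getValue(FAMILY, QUALIFIER);  // null if the cell is empty
                byte[] newValue = mergeInto(oldValue, observationBits);

                Put put = new Put(cubeRow);
                put.add(FAMILY, QUALIFIER, newValue);
                // checkAndPut succeeds only if the cell still holds oldValue (null means "absent").
                if (table.checkAndPut(cubeRow, FAMILY, QUALIFIER, oldValue, put)) {
                    return;
                }
                // Someone else updated the cell first; re-read and try again.
            }
        }

        /** Bitwise OR of two byte arrays (a missing old value is treated as all zeros). */
        private static byte[] mergeInto(byte[] oldValue, byte[] bits) {
            if (oldValue == null) {
                return bits.clone();
            }
            byte[] merged = new byte[Math.max(oldValue.length, bits.length)];
            for (int i = 0; i < merged.length; i++) {
                byte a = i < oldValue.length ? oldValue[i] : 0;
                byte b = i < bits.length ? bits[i] : 0;
                merged[i] = (byte) (a | b);
            }
            return merged;
        }
    }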
