I can see room for improvement w.r.t. ColumnInterpreters. I logged two JIRAs:
https://issues.apache.org/jira/browse/HBASE-5122 is for loading ColumnInterpreters dynamically
https://issues.apache.org/jira/browse/HBASE-5123 is for adding more aggregation functions.

Royston:
Feel free to elaborate on 5123 and explain what the Mult aggregate should do.

Cheers

On Wed, Jan 4, 2012 at 3:43 AM, Royston Sellman <[email protected]> wrote:

> Ted, Himanshu and Gary,
>
> It works now! I re-created my HBase table to contain Bytes.toBytes(Long)
> values and that fixed it.
>
> For the time being we can convert everything to Longs and work with that,
> but we will probably write our own ColumnInterpreters soon for our data
> types, so thanks for the pointer to HBASE-4946. There are also Functions we
> need (e.g. Median, Weighted Median, Mult) which might best be placed in the
> Aggregations Protocol. We'll be sure to discuss this with you when we start.
>
> Meanwhile, thanks again for all your help!
>
> Royston
>
>
> On 3 Jan 2012, at 18:58, Ted Yu wrote:
>
> > I like long messages :-) because they provide more clues.
> >
> > For part 1, you don't have to call Bytes.toxxx as long as the interpreter
> > uses a method consistent with the way you write values into HBase tables.
> >
> > For part 2, HBASE-4946 is related.
> > Basically you need to place the jar containing your coprocessor and
> > interpreter code on HDFS so that you can load it into your HBase table.
> > Look at this for details:
> > https://issues.apache.org/jira/browse/HBASE-4554
> >
> > Cheers
> >
> > On Tue, Jan 3, 2012 at 10:42 AM, Royston Sellman <[email protected]> wrote:
> >
> >> Hi Ted,
> >>
> >> PART 1
> >> =====
> >> Thanks for the hint. I think maybe you have given me some inspiration!
> >>
> >> It looks like getValue will return null if the table value is not the
> >> length of a long. When we created our table (batch loading CSVs using the
> >> SampleUploader example) we simply have this as our put():
> >> put.add(family, Bytes.toBytes("advanceKWh"), advanceKWh);
> >> [note we do no Bytes.toxxx casts to the advanceKWh value.
> >> The values look OK from HBase shell though :-)]
> >>
> >> but I looked at TestAggregateProtocol.java again and I see there are puts like:
> >> p2.add(TEST_FAMILY, Bytes.add(TEST_MULTI_CQ, Bytes.toBytes(l)),
> >>     Bytes.toBytes(l * 10));
> >>
> >> So my hypothesis is that we need to do something like:
> >> Long l = new Long(1);
> >> put.add(family, Bytes.toBytes("advanceKWh"), Bytes.toBytes(l * advanceKWh));
> >> when we create the table.
> >>
> >> Do you think my hypothesis is correct? Did we build our table incorrectly
> >> for reading longs from it?
> >>
> >> PART 2
> >> =====
> >> Anyway we will obviously need to make our own interpreters, but we have
> >> failed at this task so far.
> >> In order to implement our own ColumnInterpreter, we first attempted simply
> >> extending the LongColumnInterpreter and passing that as a parameter to
> >> aClient.sum().
> >> import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
> >>
> >> public class LCI extends LongColumnInterpreter {
> >>   public Long add(Long a, Long b) {
> >>     System.out.print("LCI.add("+a+","+b+")");
> >>     Long c = super.add(a,b);
> >>     System.out.println(" -> "+c);
> >>     return c;
> >>   }
> >> };
> >> ....
> >> final ColumnInterpreter<Long, Long> ci = new LCI();
> >> long sum=-1;
> >> try {
> >>   sum = aClient.sum(EDRP_TABLE, ci, scan);
> >> ...
> >> However, we received class not found errors in our regionserver log when we
> >> ran this code.
> >> Clearly we are missing something.
> >> We've started looking at modifying the HBase source and rebuilding that,
> >> but I think this is not the way we should be working.
> >> It seems that we should be implementing the ColumnInterpreter interface and
> >> passing an instance of that implementation class too as a parameter. Is
> >> this correct? Could you provide us with an example? Any ideas why we might
> >> be getting class not found exception from the regionserver?
> >>
> >> Sorry for the long message!
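Part 1 of the message above comes down to byte lengths. Here is a minimal sketch, assuming the cell was loaded as the text "3.0" (the `Row aa = 3.0` output quoted further down the thread suggests this), of why a length check like the one in `getValue` rejects such a value. The class and method names are hypothetical, and plain `java.nio` stands in for the HBase `Bytes` and `KeyValue` classes, so this illustrates the idea rather than the HBase code itself:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the length check in LongColumnInterpreter.getValue;
// it uses only the JDK, not the HBase Bytes/KeyValue classes.
class LongEncodingSketch {
    static final int SIZEOF_LONG = 8; // width of a long in bytes

    // 8-byte big-endian encoding, the layout Bytes.toBytes(long) is assumed to produce.
    static byte[] encodeLong(long v) {
        return ByteBuffer.allocate(SIZEOF_LONG).putLong(v).array();
    }

    // UTF-8 text encoding, as when a CSV field is stored unchanged.
    static byte[] encodeText(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors the quoted check: anything that is not exactly 8 bytes yields null.
    static Long interpret(byte[] value) {
        if (value == null || value.length != SIZEOF_LONG) {
            return null;
        }
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        System.out.println(interpret(encodeText("3.0"))); // 3 bytes, rejected
        System.out.println(interpret(encodeLong(3L)));    // 8 bytes, decoded
    }
}
```

Note that a text value that happens to be exactly 8 bytes long would pass this check and decode to a meaningless number, so the length test alone does not prove the column holds longs.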
> >>
> >> Royston (and Tom)
> >>
> >>
> >> -----Original Message-----
> >> From: Ted Yu [mailto:[email protected]]
> >> Sent: 03 January 2012 18:00
> >> To: [email protected]
> >> Subject: Re: AggregateProtocol Help
> >>
> >> My previous email might not be hitting the root cause.
> >> I think the following method in LCI may be giving you the null:
> >>
> >> public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
> >>     throws IOException {
> >>   if (kv == null || kv.getValueLength() != Bytes.SIZEOF_LONG)
> >>     return null;
> >>   return Bytes.toLong(kv.getBuffer(), kv.getValueOffset());
> >> }
> >>
> >> Look at the if statement above carefully.
> >> If it doesn't match how you store values in HBase, feel free to subclass
> >> LongColumnInterpreter and provide the correct interpretation.
> >>
> >> BTW you don't need to restart the cluster just because you need to use
> >> your own interpreter :-)
> >>
> >> On Tue, Jan 3, 2012 at 9:48 AM, Royston Sellman <[email protected]> wrote:
> >>
> >>> Hi Ted,
> >>>
> >>> Here is the output. As you can see aClient is not null:
> >>>
> >>> AggregationClient aClient = new AggregationClient(conf);
> >>> System.err.println("aClient: "+aClient);
> >>>
> >>> <<< aClient:
> >>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient@28787c16
> >>>
> >>> It will take us a little while to add log code to LCI... we have to
> >>> edit the source, rebuild 0.92, redistribute round our cluster, restart
> >>> ;) We'll get back to you when this is done.
> >>>
> >>> Royston
> >>>
> >>> -----Original Message-----
> >>> From: Ted Yu [mailto:[email protected]]
> >>> Sent: 03 January 2012 17:10
> >>> To: [email protected]
> >>> Subject: Re: AggregateProtocol Help
> >>>
> >>> Royston:
> >>> Thanks for your effort trying to hunt down the problem.
> >>>
> >>> Can you add a log after this line to see if aClient is null ?
> >>> AggregationClient aClient = new AggregationClient(conf);
> >>>
> >>> I was looking at LongColumnInterpreter.add() which is called by
> >>> aClient.sum()
> >>> Can you add a few log statements in LongColumnInterpreter.add() to see
> >>> what parameters are passed to it ?
> >>>
> >>> Cheers
> >>>
> >>> On Tue, Jan 3, 2012 at 8:32 AM, Royston Sellman <[email protected]> wrote:
> >>>
> >>>> Hi Ted, Himanshu, Gary,
> >>>>
> >>>> Thanks again for your attention. I experimented with a shorter table
> >>>> and it looks like the timeout error was spurious...
> >>>>
> >>>> With the shorter table I now get an NPE when I call
> >>>> AggregationClient.sum().
> >>>> Here's the code snippet:
> >>>>
> >>>> // Test the table
> >>>> HTable table = new HTable(EDRP_TABLE);
> >>>> Get get = new Get(Bytes.toBytes("row-aa"));
> >>>> get.addColumn(Bytes.toBytes("EDRP"), Bytes.toBytes("advanceKWh"));
> >>>> Result result = table.get(get);
> >>>> byte [] val = result.getValue(Bytes.toBytes("EDRP"), Bytes.toBytes("advanceKWh"));
> >>>> System.out.println("Row aa = " + Bytes.toString(val));
> >>>>
> >>>> AggregationClient aClient = new AggregationClient(conf);
> >>>> Scan scan = new Scan();
> >>>> scan.addColumn(EDRP_FAMILY, EDRP_QUALIFIER);
> >>>> scan.setStartRow(Bytes.toBytes("row-ab"));
> >>>> scan.setStopRow(Bytes.toBytes("row-az"));
> >>>> System.out.println(Bytes.toString(EDRP_FAMILY) + ":" + Bytes.toString(EDRP_QUALIFIER));
> >>>> final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
> >>>> long sum=-1;
> >>>> try {
> >>>>   sum = aClient.sum(EDRP_TABLE, ci, scan);
> >>>> } catch (Throwable e) {
> >>>>   // TODO Auto-generated catch block
> >>>>   e.printStackTrace();
> >>>> }
> >>>> System.out.println(sum);
> >>>>
> >>>> The first part is just to check that my table is OK. It prints the
> >>>> correct value for row aa. Then I check CF:CQ is correct. Then I get
> >>>> the -1 that sum was defined as. Then the NPE.
> >>>> Here is the run output:
> >>>>
> >>>> Row aa = 3.0
> >>>> EDRP:advanceKWh
> >>>> -1
> >>>> java.lang.NullPointerException
> >>>>   at EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:66)
> >>>>   at EDRPAggregator.main(EDRPAggregator.java:96)
> >>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>>   at java.lang.reflect.Method.invoke(Method.java:597)
> >>>>   at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
> >>>>
> >>>> Line 66 is the line:
> >>>> sum = aClient.sum(EDRP_TABLE, ci, scan);
> >>>>
> >>>> But I'm not sure that is really the line generating the NPE because
> >>>> I get the -1 output (same result when line 66 is not wrapped in
> >>>> try-catch BTW).
> >>>> Anyway, -1 is clearly not the correct value for the sum.
> >>>>
> >>>> [Note: I'm setting a start and end row on the scan because I saw a
> >>>> comment in AggregationClient.java saying they must be set. However,
> >>>> I think this comment is obsolete as the test code in
> >>>> TestAggregateProtocol.java sometimes does not include start & end
> >>>> rows. The run is exactly the same without the start & stop rows ]
> >>>>
> >>>> We are struggling with Aggregations not least because
> >>>> TestAggregateProtocol.java is the only source of example code we
> >>>> could find and it is not ideal because it is designed to work with
> >>>> MiniCluster rather than a real cluster like ours. Is there any other
> >>>> example code in existence?
> >>>> I think that would really help us.
> >>>>
> >>>> Meanwhile, thanks for your patience with our questions. We are
> >>>> really impressed with the AggregationProtocol
> >>>> architecture/functionality and very keen to use it.
> >>>>
> >>>> Royston (and Tom)
> >>>>
> >>>> -----Original Message-----
> >>>> From: Gary Helmling [mailto:[email protected]]
> >>>> Sent: 02 January 2012 06:23
> >>>> To: [email protected]
> >>>> Subject: Re: AggregateProtocol Help
> >>>>
> >>>> Hi Royston,
> >>>>
> >>>> Try increasing the value set for hbase.rpc.timeout (by default 60 seconds).
> >>>> Add something like this to hbase-site.xml:
> >>>>
> >>>> <property>
> >>>>   <name>hbase.rpc.timeout</name>
> >>>>   <value>180000</value>
> >>>> </property>
> >>>>
> >>>> This would increase the timeout value to 3 minutes, for example.
> >>>>
> >>>> But as Ted and Himanshu mention, 557k rows is not a large table and
> >>>> should not be taking very long to process, unless these happen to be
> >>>> very, very wide rows. It sounds like there could be something else
> >>>> going on. Does debug level logging show anything else in the region
> >>>> server log?
> >>>>
> >>>> --gh
> >>>>
> >>>> On Sun, Jan 1, 2012 at 5:53 PM, Ted Yu <[email protected]> wrote:
> >>>>> Thanks for the reminder Himanshu.
> >>>>>
> >>>>> Royston:
> >>>>> From this blog you can get some history on this subject:
> >>>>> http://zhihongyu.blogspot.com/2011/03/genericizing-endpointcoprocessor.html
> >>>>>
> >>>>>
> >>>>> On Sun, Jan 1, 2012 at 5:18 PM, Himanshu Vashishtha <[email protected]> wrote:
> >>>>>
> >>>>>> Hello Royston,
> >>>>>>
> >>>>>> Sorry to hear that you are having trouble while using the
> >>>>>> Aggregation functionalities.
> >>>>>>
> >>>>>> 557k rows seems to be a small table and a SocketTimeout does not
> >>>>>> seem to be an ok response.
> >>>>>> It will be good to know the region distribution as such. (How
> >>>>>> many regions? Is it a full table scan?)
> >>>>>>
> >>>>>> You are using the sum function; how are you using the ColumnInterpreter?
> >>>>>> Can you enable the log level to debug to see why the RS is taking
> >>>>>> that long to respond (more than 113 sec).
> >>>>>> The 0 return value is the default result.
> >>>>>>
> >>>>>> Thanks for trying this out.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Himanshu
> >>>>>>
> >>>>>> On Sun, Jan 1, 2012 at 12:26 PM, Royston Sellman <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi Ted,
> >>>>>>>
> >>>>>>> I think 0 is the only value we ever see (I'll check tomorrow:
> >>>>>>> the server is down right now). Our table has 557,000 rows. I'll
> >>>>>>> try a much shorter table tomorrow.
> >>>>>>>
> >>>>>>> Yes, we have RS running on the NN, but it's a test cluster and
> >>>>>>> we are used to it :)
> >>>>>>>
> >>>>>>> Do you think using AggregationProtocol is the best strategy for
> >>>>>>> the case where we want to use basic SQL-style functions like
> >>>>>>> SUM, AVG, STD, MIN, MAX? Do you think there is a better strategy?
> >>>>>>>
> >>>>>>> Many thanks,
> >>>>>>> Royston
> >>>>>>>
> >>>>>>>
> >>>>>>> On 1 Jan 2012, at 17:58, Ted Yu wrote:
> >>>>>>>
> >>>>>>>> Royston:
> >>>>>>>> Happy New Year to you too.
> >>>>>>>>
> >>>>>>>>>> java.net.SocketTimeoutException: Call to namenode/10.0.0.235:60020 failed on
> >>>>>>>>
> >>>>>>>> It seems the namenode above actually refers to a region server.
> >>>>>>>> This is a little bit confusing :-)
> >>>>>>>>
> >>>>>>>> The sum value below is 0.
> >>>>>>>> Have you ever seen a value greater than 0 ?
> >>>>>>>>
> >>>>>>>> How many rows are there in this CF:CQ ?
> >>>>>>>> The timeout was reported earlier by other people where
> >>>>>>>> there're many rows in the table.
> >>>>>>>>
> >>>>>>>> There is a JIRA to provide streaming support for coprocessor
> >>>>>>>> but the development there has stalled.
> >>>>>>>>
> >>>>>>>> Cheers
> >>>>>>>>
> >>>>>>>> On Sun, Jan 1, 2012 at 9:35 AM, Royston Sellman <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Gary and Ted,
> >>>>>>>>>
> >>>>>>>>> Royston (Tom's colleague) here. Back onto this after the
> >>>>>>>>> Christmas/New Year break.
> >>>>>>>>>
> >>>>>>>>> Many thanks for your help so far. We enabled our database
> >>>>>>>>> via your hbase-site.xml mod and were able to move on to
> >>>>>>>>> other errors. But I think we are now actually getting an
> >>>>>>>>> aggregation partially calculated on our table
> >>>>>>>>> (this feels like progress). The details:
> >>>>>>>>>
> >>>>>>>>> On running our client we now get this exception:
> >>>>>>>>> 11/12/31 17:51:09 WARN client.HConnectionManager$HConnectionImplementation: Error executing for row
> >>>>>>>>>
> >>>>>>>>> java.util.concurrent.ExecutionException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=10, exceptions:
> >>>>>>>>> Sat Dec 31 17:41:30 GMT 2011, org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1@1fc4f0f8, java.net.SocketTimeoutException: Call to namenode/10.0.0.235:60020 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.0.0.235:59999 remote=namenode/10.0.0.235:60020]
> >>>>>>>>> (8 more of these, making for 10 tries)
> >>>>>>>>> Sat Dec 31 17:51:09 GMT 2011, org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1@1fc4f0f8, java.net.SocketTimeoutException: Call to namenode/10.0.0.235:60020 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.0.0.235:59364 remote=namenode/10.0.0.235:60020]
> >>>>>>>>>
> >>>>>>>>>   at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> >>>>>>>>>   at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> >>>>>>>>>   at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processExecs(HConnectionManager.java:1465)
> >>>>>>>>>   at org.apache.hadoop.hbase.client.HTable.coprocessorExec(HTable.java:1555)
> >>>>>>>>>   at org.apache.hadoop.hbase.client.coprocessor.AggregationClient.sum(AggregationClient.java:229)
> >>>>>>>>>   at EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:51)
> >>>>>>>>>   at EDRPAggregator.main(EDRPAggregator.java:77)
> >>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:597)
> >>>>>>>>>   at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Looking at the log (.regionserver-namenode.log) I see this debug message:
> >>>>>>>>>
> >>>>>>>>> 2011-12-31 17:42:23,472 DEBUG org.apache.hadoop.hbase.coprocessor.AggregateImplementation: Sum from this region is EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006.: 0
> >>>>>>>>>
> >>>>>>>>> Where the sum value looks reasonable which makes me think
> >>>>>>>>> the sum of a CF:CQ worked. But I never see this value on stdout.
> >>>>>>>>> Then I see this warning:
> >>>>>>>>>
> >>>>>>>>> 2011-12-31 17:42:23,476 WARN org.apache.hadoop.ipc.HBaseServer: (responseTooSlow): {"processingtimems":113146,"call":"execCoprocess$
> >>>>>>>>> 2011-12-31 17:42:23,511 WARN org.apache.hadoop.ipc.HBaseServer: IPC Server Responder, call execCoprocessor([B@4b22fad6, getSum(org.$
> >>>>>>>>> 2011-12-31 17:42:23,515 WARN org.apache.hadoop.ipc.HBaseServer: IPC Server handler 1 on 60020 caught: java.nio.channels.ClosedChann$
> >>>>>>>>>   at sun.nio.ch.SocketChannelImpl.ensureWriteOpen(SocketChannelImpl.java:133)
> >>>>>>>>>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:324)
> >>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer.channelWrite(HBaseServer.java:1651)
> >>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Responder.processResponse(HBaseServer.java:924)
> >>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Responder.doRespond(HBaseServer.java:1003)
> >>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Call.sendResponseIfReady(HBaseServer.java:409)
> >>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1345)
> >>>>>>>>>
> >>>>>>>>> Have we missed out some step in the HBase/RegionServer config?
> >>>>>>>>> Or is our client code still deficient?
> >>>>>>>>>
> >>>>>>>>> Can you offer any suggestions? Is there any example code for
> >>>>>>>>> the new Aggregations stuff?
> >>>>>>>>>
> >>>>>>>>> Thanks and Happy New Year to you guys,
> >>>>>>>>>
> >>>>>>>>> Royston (and Tom).
> >>>>>>>>>
> >>>>>>>>> (HBase 0.92, Hadoop 1.0)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -----Original Message-----
> >>>>>>>>> From: Gary Helmling [mailto:[email protected]]
> >>>>>>>>> Sent: 23 December 2011 18:06
> >>>>>>>>> To: [email protected]
> >>>>>>>>> Subject: Re: AggregateProtocol Help
> >>>>>>>>>
> >>>>>>>>> Hi Tom,
> >>>>>>>>>
> >>>>>>>>> The test code is not really the best guide for configuration.
> >>>>>>>>>
> >>>>>>>>> To enable the AggregateProtocol on all of your tables, add
> >>>>>>>>> this to the hbase-site.xml for the servers in your cluster:
> >>>>>>>>>
> >>>>>>>>> <property>
> >>>>>>>>>   <name>hbase.coprocessor.user.region.classes</name>
> >>>>>>>>>   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
> >>>>>>>>> </property>
> >>>>>>>>>
> >>>>>>>>> If you only want to use the aggregate functions on a
> >>>>>>>>> specific table (or tables), then you can enable that
> >>>>>>>>> individually for the table from the shell:
> >>>>>>>>>
> >>>>>>>>> 1) disable the table
> >>>>>>>>> hbase> disable 'EDRP7'
> >>>>>>>>>
> >>>>>>>>> 2) add the coprocessor
> >>>>>>>>> hbase> alter 'EDRP7', METHOD => 'table_att', 'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
> >>>>>>>>>
> >>>>>>>>> (Note that the pipes in the value string are required)
> >>>>>>>>>
> >>>>>>>>> 3) re-enable the table
> >>>>>>>>> hbase> enable 'EDRP7'
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Either way should work. With the second approach you will
> >>>>>>>>> see the coprocessor listed when you describe the table from
> >>>>>>>>> the shell, as Ted mentioned. With the first approach you
> >>>>>>>>> will not, but it should be loaded all the same.
> >>>>>>>>>
> >>>>>>>>> --gh
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, Dec 23, 2011 at 7:04 AM, Ted Yu <[email protected]> wrote:
> >>>>>>>>>> I don't know why you chose HBaseTestingUtility to create
> >>>>>>>>>> the table.
> >>>>>>>>>> I guess you followed test code example.
> >>>>>>>>>>
> >>>>>>>>>> At least you should pass the conf to this ctor:
> >>>>>>>>>> public HBaseTestingUtility(Configuration conf) {
> >>>>>>>>>>
> >>>>>>>>>> If coprocessor was installed correctly, you should see
> >>>>>>>>>> something like (from HBASE-5070):
> >>>>>>>>>> coprocessor$1 => '|org.apache.hadoop.hbase.constraint.ConstraintProcessor|1073741823|'
> >>>>>>>>>>
> >>>>>>>>>> Cheers
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 23, 2011 at 3:02 AM, Tom Wilcox <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I am not sure how we load the AggregateImplementation into
> >>>>>>>>>>> the table.
> >>>>>>>>>>> When we are creating a table, we use the same functions as
> >>>>>>>>>>> the test as follows...
> >>>>>>>>>>>
> >>>>>>>>>>> ...
> >>>>>>>>>>>>
> >>>>>>>>>>>> conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
> >>>>>>>>>>>>     "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
> >>>>>>>>>>>>
> >>>>>>>>>>>> // Utility.CreateHBaseTable(conf, otherArgs[1], otherArgs[2], true);
> >>>>>>>>>>>>
> >>>>>>>>>>>> HBaseTestingUtility util = new HBaseTestingUtility();
> >>>>>>>>>>>> HTable table = util.createTable(EDRP_TABLE, EDRP_FAMILY);
> >>>>>>>>>>>>
> >>>>>>>>>>>> AggregationClient aClient = new AggregationClient(conf);
> >>>>>>>>>>> ...
> >>>>>>>>>>>
> >>>>>>>>>>> Running DESCRIBE on the table produced shows the following output:
> >>>>>>>>>>>
> >>>>>>>>>>> hbase(main):002:0> describe 'EDRP7'
> >>>>>>>>>>> DESCRIPTION                                                    ENABLED
> >>>>>>>>>>>  {NAME => 'EDRP7', FAMILIES => [{NAME => 'advanceKWh',         true
> >>>>>>>>>>>  BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS =>
> >>>>>>>>>>>  '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL =>
> >>>>>>>>>>>  '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false',
> >>>>>>>>>>>  BLOCKCACHE => 'true'}]}
> >>>>>>>>>>>
> >>>>>>>>>>> We are using the tip of 0.92 (cloned from the Git repo).
> >>>>>>>>>>> See the version string below:
> >>>>>>>>>>>
> >>>>>>>>>>> hbase(main):005:0> version
> >>>>>>>>>>> 0.92.0, r1208286, Thu Dec 15 13:16:03 GMT 2011
> >>>>>>>>>>>
> >>>>>>>>>>> We would really appreciate an example of how to create a
> >>>>>>>>>>> table that is enabled to handle Aggregation.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> ________________________________________
> >>>>>>>>>>> From: Ted Yu [[email protected]]
> >>>>>>>>>>> Sent: 22 December 2011 17:03
> >>>>>>>>>>> To: [email protected]
> >>>>>>>>>>> Subject: Re: AggregateProtocol Help
> >>>>>>>>>>>
> >>>>>>>>>>> Have you loaded AggregateImplementation into your table ?
> >>>>>>>>>>> Can you show us the contents of the following command in
> >>>>>>>>>>> hbase shell:
> >>>>>>>>>>> describe 'your-table'
> >>>>>>>>>>>
> >>>>>>>>>>> BTW are you using the tip of 0.92 ?
> >>>>>>>>>>> HBASE-4946 would be of help for dynamically loaded
> >>>>>>>>>>> coprocessors which you might use in the future.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Dec 22, 2011 at 8:09 AM, Tom Wilcox <[email protected]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>>
> >>>>>>>>>>>> We are trying to use the aggregation functionality in HBase 0.92
> >>>>>>>>>>>> and we have managed to get the test code working using the
> >>>>>>>>>>>> following command:
> >>>>>>>>>>>>
> >>>>>>>>>>>> java -classpath junit-4.10.jar:build/*:$HBASELIBS/* org.junit.runner.JUnitCore org.apache.hadoop.hbase.coprocessor.TestAggregateProtocol
> >>>>>>>>>>>>
> >>>>>>>>>>>> Closer inspection of this test class has revealed that it
> >>>>>>>>>>>> uses a mini DFS cluster to populate and run the tests.
> >>>>>>>>>>>> These tests return successfully.
> >>>>>>>>>>>>
> >>>>>>>>>>>> However, when we attempt to run similar code on our
> >>>>>>>>>>>> development HDFS cluster we experience the following error:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 11/12/22 15:46:28 WARN client.HConnectionManager$HConnectionImplementation: Error executing for row
> >>>>>>>>>>>> java.util.concurrent.ExecutionException: org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching handler for protocol org.apache.hadoop.hbase.coprocessor.AggregateProtocol in region EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006.
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:4010)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3040)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>>>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>>>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:597)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1325)
> >>>>>>>>>>>> [sshexec]
> >>>>>>>>>>>>   at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> >>>>>>>>>>>>   at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processExecs(HConnectionManager.java:1465)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.HTable.coprocessorExec(HTable.java:1555)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.coprocessor.AggregationClient.sum(AggregationClient.java:229)
> >>>>>>>>>>>>   at EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:51)
> >>>>>>>>>>>>   at EDRPAggregator.main(EDRPAggregator.java:77)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>>>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>>>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:597)
> >>>>>>>>>>>>   at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
> >>>>>>>>>>>> Caused by: org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching handler for protocol org.apache.hadoop.hbase.coprocessor.AggregateProtocol in region EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006.
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:4010)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3040)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>>>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>>>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:597)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1325)
> >>>>>>>>>>>> [sshexec]
> >>>>>>>>>>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>>>>>>>>>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >>>>>>>>>>>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >>>>>>>>>>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.RemoteExceptionHandler.decodeRemoteException(RemoteExceptionHandler.java:96)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.translateException(HConnectionManager.java:1651)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.getRegionServerWithRetries(HConnectionManager.java:1327)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.java:79)
> >>>>>>>>>>>>   at $Proxy3.getSum(Unknown Source)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.call(AggregationClient.java:233)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.call(AggregationClient.java:230)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$4.call(HConnectionManager.java:1453)
> >>>>>>>>>>>>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>>>>>>>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>>>>>>>>>   at java.lang.Thread.run(Thread.java:662)
> >>>>>>>>>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching handler for protocol org.apache.hadoop.hbase.coprocessor.AggregateProtocol in region EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006.
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:4010)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3040)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> >>>>>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >>>>>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:597)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1325)
> >>>>>>>>>>>> [sshexec]
> >>>>>>>>>>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >>>>>>>>>>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
> >>>>>>>>>>>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> >>>>>>>>>>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>>>>>>>>>>>   at org.apache.hadoop.hbase.RemoteExceptionHandler.decodeRemoteException(
>>>>>>>>>>> RemoteExceptionHandler.java:96) > >>>>>>>>>>>> at > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImpl > >>>>>> em > >>>>>> en > >>>>>>>>>>> tation.translateException(HConnectionManager.java:1651) > >>>>>>>>>>>> at > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImpl > >>>>>> em > >>>>>> en > >>>>>>>>>>> tation.getRegionServerWithRetries(HConnectionManager.java: > >>>>>>>>>>> 13 > >>>>>>>>>>> 27 > >>>>>>>>>>> ) > >>>>>>>>>>>> at > >>>>>>>>>>>> org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPC > >>>>>>>>>>>> In > >>>>>>>>>>>> vo > >>>>>>>>>>>> ker.ja > >>>>>>>>>>>> va:79) > >>>>>>>>>>>> at $Proxy3.getSum(Unknown Source) > >>>>>>>>>>>> at > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.ca > >>>>>> ll > >>>>>> (A > >>>>>>>>>>> ggregationClient.java:233) > >>>>>>>>>>>> at > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.ca > >>>>>> ll > >>>>>> (A > >>>>>>>>>>> ggregationClient.java:230) > >>>>>>>>>>>> at > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImpl > >>>>>> em > >>>>>> en > >>>>>>>>>>> tation$4.call(HConnectionManager.java:1453) > >>>>>>>>>>>> at > >>>>>>>>>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask. 
> >>>>>>>>>>>> java:303)
> >>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:662)
> >>>>>>>>>>>>
> >>>>>>>>>>>> The source for our class is:
> >>>>>>>>>>>>
> >>>>>>>>>>>> import static org.junit.Assert.assertEquals;
> >>>>>>>>>>>>
> >>>>>>>>>>>> import java.io.IOException;
> >>>>>>>>>>>>
> >>>>>>>>>>>> import org.apache.hadoop.conf.Configuration;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.HBaseConfiguration;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.HBaseTestingUtility;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.HConstants;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.HTableDescriptor;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.client.HTable;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.client.Put;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.client.Scan;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.util.Bytes;
> >>>>>>>>>>>> import org.apache.hadoop.util.GenericOptionsParser;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
> >>>>>>>>>>>> import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
> >>>>>>>>>>>> import org.junit.Test;
> >>>>>>>>>>>>
> >>>>>>>>>>>> public class EDRPAggregator {
> >>>>>>>>>>>>
> >>>>>>>>>>>>     // private static final byte[] EDRP_FAMILY = Bytes.toBytes("EDRP");
> >>>>>>>>>>>>     // private static final byte[] EDRP_QUALIFIER = Bytes.toBytes("advanceKWh");
> >>>>>>>>>>>>
> >>>>>>>>>>>>     private static byte[] ROW = Bytes.toBytes("testRow");
> >>>>>>>>>>>>     private static final int ROWSIZE = 20;
> >>>>>>>>>>>>     private static byte[][] ROWS = makeN(ROW, ROWSIZE);
> >>>>>>>>>>>>     private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
> >>>>>>>>>>>>     private static final byte[] TEST_MULTI_CQ = Bytes.toBytes("TestMultiCQ");
> >>>>>>>>>>>>     private static final int rowSeperator1 = 5;
> >>>>>>>>>>>>     private static final int rowSeperator2 = 12;
> >>>>>>>>>>>>
> >>>>>>>>>>>>     public static void testSumWithValidRange(Configuration conf,
> >>>>>>>>>>>>             String[] otherArgs) throws Throwable {
> >>>>>>>>>>>>         byte[] EDRP_TABLE = Bytes.toBytes(otherArgs[1]);
> >>>>>>>>>>>>         byte[] EDRP_FAMILY = Bytes.toBytes(otherArgs[2]);
> >>>>>>>>>>>>
> >>>>>>>>>>>>         conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
> >>>>>>>>>>>>                 "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
> >>>>>>>>>>>>
> >>>>>>>>>>>>         // Utility.CreateHBaseTable(conf, otherArgs[1], otherArgs[2], true);
> >>>>>>>>>>>>
> >>>>>>>>>>>>         HBaseTestingUtility util = new HBaseTestingUtility();
> >>>>>>>>>>>>         HTable table = util.createTable(EDRP_TABLE, EDRP_FAMILY);
> >>>>>>>>>>>>
> >>>>>>>>>>>>         AggregationClient aClient = new AggregationClient(conf);
> >>>>>>>>>>>>         Scan scan = new Scan();
> >>>>>>>>>>>>         scan.addColumn(EDRP_TABLE, EDRP_FAMILY);
> >>>>>>>>>>>>         final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
> >>>>>>>>>>>>         long sum = aClient.sum(Bytes.toBytes(otherArgs[0]), ci, scan);
> >>>>>>>>>>>>         System.out.println(sum);
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>
> >>>>>>>>>>>>     /**
> >>>>>>>>>>>>      * Main entry point.
> >>>>>>>>>>>>      *
> >>>>>>>>>>>>      * @param args
> >>>>>>>>>>>>      *            The command line parameters.
> >>>>>>>>>>>>      * @throws Exception
> >>>>>>>>>>>>      *             When running the job fails.
> >>>>>>>>>>>>      */
> >>>>>>>>>>>>     public static void main(String[] args) throws Exception {
> >>>>>>>>>>>>         Configuration conf = HBaseConfiguration.create();
> >>>>>>>>>>>>
> >>>>>>>>>>>>         String[] otherArgs = new GenericOptionsParser(conf, args)
> >>>>>>>>>>>>                 .getRemainingArgs();
> >>>>>>>>>>>>         if (otherArgs.length != 3) {
> >>>>>>>>>>>>             System.err.println("Wrong number of arguments: " + otherArgs.length);
> >>>>>>>>>>>>             System.err.println("Usage: " + "<tablename> <colfam> <qualifier>");
> >>>>>>>>>>>>             System.exit(-1);
> >>>>>>>>>>>>         }
> >>>>>>>>>>>>
> >>>>>>>>>>>>         try {
> >>>>>>>>>>>>             testSumWithValidRange(conf, otherArgs);
> >>>>>>>>>>>>         } catch (Throwable e) {
> >>>>>>>>>>>>             e.printStackTrace();
> >>>>>>>>>>>>         }
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>
> >>>>>>>>>>>>     /**
> >>>>>>>>>>>>      * an infrastructure method to prepare rows for the testtable.
> >>>>>>>>>>>>      *
> >>>>>>>>>>>>      * @param base
> >>>>>>>>>>>>      * @param n
> >>>>>>>>>>>>      * @return
> >>>>>>>>>>>>      */
> >>>>>>>>>>>>     private static byte[][] makeN(byte[] base, int n) {
> >>>>>>>>>>>>         byte[][] ret = new byte[n][];
> >>>>>>>>>>>>         for (int i = 0; i < n; i++) {
> >>>>>>>>>>>>             ret[i] = Bytes.add(base, Bytes.toBytes(i));
> >>>>>>>>>>>>         }
> >>>>>>>>>>>>         return ret;
> >>>>>>>>>>>>     }
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please can you suggest what might be causing and/or how we might fix
> >>>>>>>>>>>> this UnknownProtocolException?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also, does anyone have any working examples using the aggregation
> >>>>>>>>>>>> protocol other than the test code?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Tom
