Hello Royston,

Sorry to hear that you are having trouble with the aggregation functionality.

557k rows is a small table, and a SocketTimeoutException is not an acceptable response for a table of that size. It would help to know the region distribution: how many regions does the table have, and is this a full table scan?

You are using the sum function; how are you using the ColumnInterpreter?

Can you set the log level to DEBUG to see why the RS is taking that long to respond (more than 113 sec)?

The 0 return value is the default result.

Thanks for trying this out.

Thanks,
Himanshu

On Sun, Jan 1, 2012 at 12:26 PM, Royston Sellman <[email protected]> wrote:

> Hi Ted,
>
> I think 0 is the only value we ever see (I'll check tomorrow: the server
> is down right now). Our table has 557,000 rows. I'll try a much shorter
> table tomorrow.
>
> Yes, we have RS running on the NN, but it's a test cluster and we are used
> to it :)
>
> Do you think using AggregationProtocol is the best strategy for the case
> where we want to use basic SQL-style functions like SUM, AVG, STD, MIN,
> MAX? Do you think there is a better strategy?
>
> Many thanks,
> Royston
>
>
> On 1 Jan 2012, at 17:58, Ted Yu wrote:
>
> > Royston:
> > Happy New Year to you too.
> >
> >>> java.net.SocketTimeoutException: Call to namenode/10.0.0.235:60020failed on
> >
> > It seems the namenode above actually refers to a region server. This is a
> > little bit confusing :-)
> >
> > The sum value below is 0.
> > Have you ever seen a value greater than 0 ?
> >
> > How many rows are there in this CF:CQ ?
> > The timeout was reported earlier by other people where there're many rows
> > in the table.
> >
> > There is a JIRA to provide streaming support for coprocessor but the
> > development there has stalled.
> >
> > Cheers
> >
> > On Sun, Jan 1, 2012 at 9:35 AM, Royston Sellman <[email protected]> wrote:
> >
> >> Hi Gary and Ted,
> >>
> >> Royston (Tom's colleague) here. Back onto this after the Christmas/New Year
> >> break.
> >>
> >> Many thanks for your help so far.
We enabled our database via your > >> hbase-site.xml mod and were able to move on. to other errors. But I > think > >> we > >> are now actually getting an aggregation partially calculated on our > table > >> (this feels like progress). The details: > >> > >> On running our client we now get this exception: > >> 11/12/31 17:51:09 WARN > >> client.HConnectionManager$HConnectionImplementation: Error executing for > >> row > >> > >> java.util.concurrent.ExecutionException: > >> org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after > >> attempts=10, exceptions: > >> Sat Dec 31 17:41:30 GMT 2011, > >> org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1@1fc4f0f8, > >> java.net.SocketTimeoutException: Call to namenode/10.0.0.235:60020failed > >> on > >> socket timeout exception: java.net.SocketTimeoutException: 60000 millis > >> timeout while waiting for channel to be ready for read. ch : > >> java.nio.channels.SocketChannel[connected local=/10.0.0.235:59999 > >> remote=namenode/10.0.0.235:60020] > >> (8 more of these, making for 10 tries) > >> Sat Dec 31 17:51:09 GMT 2011, > >> org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1@1fc4f0f8, > >> java.net.SocketTimeoutException: Call to namenode/10.0.0.235:60020failed > >> on > >> socket timeout exception: java.net.SocketTimeoutException: 60000 millis > >> timeout while waiting for channel to be ready for read. ch : > >> java.nio.channels.SocketChannel[connected local=/10.0.0.235:59364 > >> remote=namenode/10.0.0.235:60020] > >> > >> at > >> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) > >> at java.util.concurrent.FutureTask.get(FutureTask.java:83) > >> at > >> > >> > org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation. 
> >> processExecs(HConnectionManager.java:1465) > >> at > >> org.apache.hadoop.hbase.client.HTable.coprocessorExec(HTable.java:1555) > >> at > >> > >> > org.apache.hadoop.hbase.client.coprocessor.AggregationClient.sum(Aggregation > >> Client.java:229) > >> at EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:51) > >> at EDRPAggregator.main(EDRPAggregator.java:77) > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >> at > >> > >> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39 > >> ) > >> at > >> > >> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl > >> .java:25) > >> at java.lang.reflect.Method.invoke(Method.java:597) > >> at org.apache.hadoop.util.RunJar.main(RunJar.java:156) > >> > >> > >> Looking at the log (.regionserver-namenode.log) I see this debug > message: > >> > >> 2011-12-31 17:42:23,472 DEBUG > >> org.apache.hadoop.hbase.coprocessor.AggregateImplementation: Sum from > this > >> region is EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006.: > 0 > >> > >> Where the sum value looks reasonable which makes me think the sum of a > >> CF:CQ > >> worked. But I never see this value on stdout. 
Then I see this warning: > >> > >> 2011-12-31 17:42:23,476 WARN org.apache.hadoop.ipc.HBaseServer: > >> (responseTooSlow): {"processingtimems":113146,"call":"execCoprocess$ > >> 2011-12-31 17:42:23,511 WARN org.apache.hadoop.ipc.HBaseServer: IPC > Server > >> Responder, call execCoprocessor([B@4b22fad6, getSum(org.$ > >> 2011-12-31 17:42:23,515 WARN org.apache.hadoop.ipc.HBaseServer: IPC > Server > >> handler 1 on 60020 caught: java.nio.channels.ClosedChann$ > >> at > >> sun.nio.ch.SocketChannelImpl.ensureWriteOpen(SocketChannelImpl.java:133) > >> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:324) > >> at > >> > org.apache.hadoop.hbase.ipc.HBaseServer.channelWrite(HBaseServer.java:1651) > >> at > >> > >> > org.apache.hadoop.hbase.ipc.HBaseServer$Responder.processResponse(HBaseServe > >> r.java:924) > >> at > >> > >> > org.apache.hadoop.hbase.ipc.HBaseServer$Responder.doRespond(HBaseServer.java > >> :1003) > >> at > >> > >> > org.apache.hadoop.hbase.ipc.HBaseServer$Call.sendResponseIfReady(HBaseServer > >> .java:409) > >> at > >> > org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1345) > >> > >> Have we missed out some step in the HBase/RegionServerconfig? Or is our > >> client code still deficient? > >> > >> Can you offer any suggestions? Is there any example code for the new > >> Aggregations stuff. > >> > >> Thanks and Happy New Year to you guys, > >> > >> Royston (and Tom). > >> > >> (HBase 0.92, Hadoop 1.0) > >> > >> > >> -----Original Message----- > >> From: Gary Helmling [mailto:[email protected]] > >> Sent: 23 December 2011 18:06 > >> To: [email protected] > >> Subject: Re: AggregateProtocol Help > >> > >> Hi Tom, > >> > >> The test code is not really the best guide for configuration. 
> >> > >> To enable the AggregateProtocol on all of your tables, add this to the > >> hbase-site.xml for the servers in your cluster: > >> > >> <property> > >> <name>hbase.coprocessor.user.region.classes</name> > >> > >> > <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> > >> </property> > >> > >> If you only want to use the aggregate functions on a specific table (or > >> tables), then you can enable that individually for the table from the > >> shell: > >> > >> 1) disable the table > >> hbase> disable 'EDRP7' > >> > >> 2) add the coprocessor > >> hbase> alter 'EDRP7', METHOD => 'table_att', > >> > >> > >> > 'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation > >> ||' > >> > >> (Note that the pipes in the value string are required) > >> > >> 3) re-enable the table > >> hbase> enable 'EDRP7' > >> > >> > >> Either way should work. With the second approach you will see the > >> coprocessor listed when you describe the table from the shell, as Ted > >> mentioned. With the first approach you will not, but it should be > loaded > >> all the same. > >> > >> --gh > >> > >> > >> On Fri, Dec 23, 2011 at 7:04 AM, Ted Yu <[email protected]> wrote: > >>> I don't know why you chose HBaseTestingUtility to create the table. > >>> I guess you followed test code example. > >>> > >>> At least you should pass the conf to this ctor: > >>> public HBaseTestingUtility(Configuration conf) { > >>> > >>> If coprocessor was installed correctly, you should see something > >>> like(from > >>> HBASE-5070): > >>> coprocessor$1 => > >>> '|org.apache.hadoop.hbase.constraint.ConstraintProcessor|1073741823|' > >>> > >>> Cheers > >>> > >>> On Fri, Dec 23, 2011 at 3:02 AM, Tom Wilcox <[email protected]> > >> wrote: > >>> > >>>> Hi, > >>>> > >>>> I am not sure how we load the AggregateImplementation into the table. > >>>> When we are creating a table, we use the same functions as the test as > >> follows... > >>>> > >>>> ... 
> >>>>> conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, > >>>>> > >>>>> "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"); > >>>>> > >>>>> // Utility.CreateHBaseTable(conf, otherArgs[1], > >>>> otherArgs[2], > >>>>> true); > >>>>> > >>>>> HBaseTestingUtility util = new HBaseTestingUtility(); > >>>>> HTable table = util.createTable(EDRP_TABLE, > >>>>> EDRP_FAMILY); > >>>>> > >>>>> AggregationClient aClient = new > >>>>> AggregationClient(conf); > >>>> ... > >>>> > >>>> Running DESCRIBE on a table produced shows the following output: > >>>> > >>>> hbase(main):002:0> describe 'EDRP7' > >>>> DESCRIPTION > >>>> ENABLED > >>>> {NAME => 'EDRP7', FAMILIES => [{NAME => 'advanceKWh', BLOOMFILTER => > >>>> 'NONE', REPLICATION_SCOPE => '0', VERSIONS => true > >>>> '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => > >>>> '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', B > >>>> LOCKCACHE => 'true'}]} > >>>> > >>>> We are using the tip of 0.92 (cloned from the Git repo). See the > >>>> version string below: > >>>> > >>>> hbase(main):005:0> version > >>>> 0.92.0, r1208286, Thu Dec 15 13:16:03 GMT 2011 > >>>> > >>>> We would really appreciate an example of how to create a table that > >>>> is enabled to handle Aggregation). > >>>> > >>>> Thanks > >>>> > >>>> > >>>> ________________________________________ > >>>> From: Ted Yu [[email protected]] > >>>> Sent: 22 December 2011 17:03 > >>>> To: [email protected] > >>>> Subject: Re: AggregateProtocol Help > >>>> > >>>> Have you loaded AggregateImplementation into your table ? > >>>> Can you show us the contents of the following command in hbase shell: > >>>> describe 'your-table' > >>>> > >>>> BTW are you using the tip of 0.92 ? > >>>> HBASE-4946 would be of help for dynamically loaded coprocessors which > >>>> you might use in the future. 
> >>>> > >>>> Cheers > >>>> > >>>> On Thu, Dec 22, 2011 at 8:09 AM, Tom Wilcox <[email protected]> > >> wrote: > >>>> > >>>>> Hi, > >>>>> > >>>>> We are trying to use the aggregation functionality in HBase 0.92 > >>>>> and we have managed to get the test code working using the following > >> command: > >>>>> > >>>>> java -classpath junit-4.10.jar:build/*:$HBASELIBS/* > >>>>> org.junit.runner.JUnitCore > >>>>> org.apache.hadoop.hbase.coprocessor.TestAggregateProtocol > >>>>> > >>>>> Closer inspection of this test class has revealed that it uses a > >>>>> mini DFS cluster to populate and run the tests. These tests return > >> successfully. > >>>>> > >>>>> However, when we attempt to run similar code on our development > >>>>> HDFS cluster we experience the following error: > >>>>> > >>>>> 11/12/22 15:46:28 WARN > >>>>> client.HConnectionManager$HConnectionImplementation: Error > >>>>> executing for > >>>> row > >>>>> java.util.concurrent.ExecutionException: > >>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: > >>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No > >>>> matching > >>>>> handler for protocol > >>>> org.apache.hadoop.hbase.coprocessor.AggregateProtocol > >>>>> in region > EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006. > >>>>> at > >>>>> org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:4010 > >>>>> ) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HR > >>>> egionServer.java:3040) > >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > >>>> Method) > >>>>> at > >>>>> > >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. 
> >>>> java:39) > >>>>> at > >>>>> > >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces > >>>> sorImpl.java:25) > >>>>> at java.lang.reflect.Method.invoke(Method.java:597) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpc > >>>> Engine.java:364) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java: > >>>> 1325) > >>>>> [sshexec] > >>>>> at > >>>>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) > >>>>> at > >>>>> java.util.concurrent.FutureTask.get(FutureTask.java:83) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation.processExecs(HConnectionManager.java:1465) > >>>>> at > >>>>> org.apache.hadoop.hbase.client.HTable.coprocessorExec(HTable.java:1 > >>>>> 555) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient.sum(Aggr > >>>> egationClient.java:229) > >>>>> at > >>>>> EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:51) > >>>>> at EDRPAggregator.main(EDRPAggregator.java:77) > >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > >>>> Method) > >>>>> at > >>>>> > >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. > >>>> java:39) > >>>>> at > >>>>> > >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces > >>>> sorImpl.java:25) > >>>>> at java.lang.reflect.Method.invoke(Method.java:597) > >>>>> at org.apache.hadoop.util.RunJar.main(RunJar.java:156) > >>>>> Caused by: > >>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: > >>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No > >>>> matching > >>>>> handler for protocol > >>>> org.apache.hadoop.hbase.coprocessor.AggregateProtocol > >>>>> in region > EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006. 
> >>>>> at > >>>>> org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:4010 > >>>>> ) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HR > >>>> egionServer.java:3040) > >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > >>>> Method) > >>>>> at > >>>>> > >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. > >>>> java:39) > >>>>> at > >>>>> > >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces > >>>> sorImpl.java:25) > >>>>> at java.lang.reflect.Method.invoke(Method.java:597) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpc > >>>> Engine.java:364) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java: > >>>> 1325) > >>>>> [sshexec] > >>>>> at > >>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > >>>>> Method) > >>>>> at > >>>>> > >>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct > >>>> orAccessorImpl.java:39) > >>>>> at > >>>>> > >>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC > >>>> onstructorAccessorImpl.java:27) > >>>>> at > >>>>> java.lang.reflect.Constructor.newInstance(Constructor.java:513) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.RemoteExceptionHandler.decodeRemoteException( > >>>> RemoteExceptionHandler.java:96) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation.translateException(HConnectionManager.java:1651) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation.getRegionServerWithRetries(HConnectionManager.java:1327) > >>>>> at > >>>>> org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.ja > >>>>> va:79) > >>>>> at $Proxy3.getSum(Unknown Source) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.call(A > >>>> 
ggregationClient.java:233) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.call(A > >>>> ggregationClient.java:230) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation$4.call(HConnectionManager.java:1453) > >>>>> at > >>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) > >>>>> at > >>>>> java.util.concurrent.FutureTask.run(FutureTask.java:138) > >>>>> at > >>>>> > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec > >>>> utor.java:886) > >>>>> at > >>>>> > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor > >>>> .java:908) > >>>>> at java.lang.Thread.run(Thread.java:662) > >>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: > >>>>> org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No > >>>> matching > >>>>> handler for protocol > >>>> org.apache.hadoop.hbase.coprocessor.AggregateProtocol > >>>>> in region > EDRPTestTbl,,1324485124322.7b9ee0d113db9b24ea9fdde90702d006. > >>>>> at > >>>>> org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:4010 > >>>>> ) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HR > >>>> egionServer.java:3040) > >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > >>>> Method) > >>>>> at > >>>>> > >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. 
> >>>> java:39) > >>>>> at > >>>>> > >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces > >>>> sorImpl.java:25) > >>>>> at java.lang.reflect.Method.invoke(Method.java:597) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpc > >>>> Engine.java:364) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java: > >>>> 1325) > >>>>> [sshexec] > >>>>> at > >>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > >>>>> Method) > >>>>> at > >>>>> > >>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct > >>>> orAccessorImpl.java:39) > >>>>> at > >>>>> > >>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC > >>>> onstructorAccessorImpl.java:27) > >>>>> at > >>>>> java.lang.reflect.Constructor.newInstance(Constructor.java:513) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.RemoteExceptionHandler.decodeRemoteException( > >>>> RemoteExceptionHandler.java:96) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation.translateException(HConnectionManager.java:1651) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation.getRegionServerWithRetries(HConnectionManager.java:1327) > >>>>> at > >>>>> org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.ja > >>>>> va:79) > >>>>> at $Proxy3.getSum(Unknown Source) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.call(A > >>>> ggregationClient.java:233) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient$4.call(A > >>>> ggregationClient.java:230) > >>>>> at > >>>>> > >>>> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplemen > >>>> tation$4.call(HConnectionManager.java:1453) > >>>>> at > >>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) > >>>>> at > 
>>>>> java.util.concurrent.FutureTask.run(FutureTask.java:138) > >>>>> at > >>>>> > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec > >>>> utor.java:886) > >>>>> at > >>>>> > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor > >>>> .java:908) > >>>>> at java.lang.Thread.run(Thread.java:662) > >>>>> > >>>>> The source for our class is: > >>>>> > >>>>> import static org.junit.Assert.assertEquals; > >>>>> > >>>>> import java.io.IOException; > >>>>> > >>>>> import org.apache.hadoop.conf.Configuration; > >>>>> import org.apache.hadoop.hbase.HBaseConfiguration; > >>>>> import org.apache.hadoop.hbase.HBaseTestingUtility; > >>>>> import org.apache.hadoop.hbase.HConstants; > >>>>> import org.apache.hadoop.hbase.HTableDescriptor; > >>>>> import org.apache.hadoop.hbase.client.HTable; > >>>>> import org.apache.hadoop.hbase.client.Put; > >>>>> import org.apache.hadoop.hbase.client.Scan; > >>>>> import > >>>>> org.apache.hadoop.hbase.client.coprocessor.AggregationClient; > >>>>> import > >>>>> org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; > >>>>> import org.apache.hadoop.hbase.util.Bytes; > >>>>> import org.apache.hadoop.util.GenericOptionsParser; > >>>>> import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; > >>>>> import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; > >>>>> import org.junit.Test; > >>>>> > >>>>> public class EDRPAggregator { > >>>>> > >>>>> // private static final byte[] EDRP_FAMILY = > >>>>> Bytes.toBytes("EDRP"); > >>>>> // private static final byte[] EDRP_QUALIFIER = > >>>>> Bytes.toBytes("advanceKWh"); > >>>>> > >>>>> private static byte[] ROW = Bytes.toBytes("testRow"); > >>>>> private static final int ROWSIZE = 20; > >>>>> private static byte[][] ROWS = makeN(ROW, ROWSIZE); > >>>>> private static final byte[] TEST_QUALIFIER = > >>>>> Bytes.toBytes("TestQualifier"); > >>>>> private static final byte[] TEST_MULTI_CQ = > >>>>> Bytes.toBytes("TestMultiCQ"); > >>>>> private 
static final int rowSeperator1 = 5; > >>>>> private static final int rowSeperator2 = 12; > >>>>> > >>>>> public static void testSumWithValidRange(Configuration conf, > >>>>> String[] otherArgs) throws Throwable { > >>>>> byte[] EDRP_TABLE = Bytes.toBytes(otherArgs[1]); > >>>>> byte[] EDRP_FAMILY = Bytes.toBytes(otherArgs[2]); > >>>>> > >>>>> conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, > >>>>> > >>>>> "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"); > >>>>> > >>>>> // Utility.CreateHBaseTable(conf, otherArgs[1], > >>>> otherArgs[2], > >>>>> true); > >>>>> > >>>>> HBaseTestingUtility util = new HBaseTestingUtility(); > >>>>> HTable table = util.createTable(EDRP_TABLE, > >>>>> EDRP_FAMILY); > >>>>> > >>>>> AggregationClient aClient = new > >>>>> AggregationClient(conf); > >>>>> Scan scan = new Scan(); > >>>>> scan.addColumn(EDRP_TABLE, EDRP_FAMILY); > >>>>> final ColumnInterpreter<Long, Long> ci = new > >>>>> LongColumnInterpreter(); > >>>>> long sum = aClient.sum(Bytes.toBytes(otherArgs[0]), > >>>>> ci, > >>>> scan); > >>>>> System.out.println(sum); > >>>>> } > >>>>> > >>>>> /** > >>>>> * Main entry point. > >>>>> * > >>>>> * @param argsThe > >>>>> * command line parameters. > >>>>> * @throws Exception > >>>>> * When running the job fails. 
> >>>>> */ > >>>>> public static void main(String[] args) throws Exception { > >>>>> Configuration conf = HBaseConfiguration.create(); > >>>>> > >>>>> String[] otherArgs = new GenericOptionsParser(conf, > >>>>> args) > >>>>> .getRemainingArgs(); > >>>>> if (otherArgs.length != 3) { > >>>>> System.err > >>>>> .println("Wrong number of > >>>>> arguments: " + otherArgs.length); > >>>>> System.err.println("Usage: " + "<tablename> > >>>>> <colfam> <qualifier>"); > >>>>> System.exit(-1); > >>>>> } > >>>>> > >>>>> try { > >>>>> testSumWithValidRange(conf, otherArgs); > >>>>> } catch (Throwable e) { > >>>>> e.printStackTrace(); > >>>>> } > >>>>> } > >>>>> > >>>>> /** > >>>>> * an infrastructure method to prepare rows for the testtable. > >>>>> * > >>>>> * @param base > >>>>> * @param n > >>>>> * @return > >>>>> */ > >>>>> private static byte[][] makeN(byte[] base, int n) { > >>>>> byte[][] ret = new byte[n][]; > >>>>> for (int i = 0; i < n; i++) { > >>>>> ret[i] = Bytes.add(base, Bytes.toBytes(i)); > >>>>> } > >>>>> return ret; > >>>>> } > >>>>> } > >>>>> > >>>>> Please can you suggest what might be causing and/or how we might > >>>>> fix this UnknownProtocolException? > >>>>> > >>>>> Also, does anyone have any working examples using the aggregation > >>>> protocol > >>>>> other than the test code? > >>>>> > >>>>> Thanks, > >>>>> Tom > >>>>> > >>>>> > >>>> > >> > >> > >
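A note on the timeout reported in this thread: the client gives up after 60000 ms, while the region server log shows a processing time of 113146 ms, so the server tries to respond after the client has already closed the channel (hence the ClosedChannelException on the server side). As a minimal sketch of a stopgap, and assuming the 60000 ms in the exception corresponds to the client-side hbase.rpc.timeout setting, the timeout could be raised in the hbase-site.xml that the client reads. The 180000 value below is purely illustrative, not a recommendation:

```xml
<!-- Assumption: the 60000 ms client timeout comes from hbase.rpc.timeout. -->
<!-- 180000 ms is an illustrative value chosen to exceed the ~113 s seen in the RS log. -->
<property>
  <name>hbase.rpc.timeout</name>
  <value>180000</value>
</property>
```

This only hides the symptom. It does not explain why a 557k-row table takes almost two minutes to scan, so the region count and DEBUG log questions above still need answers.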
