thanks,but why the test code can not run properly?
On Fri, Jul 12, 2013 at 11:56 AM, Ted Yu <[email protected]> wrote:
> In 0.94, we already have:
>
> public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
> implements ColumnAggregationProtocol {
>
> @Override
> public long sum(byte[] family, byte[] qualifier)
>
> What additional functionality do you need ?
>
> On Thu, Jul 11, 2013 at 8:26 PM, ch huang <[email protected]> wrote:
>
> > i am testing coprocessor endpoint function, here is my testing process
> ,and
> > error i get ,hope any expert on coprocessor can help me out
> >
> >
> > # vi ColumnAggregationProtocol.java
> >
> > import java.io.IOException;
> > import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
> > // A sample protocol for performing aggregation at regions.
> > public interface ColumnAggregationProtocol
> > extends CoprocessorProtocol {
> > // Perform aggregation for a given column at the region. The aggregation
> > // will include all the rows inside the region. It can be extended to
> > // allow passing start and end rows for a fine-grained aggregation.
> > public long sum(byte[] family, byte[] qualifier) throws IOException;
> > }
> >
> >
> > # vi ColumnAggregationEndpoint.java
> >
> >
> > import java.io.FileWriter;
> > import java.io.IOException;
> > import java.util.ArrayList;
> > import java.util.List;
> > import org.apache.hadoop.hbase.CoprocessorEnvironment;
> > import org.apache.hadoop.hbase.KeyValue;
> > import org.apache.hadoop.hbase.client.Scan;
> > import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
> > import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
> > import org.apache.hadoop.hbase.ipc.ProtocolSignature;
> > import org.apache.hadoop.hbase.regionserver.HRegion;
> > import org.apache.hadoop.hbase.regionserver.InternalScanner;
> > import org.apache.hadoop.hbase.util.Bytes;
> >
> > //Aggregation implementation at a region.
> >
> > public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
> > implements ColumnAggregationProtocol {
> > @Override
> > public long sum(byte[] family, byte[] qualifier)
> > throws IOException {
> > // aggregate at each region
> > Scan scan = new Scan();
> > scan.addColumn(family, qualifier);
> > long sumResult = 0;
> >
> > CoprocessorEnvironment ce = getEnvironment();
> > HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
> > InternalScanner scanner = hr.getScanner(scan);
> >
> > try {
> > List<KeyValue> curVals = new ArrayList<KeyValue>();
> > boolean hasMore = false;
> > do {
> > curVals.clear();
> > hasMore = scanner.next(curVals);
> > KeyValue kv = curVals.get(0);
> > sumResult += Long.parseLong(Bytes.toString(kv.getValue()));
> >
> > } while (hasMore);
> > } finally {
> > scanner.close();
> > }
> > return sumResult;
> > }
> >
> > @Override
> > public long getProtocolVersion(String protocol, long clientVersion)
> > throws IOException {
> > // TODO Auto-generated method stub
> > return 0;
> > }
> >
> > @Override
> >
> > public ProtocolSignature getProtocolSignature(String protocol,
> > long clientVersion, int clientMethodsHash) throws
> IOException
> > {
> > // TODO Auto-generated method stub
> > return null;
> > }
> > }
> >
> > i compile and pack the two into test.jar,and put it into my HDFS
> filesystem
> >
> > and load it into my test table
> >
> > hbase(main):006:0> alter 'mytest', METHOD =>
> > 'table_att','coprocessor'=>'hdfs:///
> > 192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001<http://192.168.10.22:9000/alex/test.jar%7CColumnAggregationEndpoint%7C1001>
> '
> >
> > here is my testing java code
> >
> > package com.testme.demo;
> > import java.io.IOException;
> > import java.util.Map;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.hbase.HBaseConfiguration;
> > import org.apache.hadoop.hbase.HTableDescriptor;
> > import org.apache.hadoop.hbase.client.*;
> > import org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol;
> > import org.apache.hadoop.hbase.util.*;
> > import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;;
> >
> > public class TestCop {
> > private static Configuration conf =null;
> > private static String TEST_TABLE = "mytest";
> > private static String TEST_FAMILY = "myfl";
> > private static String TEST_QUALIFIER = "myqf";
> > /**
> > * @param args
> > */
> > static {
> > conf = HBaseConfiguration.create();
> > conf.addResource( "hbase-site.xml");
> > }
> >
> > public static void main(String[] args) throws IOException,Throwable{
> > // TODO Auto-generated method stub
> > conf = HBaseConfiguration.create();
> >
> > HTable table = new HTable(conf,TEST_TABLE);
> > // HTableDescriptor htd = table.getTableDescriptor();
> >
> > Scan scan = new Scan();
> > Map<byte[], Long> results;
> >
> > results = table.coprocessorExec(ColumnAggregationProtocol.class,
> > "1".getBytes(),"5".getBytes(), new
> Call<ColumnAggregationProtocol,Long>(){
> > public Long call(ColumnAggregationProtocol instance)throws
> > IOException{
> > return (Long) instance.sum(TEST_FAMILY.getBytes(),
> > TEST_QUALIFIER.getBytes());
> > }});
> >
> > long sumResult = 0;
> > long expectedResult = 0;
> > for (Map.Entry<byte[], Long> e:results.entrySet()){
> > sumResult += e.getValue();
> > }
> > System.out.println(sumResult);
> > }
> > }
> > when i run it i get error
> > Exception in thread "main"
> > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException:
> > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No
> matching
> > handler for protocol
> > org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
> > mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
> > at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
> > at
> >
> >
> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > at java.lang.reflect.Method.invoke(Method.java:597)
> > at
> >
> >
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
> > at
> >
> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
> > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
> > Source)
> > at java.lang.reflect.Constructor.newInstance(Unknown Source)
> > at
> >
> >
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
> > at
> >
> >
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:79)
> > at
> >
> >
> org.apache.hadoop.hbase.client.ServerCallable.translateException(ServerCallable.java:228)
> > at
> >
> >
> org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:166)
> > at
> > org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.java:79)
> > at com.sun.proxy.$Proxy8.sum(Unknown Source)
> > at com.testme.demo.TestCop$1.call(TestCop.java:41)
> > at com.testme.demo.TestCop$1.call(TestCop.java:1)
> > at
> >
> >
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$4.call(HConnectionManager.java:1466)
> > at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
> > at java.util.concurrent.FutureTask.run(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> > Caused by:
> >
> >
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException):
> > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No
> matching
> > handler for protocol
> > org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
> > mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
> > at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
> > at
> >
> >
> org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > at java.lang.reflect.Method.invoke(Method.java:597)
> > at
> >
> >
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
> > at
> >
> org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
> > at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:995)
> > at
> >
> >
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:86)
> > at com.sun.proxy.$Proxy7.execCoprocessor(Unknown Source)
> > at
> > org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:75)
> > at
> > org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:73)
> > at
> >
> >
> org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:163)
> > ... 10 more
> >
> > hbase(main):020:0> describe
> > 'mytest'
> > DESCRIPTION ENABLED
> > {NAME => 'mytest', coprocessor$1 => 'hdfs:///192.16 true
> > 8.10.22:9000/alex/test.jar|ColumnAggregationEndpoin
> > t|1001', FAMILIES => [{NAME => 'myfl'
> > , DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NO
> > NE', REPLICATION_SCOPE => '0', VERSIONS => '3', COM
> > PRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '21
> > 47483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE
> > => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK =
> > > 'true', BLOCKCACHE => 'true'}]}
> > 1 row(s) in 0.0920 seconds
> >
>