What you describe is how to load an endpoint coprocessor for every region in HBase. What I want to do is load it into my test table only, i.e. only for the regions of that table.
On Fri, Jul 12, 2013 at 12:07 PM, Asaf Mesika <[email protected]> wrote: > The only way to register endpoint coprocessor jars is by placing them in > lib dir if hbase and modifying hbase-site.xml to point to it under a > property name I forgot at the moment. > What you described is a way to register an Observer type coprocessor. > > > On Friday, July 12, 2013, ch huang wrote: > > > i am testing coprocessor endpoint function, here is my testing process > ,and > > error i get ,hope any expert on coprocessor can help me out > > > > > > # vi ColumnAggregationProtocol.java > > > > import java.io.IOException; > > import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; > > // A sample protocol for performing aggregation at regions. > > public interface ColumnAggregationProtocol > > extends CoprocessorProtocol { > > // Perform aggregation for a given column at the region. The aggregation > > // will include all the rows inside the region. It can be extended to > > // allow passing start and end rows for a fine-grained aggregation. > > public long sum(byte[] family, byte[] qualifier) throws IOException; > > } > > > > > > # vi ColumnAggregationEndpoint.java > > > > > > import java.io.FileWriter; > > import java.io.IOException; > > import java.util.ArrayList; > > import java.util.List; > > import org.apache.hadoop.hbase.CoprocessorEnvironment; > > import org.apache.hadoop.hbase.KeyValue; > > import org.apache.hadoop.hbase.client.Scan; > > import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; > > import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; > > import org.apache.hadoop.hbase.ipc.ProtocolSignature; > > import org.apache.hadoop.hbase.regionserver.HRegion; > > import org.apache.hadoop.hbase.regionserver.InternalScanner; > > import org.apache.hadoop.hbase.util.Bytes; > > > > //Aggregation implementation at a region. 
> > > > public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor > > implements ColumnAggregationProtocol { > > @Override > > public long sum(byte[] family, byte[] qualifier) > > throws IOException { > > // aggregate at each region > > Scan scan = new Scan(); > > scan.addColumn(family, qualifier); > > long sumResult = 0; > > > > CoprocessorEnvironment ce = getEnvironment(); > > HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion(); > > InternalScanner scanner = hr.getScanner(scan); > > > > try { > > List<KeyValue> curVals = new ArrayList<KeyValue>(); > > boolean hasMore = false; > > do { > > curVals.clear(); > > hasMore = scanner.next(curVals); > > KeyValue kv = curVals.get(0); > > sumResult += Long.parseLong(Bytes.toString(kv.getValue())); > > > > } while (hasMore); > > } finally { > > scanner.close(); > > } > > return sumResult; > > } > > > > @Override > > public long getProtocolVersion(String protocol, long clientVersion) > > throws IOException { > > // TODO Auto-generated method stub > > return 0; > > } > > > > @Override > > > > public ProtocolSignature getProtocolSignature(String protocol, > > long clientVersion, int clientMethodsHash) throws > IOException > > { > > // TODO Auto-generated method stub > > return null; > > } > > } > > > > i compile and pack the two into test.jar,and put it into my HDFS > filesystem > > > > and load it into my test table > > > > hbase(main):006:0> alter 'mytest', METHOD => > > 'table_att','coprocessor'=>'hdfs:/// > > 192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001<http://192.168.10.22:9000/alex/test.jar%7CColumnAggregationEndpoint%7C1001> > ' > > > > here is my testing java code > > > > package com.testme.demo; > > import java.io.IOException; > > import java.util.Map; > > import org.apache.hadoop.conf.Configuration; > > import org.apache.hadoop.hbase.HBaseConfiguration; > > import org.apache.hadoop.hbase.HTableDescriptor; > > import org.apache.hadoop.hbase.client.*; > > import 
org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol; > > import org.apache.hadoop.hbase.util.*; > > import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;; > > > > public class TestCop { > > private static Configuration conf =null; > > private static String TEST_TABLE = "mytest"; > > private static String TEST_FAMILY = "myfl"; > > private static String TEST_QUALIFIER = "myqf"; > > /** > > * @param args > > */ > > static { > > conf = HBaseConfiguration.create(); > > conf.addResource( "hbase-site.xml"); > > } > > > > public static void main(String[] args) throws IOException,Throwable{ > > // TODO Auto-generated method stub > > conf = HBaseConfiguration.create(); > > > > HTable table = new HTable(conf,TEST_TABLE); > > // HTableDescriptor htd = table.getTableDescriptor(); > > > > Scan scan = new Scan(); > > Map<byte[], Long> results; > > > > results = table.coprocessorExec(ColumnAggregationProtocol.class, > > "1".getBytes(),"5".getBytes(), new > Call<ColumnAggregationProtocol,Long>(){ > > public Long call(ColumnAggregationProtocol instance)throws > > IOException{ > > return (Long) instance.sum(TEST_FAMILY.getBytes(), > > TEST_QUALIFIER.getBytes()); > > }}); > > > > long sumResult = 0; > > long expectedResult = 0; > > for (Map.Entry<byte[], Long> e:results.entrySet()){ > > sumResult += e.getValue(); > > } > > System.out.println(sumResult); > > } > > } > > when i run it i get error > > Exception in thread "main" > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No > matching > > handler for protocol > > org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region > > mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9. 
> > at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463) > > at > > > > > org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > > at > > > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > > at java.lang.reflect.Method.invoke(Method.java:597) > > at > > > > > org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320) > > at > > > org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426) > > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > > at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) > > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown > > Source) > > at java.lang.reflect.Constructor.newInstance(Unknown Source) > > at > > > > > org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90) > > at > > > > > org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:79) > > at > > > > > org.apache.hadoop.hbase.client.ServerCallable.translateException(ServerCallable.java:228) > > at > > > > > org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:166) > > at > > org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.java:79) > > at com.sun.proxy.$Proxy8.sum(Unknown Source) > > at com.testme.demo.TestCop$1.call(TestCop.java:41) > > at com.testme.demo.TestCop$1.call(TestCop.java:1) > > at > > > > > org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$4.call(HConnectionManager.java:1466) > > at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) > > at java.util.concurrent.FutureTask.run(Unknown Source) > > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > > at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > > at java.lang.Thread.run(Unknown Source) > > Caused by: > > > > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException): > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No > matching > > handler for protocol > > org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region > > mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9. > > at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463) > > at > > > > > org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > > at > > > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > > at java.lang.reflect.Method.invoke(Method.java:597) > > at > > > > > org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320) > > at > > > org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426) > > at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:995) > > at > > > > > org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:86) > > at com.sun.proxy.$Proxy7.execCoprocessor(Unknown Source) > > at > > org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:75) > > at > > org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:73) > > at > > > > > org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:163) > > ... 
10 more > > > > hbase(main):020:0> describe > > 'mytest' > > DESCRIPTION ENABLED > > {NAME => 'mytest', coprocessor$1 => 'hdfs:///192.16 true > > 8.10.22:9000/alex/test.jar|ColumnAggregationEndpoin > > t|1001', FAMILIES => [{NAME => 'myfl' > > , DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NO > > NE', REPLICATION_SCOPE => '0', VERSIONS => '3', COM > > PRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '21 > > 47483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE > > => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK = > > > 'true', BLOCKCACHE => 'true'}]} > > 1 row(s) in 0.0920 seconds > > >
