You can't register an endpoint just for one table. It's like a stored procedure - you choose to run it and pass parameters to it.
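
To make the stored-procedure analogy a bit more concrete, here is a rough plain-Java sketch with no HBase dependency. It only models the call pattern: the client picks a row-key range, every region that overlaps the range runs the endpoint method with the parameters passed in, and the client adds up the per-region results. `EndpointAnalogy`, `SumEndpoint`, `RegionStub` and `invokeOverRange` are hypothetical names made up for this illustration, not HBase APIs, and the real RPC and region lookup are left out entirely.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EndpointAnalogy {

    // Hypothetical stand-in for an endpoint protocol (not an HBase interface).
    interface SumEndpoint {
        long sum(String family, String qualifier);
    }

    // Hypothetical stand-in for one region: sorted rows, each row maps "family:qualifier" to a value.
    static final class RegionStub implements SumEndpoint {
        final String name;
        final TreeMap<String, Map<String, Long>> rows = new TreeMap<>();

        RegionStub(String name) {
            this.name = name;
        }

        void put(String rowKey, String family, String qualifier, long value) {
            rows.computeIfAbsent(rowKey, k -> new LinkedHashMap<>())
                .put(family + ":" + qualifier, value);
        }

        // Runs "inside" the region and covers every row the region holds.
        @Override
        public long sum(String family, String qualifier) {
            long total = 0;
            for (Map<String, Long> cells : rows.values()) {
                Long v = cells.get(family + ":" + qualifier);
                if (v != null) {
                    total += v;
                }
            }
            return total;
        }
    }

    // The "client" side: invoke the endpoint on each region overlapping [startRow, endRow]
    // and collect one result per region, keyed by region name.
    static Map<String, Long> invokeOverRange(List<RegionStub> regions, String startRow,
                                             String endRow, String family, String qualifier) {
        Map<String, Long> perRegion = new LinkedHashMap<>();
        for (RegionStub region : regions) {
            if (region.rows.isEmpty()) {
                continue;
            }
            boolean overlaps = region.rows.firstKey().compareTo(endRow) <= 0
                && region.rows.lastKey().compareTo(startRow) >= 0;
            if (overlaps) {
                perRegion.put(region.name, region.sum(family, qualifier));
            }
        }
        return perRegion;
    }

    public static void main(String[] args) {
        RegionStub r1 = new RegionStub("r1");
        r1.put("1", "myfl", "myqf", 2L);
        r1.put("3", "myfl", "myqf", 3L);
        RegionStub r2 = new RegionStub("r2");
        r2.put("7", "myfl", "myqf", 10L);

        // Same idea as a stored procedure call: choose where to run it and pass the parameters.
        Map<String, Long> results = invokeOverRange(Arrays.asList(r1, r2), "1", "5", "myfl", "myqf");

        long total = 0;
        for (long partial : results.values()) {
            total += partial;
        }
        System.out.println(results + " total=" + total);
    }
}
```

Only `r1` overlaps the range "1" to "5" in this toy setup, so `r2` is never invoked. Nothing runs until the client makes the call.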

On Friday, July 12, 2013, ch huang wrote:

> what your describe is how to load endpoint coprocessor for every region in
> the hbase, what i want to do is just load it into my test table ,only for
> the regions of the table
>
> On Fri, Jul 12, 2013 at 12:07 PM, Asaf Mesika <[email protected]> wrote:
>
> > The only way to register endpoint coprocessor jars is by placing them in
> > lib dir if hbase and modifying hbase-site.xml to point to it under a
> > property name I forgot at the moment.
> > What you described is a way to register an Observer type coprocessor.
> >
> > On Friday, July 12, 2013, ch huang wrote:
> >
> > > i am testing coprocessor endpoint function, here is my testing process ,and
> > > error i get ,hope any expert on coprocessor can help me out
> > >
> > > # vi ColumnAggregationProtocol.java
> > >
> > > import java.io.IOException;
> > > import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
> > > // A sample protocol for performing aggregation at regions.
> > > public interface ColumnAggregationProtocol
> > >     extends CoprocessorProtocol {
> > >   // Perform aggregation for a given column at the region. The aggregation
> > >   // will include all the rows inside the region. It can be extended to
> > >   // allow passing start and end rows for a fine-grained aggregation.
> > >   public long sum(byte[] family, byte[] qualifier) throws IOException;
> > > }
> > >
> > > # vi ColumnAggregationEndpoint.java
> > >
> > > import java.io.FileWriter;
> > > import java.io.IOException;
> > > import java.util.ArrayList;
> > > import java.util.List;
> > > import org.apache.hadoop.hbase.CoprocessorEnvironment;
> > > import org.apache.hadoop.hbase.KeyValue;
> > > import org.apache.hadoop.hbase.client.Scan;
> > > import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
> > > import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
> > > import org.apache.hadoop.hbase.ipc.ProtocolSignature;
> > > import org.apache.hadoop.hbase.regionserver.HRegion;
> > > import org.apache.hadoop.hbase.regionserver.InternalScanner;
> > > import org.apache.hadoop.hbase.util.Bytes;
> > >
> > > //Aggregation implementation at a region.
> > >
> > > public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
> > >     implements ColumnAggregationProtocol {
> > >   @Override
> > >   public long sum(byte[] family, byte[] qualifier)
> > >       throws IOException {
> > >     // aggregate at each region
> > >     Scan scan = new Scan();
> > >     scan.addColumn(family, qualifier);
> > >     long sumResult = 0;
> > >
> > >     CoprocessorEnvironment ce = getEnvironment();
> > >     HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
> > >     InternalScanner scanner = hr.getScanner(scan);
> > >
> > >     try {
> > >       List<KeyValue> curVals = new ArrayList<KeyValue>();
> > >       boolean hasMore = false;
> > >       do {
> > >         curVals.clear();
> > >         hasMore = scanner.next(curVals);
> > >         KeyValue kv = curVals.get(0);
> > >         sumResult += Long.parseLong(Bytes.toString(kv.getValue()));
> > >
> > >       } while (hasMore);
> > >     } finally {
> > >       scanner.close();
> > >     }
> > >     return sumResult;
> > >   }
> > >
> > >   @Override
> > >   public long getProtocolVersion(String protocol, long clientVersion)
> > >       throws IOException {
> > >     // TODO Auto-generated method stub
> > >     return 0;
> > >   }
> > >
> > > 192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001'
> > >
> > > here is my testing java code
> > >
> > > package com.testme.demo;
> > > import java.io.IOException;
> > > import java.util.Map;
> > > import org.apache.hadoop.conf.Configuration;
> > > import org.apache.hadoop.hbase.HBaseConfiguration;
> > > import org.apache.hadoop.hbase.HTableDescriptor;
> > > import org.apache.hadoop.hbase.client.*;
> > > import org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol;
> > > import org.apache.hadoop.hbase.util.*;
> > > import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;;
> > >
> > > public class TestCop {
> > >   private static Configuration conf =null;
> > >   private static String TEST_TABLE = "mytest";
> > >   private static String TEST_FAMILY = "myfl";
> > >   private static String TEST_QUALIFIER = "myqf";
> > >   /**
> > >    * @param args
> > >    */
> > >   static {
> > >     conf = HBaseConfiguration.create();
> > >     conf.addResource( "hbase-site.xml");
> > >   }
> > >
> > >   public static void main(String[] args) throws IOException,Throwable{
> > >     // TODO Auto-generated method stub
> > >     conf = HBaseConfiguration.create();
> > >
> > >     HTable table = new HTable(conf,TEST_TABLE);
> > >     // HTableDescriptor htd = table.getTableDescriptor();
> > >
> > >     Scan scan = new Scan();
> > >     Map<byte[], Long> results;
> > >
> > >     results = table.coprocessorExec(ColumnAggregationProtocol.class,
> > >         "1".getBytes(),"5".getBytes(), new Call<ColumnAggregationProtocol,Long>(){
> > >       public Long call(ColumnAggregationProtocol instance)throws IOException{
> > >         return (Long) instance.sum(TEST_FAMILY.getBytes(),
> > >             TEST_QUALIFIER.getBytes());
> > >       }});
> > >
> > >     long sumResult = 0;
> > >     long expectedResult = 0;
> > >     for (Map.Entry<byte[], Long> e:results.entrySet()){
> > >       sumResult += e.getValue();
> > >     }
> > >     System.out.println(sumResult);
> > >   }
> > > }
> > >
> > > when i run it i get error
> > >
> > > Exception in thread "main"
> > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException:
> > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
> > > handler for protocol
> > > org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
> > > mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
> > >   at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
> > >   at org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
> > >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > >   at java.lang.reflect.Method.invoke(Method.java:597)
> > >   at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
> > >   at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
> > >   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > >   at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
> > >   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
> > >   at java.lang.reflect.Constructor.newInstance(Unknown Source)
> > >   at
