Endpoint coprocessors can be loaded on a single table. They are no different from RegionObservers in this regard. Both are instantiated per region by RegionCoprocessorHost. You should be able to load the coprocessor by setting it as a table attribute. If it doesn't seem to be loading, check the region server logs after you re-enable the table where you have added it. Do you see any log messages from RegionCoprocessorHost?
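
For reference, here is a rough sketch of what that table attribute looks like. The attribute key has the form COPROCESSOR$<n> and the value is a pipe-separated string: jar path, class name, priority, then optional key=value arguments. The helper class below is hypothetical (it is not part of HBase) and only formats the string; the jar path is a placeholder, while the class name and priority are the ones from your message. The class name has to be the fully qualified name as the region server will load it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not an HBase class. It only builds the key/value pair
// that is stored as a table attribute:
//   COPROCESSOR$<n>  =>  <jar path>|<class name>|<priority>|<key=value,...>
final class CoprocessorAttribute {

    // Attribute key for the n-th coprocessor on a table, e.g. COPROCESSOR$1.
    static String key(int n) {
        return "COPROCESSOR$" + n;
    }

    // Attribute value. The trailing arguments section is left out when args is empty.
    static String value(String jarPath, String className, int priority,
                        Map<String, String> args) {
        StringBuilder sb = new StringBuilder();
        sb.append(jarPath).append('|').append(className).append('|').append(priority);
        if (!args.isEmpty()) {
            sb.append('|');
            boolean first = true;
            for (Map.Entry<String, String> e : args.entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                sb.append(e.getKey()).append('=').append(e.getValue());
                first = false;
            }
        }
        return sb.toString();
    }

    public static void main(String[] unused) {
        // Placeholder jar location (assumption); class name and priority from the thread.
        String v = value("hdfs://namenode:8020/path/to/endpoint.jar",
                "ColumnAggregationEndpoint", 1001,
                new LinkedHashMap<String, String>());
        System.out.println(key(1) + " => " + v);
    }
}
```

The resulting pair is what you would set on the disabled table, either through the shell's alter command or through HTableDescriptor.setValue(key, value), before enabling the table again.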

On Fri, Jul 12, 2013 at 4:33 AM, Asaf Mesika <[email protected]> wrote:

> You can't register and end point just for one table. It's like a stored
> procedure - you choose to run it and pass parameters to it.
>
> On Friday, July 12, 2013, ch huang wrote:
>
> > what your describe is how to load endpoint coprocessor for every region in
> > the hbase, what i want to do is just load it into my test table ,only for
> > the regions of the table
> >
> > On Fri, Jul 12, 2013 at 12:07 PM, Asaf Mesika <[email protected]> wrote:
> >
> > > The only way to register endpoint coprocessor jars is by placing them in
> > > lib dir if hbase and modifying hbase-site.xml to point to it under a
> > > property name I forgot at the moment.
> > > What you described is a way to register an Observer type coprocessor.
> > >
> > > On Friday, July 12, 2013, ch huang wrote:
> > >
> > > > i am testing coprocessor endpoint function, here is my testing process
> > > > ,and error i get ,hope any expert on coprocessor can help me out
> > > >
> > > > # vi ColumnAggregationProtocol.java
> > > >
> > > > import java.io.IOException;
> > > > import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
> > > > // A sample protocol for performing aggregation at regions.
> > > > public interface ColumnAggregationProtocol
> > > >     extends CoprocessorProtocol {
> > > >   // Perform aggregation for a given column at the region. The aggregation
> > > >   // will include all the rows inside the region. It can be extended to
> > > >   // allow passing start and end rows for a fine-grained aggregation.
> > > >   public long sum(byte[] family, byte[] qualifier) throws IOException;
> > > > }
> > > >
> > > > # vi ColumnAggregationEndpoint.java
> > > >
> > > > import java.io.FileWriter;
> > > > import java.io.IOException;
> > > > import java.util.ArrayList;
> > > > import java.util.List;
> > > > import org.apache.hadoop.hbase.CoprocessorEnvironment;
> > > > import org.apache.hadoop.hbase.KeyValue;
> > > > import org.apache.hadoop.hbase.client.Scan;
> > > > import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
> > > > import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
> > > > import org.apache.hadoop.hbase.ipc.ProtocolSignature;
> > > > import org.apache.hadoop.hbase.regionserver.HRegion;
> > > > import org.apache.hadoop.hbase.regionserver.InternalScanner;
> > > > import org.apache.hadoop.hbase.util.Bytes;
> > > >
> > > > //Aggregation implementation at a region.
> > > >
> > > > public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
> > > >     implements ColumnAggregationProtocol {
> > > >   @Override
> > > >   public long sum(byte[] family, byte[] qualifier)
> > > >       throws IOException {
> > > >     // aggregate at each region
> > > >     Scan scan = new Scan();
> > > >     scan.addColumn(family, qualifier);
> > > >     long sumResult = 0;
> > > >
> > > >     CoprocessorEnvironment ce = getEnvironment();
> > > >     HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
> > > >     InternalScanner scanner = hr.getScanner(scan);
> > > >
> > > >     try {
> > > >       List<KeyValue> curVals = new ArrayList<KeyValue>();
> > > >       boolean hasMore = false;
> > > >       do {
> > > >         curVals.clear();
> > > >         hasMore = scanner.next(curVals);
> > > >         KeyValue kv = curVals.get(0);
> > > >         sumResult += Long.parseLong(Bytes.toString(kv.getValue()));
> > > >
> > > >       } while (hasMore);
> > > >     } finally {
> > > >       scanner.close();
> > > >     }
> > > >     return sumResult;
> > > >   }
> > > >
> > > >   @Override
> > > >   public long getProtocolVersion(String protocol, long clientVersion)
> > > >       throws IOException {
> > > >     // TODO Auto-generated method stub
> > > >     return 0;
> > > >   }
> > > >
> > > > 192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001<http://192.168.10.22:9000/alex/test.jar%7CColumnAggregationEndpoint%7C1001>'
> > > >
> > > > here is my testing java code
> > > >
> > > > package com.testme.demo;
> > > > import java.io.IOException;
> > > > import java.util.Map;
> > > > import org.apache.hadoop.conf.Configuration;
> > > > import org.apache.hadoop.hbase.HBaseConfiguration;
> > > > import org.apache.hadoop.hbase.HTableDescriptor;
> > > > import org.apache.hadoop.hbase.client.*;
> > > > import org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol;
> > > > import org.apache.hadoop.hbase.util.*;
> > > > import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;;
> > > >
> > > > public class TestCop {
> > > >   private static Configuration conf =null;
> > > >   private static String TEST_TABLE = "mytest";
> > > >   private static String TEST_FAMILY = "myfl";
> > > >   private static String TEST_QUALIFIER = "myqf";
> > > >   /**
> > > >    * @param args
> > > >    */
> > > >   static {
> > > >     conf = HBaseConfiguration.create();
> > > >     conf.addResource( "hbase-site.xml");
> > > >   }
> > > >
> > > >   public static void main(String[] args) throws IOException,Throwable{
> > > >     // TODO Auto-generated method stub
> > > >     conf = HBaseConfiguration.create();
> > > >
> > > >     HTable table = new HTable(conf,TEST_TABLE);
> > > >     // HTableDescriptor htd = table.getTableDescriptor();
> > > >
> > > >     Scan scan = new Scan();
> > > >     Map<byte[], Long> results;
> > > >
> > > >     results = table.coprocessorExec(ColumnAggregationProtocol.class,
> > > >         "1".getBytes(),"5".getBytes(), new Call<ColumnAggregationProtocol,Long>(){
> > > >       public Long call(ColumnAggregationProtocol instance)throws IOException{
> > > >         return (Long) instance.sum(TEST_FAMILY.getBytes(),
> > > >             TEST_QUALIFIER.getBytes());
> > > >       }});
> > > >
> > > >     long sumResult = 0;
> > > >     long expectedResult = 0;
> > > >     for (Map.Entry<byte[], Long> e:results.entrySet()){
> > > >       sumResult += e.getValue();
> > > >     }
> > > >     System.out.println(sumResult);
> > > >   }
> > > > }
> > > >
> > > > when i run it i get error
> > > >
> > > > Exception in thread "main"
> > > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException:
> > > > org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
> > > > handler for protocol
> > > > org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
> > > > mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
> > > >   at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
> > > >   at org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
> > > >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > > >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > > >   at java.lang.reflect.Method.invoke(Method.java:597)
> > > >   at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
> > > >   at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
> > > >   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > > >   at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
> > > >   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
> > > >   at java.lang.reflect.Constructor.newInstance(Unknown Source)
> > > >   at
