i am testing coprocessor endpoint function, here is my testing process ,and
error i get ,hope any expert on coprocessor can help me out
# vi ColumnAggregationProtocol.java
import java.io.IOException;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
// A sample protocol for performing aggregation at regions.
public interface ColumnAggregationProtocol
extends CoprocessorProtocol {
// Perform aggregation for a given column at the region. The aggregation
// will include all the rows inside the region. It can be extended to
// allow passing start and end rows for a fine-grained aggregation.
public long sum(byte[] family, byte[] qualifier) throws IOException;
}
# vi ColumnAggregationEndpoint.java
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
//Aggregation implementation at a region.
public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
implements ColumnAggregationProtocol {
@Override
public long sum(byte[] family, byte[] qualifier)
throws IOException {
// aggregate at each region
Scan scan = new Scan();
scan.addColumn(family, qualifier);
long sumResult = 0;
CoprocessorEnvironment ce = getEnvironment();
HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
InternalScanner scanner = hr.getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean hasMore = false;
do {
curVals.clear();
hasMore = scanner.next(curVals);
KeyValue kv = curVals.get(0);
sumResult += Long.parseLong(Bytes.toString(kv.getValue()));
} while (hasMore);
} finally {
scanner.close();
}
return sumResult;
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException
{
// TODO Auto-generated method stub
return null;
}
}
i compile and pack the two into test.jar,and put it into my HDFS filesystem
and load it into my test table
hbase(main):006:0> alter 'mytest', METHOD =>
'table_att','coprocessor'=>'hdfs:///
192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001'
here is my testing java code
package com.testme.demo;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;;
public class TestCop {
private static Configuration conf =null;
private static String TEST_TABLE = "mytest";
private static String TEST_FAMILY = "myfl";
private static String TEST_QUALIFIER = "myqf";
/**
* @param args
*/
static {
conf = HBaseConfiguration.create();
conf.addResource( "hbase-site.xml");
}
public static void main(String[] args) throws IOException,Throwable{
// TODO Auto-generated method stub
conf = HBaseConfiguration.create();
HTable table = new HTable(conf,TEST_TABLE);
// HTableDescriptor htd = table.getTableDescriptor();
Scan scan = new Scan();
Map<byte[], Long> results;
results = table.coprocessorExec(ColumnAggregationProtocol.class,
"1".getBytes(),"5".getBytes(), new Call<ColumnAggregationProtocol,Long>(){
public Long call(ColumnAggregationProtocol instance)throws IOException{
return (Long) instance.sum(TEST_FAMILY.getBytes(),
TEST_QUALIFIER.getBytes());
}});
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], Long> e:results.entrySet()){
sumResult += e.getValue();
}
System.out.println(sumResult);
}
}
when i run it i get error
Exception in thread "main"
org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException:
org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
handler for protocol
org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
at
org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
at
org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:79)
at
org.apache.hadoop.hbase.client.ServerCallable.translateException(ServerCallable.java:228)
at
org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:166)
at
org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.java:79)
at com.sun.proxy.$Proxy8.sum(Unknown Source)
at com.testme.demo.TestCop$1.call(TestCop.java:41)
at com.testme.demo.TestCop$1.call(TestCop.java:1)
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$4.call(HConnectionManager.java:1466)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException):
org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
handler for protocol
org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
at
org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
at
org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:995)
at
org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:86)
at com.sun.proxy.$Proxy7.execCoprocessor(Unknown Source)
at
org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:75)
at
org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:73)
at
org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:163)
... 10 more
hbase(main):020:0> describe
'mytest'
DESCRIPTION ENABLED
{NAME => 'mytest', coprocessor$1 => 'hdfs:///192.16 true
8.10.22:9000/alex/test.jar|ColumnAggregationEndpoin
t|1001', FAMILIES => [{NAME => 'myfl'
, DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NO
NE', REPLICATION_SCOPE => '0', VERSIONS => '3', COM
PRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '21
47483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE
=> '65536', IN_MEMORY => 'false', ENCODE_ON_DISK =
> 'true', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0920 seconds