Hi,
I want to implement an Endpoint coprocessor. Following the link
(http://www.3pillarglobal.com/insights/hbase-coprocessors), I created my own
files as described there, using a Maven project. My project consists of two
packages: main.java.coprocessor and main.java.coprocessor.generated. The
RowCountEndPoint.java file is in package main.java.coprocessor, and the proto
file is in package main.java.coprocessor.generated; compiling the proto file
creates the RowCounterProtos.java file in that same package. Building the Maven
project produces a jar file named AVGCoprocessor-0.0.1-SNAPSHOT.jar, which I
have copied into the lib folder of the HBase installation directory. I have
also uploaded the same coprocessor jar file to HDFS so that it can be loaded
dynamically.
---------------------------------------------------------------------------RowCountEndPoint.java-----------------------------------------------------------
package main.java.coprocessor;
1.
2. import com.google.protobuf.RpcCallback;
3. import com.google.protobuf.RpcController;
4. import com.google.protobuf.Service;
5. import main.java.coprocessor.generated.RowCounterProtos;
6. import org.apache.hadoop.hbase.Cell;
7. import org.apache.hadoop.hbase.CellUtil;
8. import org.apache.hadoop.hbase.Coprocessor;
9. import org.apache.hadoop.hbase.CoprocessorEnvironment;
10. import org.apache.hadoop.hbase.client.Scan;
11. import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
12. import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
13. import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
14. import org.apache.hadoop.hbase.filter.Filter;
15. import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
16. import org.apache.hadoop.hbase.protobuf.ResponseConverter;
17. import org.apache.hadoop.hbase.regionserver.InternalScanner;
18.
19. import java.io.IOException;
20. import java.util.ArrayList;
21. import java.util.List;
22.
23. public class RowCountEndpoint extends RowCounterProtos.RowCountService
24. implements Coprocessor, CoprocessorService {
25.
26. private RegionCoprocessorEnvironment env;
27.
28. @Override
29. public void start(CoprocessorEnvironment env) throws IOException {
30. if (env instanceof RegionCoprocessorEnvironment) {
31. this.env = (RegionCoprocessorEnvironment) env;
32. } else {
33. throw new CoprocessorException("Must be loaded on a table
region!");
34. }
35. }
36.
37. @Override
38. public void stop(CoprocessorEnvironment env) throws IOException {
39. // nothing to do when coprocessor is shutting down
40. }
41.
42. @Override
43. public Service getService() {
44. return this;
45. }
46.
47. @Override
48. public void getRowCount(RpcController controller,
49. RowCounterProtos.CountRequest request,
50. RpcCallback<RowCounterProtos.CountResponse> done) {
51. RowCounterProtos.CountResponse response = null;
52. try {
53. long count = getCount(new FirstKeyOnlyFilter(), false);
54. response = RowCounterProtos.CountResponse.newBuilder()
55. .setCount(count).build();
56. } catch (IOException ioe) {
57. ResponseConverter.setControllerException(controller, ioe);
58. }
59. done.run(response);
60. }
61.
62. @Override
63. public void getCellCount(RpcController controller,
64. RowCounterProtos.CountRequest request,
65. RpcCallback<RowCounterProtos.CountResponse> done) {
66. RowCounterProtos.CountResponse response = null;
67. try {
68. long count = getCount(null, true);
69. response = RowCounterProtos.CountResponse.newBuilder()
70. .setCount(count).build();
71. } catch (IOException ioe) {
72. ResponseConverter.setControllerException(controller, ioe);
73. }
74. done.run(response);
75. }
76.
77.
78. private long getCount(Filter filter, boolean countCells)
79. throws IOException {
80. long count = 0;
81. Scan scan = new Scan();
82. scan.setMaxVersions(1);
83. if (filter != null) {
84. scan.setFilter(filter);
85. }
86. try (
87. InternalScanner scanner = env.getRegion().getScanner(scan);
88. ) {
89. List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
byte[] lastRow = null;
do {
hasMore = scanner.next(results);
for (Cell cell : results) {
if (!countCells) {
if (lastRow == null || !CellUtil.matchingRow(cell, lastRow)) {
lastRow = CellUtil.cloneRow(cell);
count++;
}
} else count++;
}
results.clear();
} while (hasMore);
}
return count;
}
}
// ^^ RowCountEndpoint
----------------------------------------------------------------------------------------------------------------------------------------------------------------
The article loads the coprocessor both statically and dynamically. I have
tried both ways, but neither has succeeded.
A) For static loading, I tried the following:
1) In hbase-env.sh
export HBASE_CLASSPATH="/usr/local/hbase/lib/AvgCoprocessor-0.0.1-SNAPSHOT.jar"
2) In hbase-site.xml
<property>
<name>hbase.Coprocessor.region.classes</name>
<value> main.java.coprocessor.RowCountEndPoint</value>
</property>
After configuring the coprocessor jar file for static loading, I restarted
HBase, but I cannot see any coprocessor metrics for my jar file on the web page
of either the master or the region server.
B) For dynamic loading, we created a TagAggregate.java file that simply
disables the table, loads the coprocessor for that particular table, and then
enables the table again.
---------------------------------------------------------------TagAggregate.java---------------------------------------------------------------------------------
1.
package main.java.coprocessor;
2.
3. import java.io.IOException;
4. import java.net.URI;
5. import java.net.URISyntaxException;
6.
7. import java.util.List;
8.
9. import org.apache.hadoop.conf.Configuration;
10. import org.apache.hadoop.fs.FileSystem;
11. import org.apache.hadoop.fs.Path;
12. import org.apache.hadoop.hbase.Coprocessor;
13. import org.apache.hadoop.hbase.HBaseConfiguration;
14. import org.apache.hadoop.hbase.HColumnDescriptor;
15. import org.apache.hadoop.hbase.HTableDescriptor;
16. import org.apache.hadoop.hbase.TableName;
17. import org.apache.hadoop.hbase.client.HBaseAdmin;
18. import org.apache.hadoop.hbase.client.HConnection;
19. import org.apache.hadoop.hbase.client.HConnectionManager;
20. import org.apache.hadoop.hbase.client.HTableInterface;
21.
22. public class TagAggregate {
23. public static void main(String args[]) throws IOException,
24. URISyntaxException {
25. Configuration conf = HBaseConfiguration.create();
26. conf.addResource(new
Path("/usr/local/hadoop/etc/hadoop/core-site.xml"));
27. conf.addResource(new
Path("/usr/local/hadoop/etc/hadoop/hdfs-site.xml"));
28. FileSystem hdfsFileSystem = FileSystem.get(new URI(
29. "hdfs://localhost.localdomain:9000"),
conf);
30.
31. HConnection connection =
HConnectionManager.createConnection(conf);
32. HTableInterface table = connection.getTable("TLG_1");
33.
34. String tableName = "TLG_1";
35. HBaseAdmin admin = new HBaseAdmin(conf);
36. admin.disableTable(tableName);
37. TableName name = table.getName();
38. HTableDescriptor desc = new HTableDescriptor(name);
39. HColumnDescriptor columnFamily1 = new
HColumnDescriptor("TagsOnDate");
40. desc.addFamily(columnFamily1);
41. Path p = new Path(
42.
"hdfs://localhost.localdomain:9000/hbase/local/jars/tmp/AvgCoprocessor-0.0.1-SNAPSHOT.jar");
desc.setValue(
43.
"COPROCESSOR$1",
44.
"hdfs://localhost.localdomain:9000/hbase/local/jars/tmp/AvgCoprocessor-0.0.1-SNAPSHOT.jar"
45. + "|"
46. +
main.java.coprocessor.generated.RowCounterProtos.class
47.
.getCanonicalName() + "|"
48. +
Coprocessor.PRIORITY_USER);
49. //
desc.addCoprocessor(TagAggregate.class.getCanonicalName(),p,Coprocessor.PRIORITY_LOWEST,null);
50. admin.modifyTable(tableName, desc);
51. admin.enableTable(tableName);
52.
53. }
54. }
The problem I face when loading the coprocessor jar file dynamically is that,
after the table is disabled, I get the following error:
Wrong FS: hdfs://localhost.localdomain:9000/hbase/local/jat/tmp expected file://
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Please help: how should I load my coprocessor jar file into HBase?