This is an automated email from the ASF dual-hosted git repository. wchevreuil pushed a commit to branch branch-2.2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push: new 877564c HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible… (#1047) 877564c is described below commit 877564c1aa77ca4389e80b6c28b8517709752408 Author: Wellington Ramos Chevreuil <wchevre...@apache.org> AuthorDate: Wed Jan 22 09:19:14 2020 +0000 HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible… (#1047) Signed-off-by: Bharath Vissapragada <bhara...@apache.org> Signed-off-by: binlijin <binli...@gmail.com> (cherry picked from commit 62e340901fa60afeb164a1ff22e6092483b0ac48) --- .../HBaseInterClusterReplicationEndpoint.java | 29 ++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index cc9d90e..1c1e053 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -64,8 +64,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; /** @@ -114,6 +116,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private boolean dropOnDeletedTables; private boolean isSerial = false; + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating + * different Connection implementations, or initialize it in a different way, + * so defining createConnection as protected for possible overridings. + */ + protected Connection createConnection(Configuration conf) throws IOException { + return ConnectionFactory.createConnection(conf); + } + + /* + * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating + * different ReplicationSinkManager implementations, or initialize it in a different way, + * so defining createReplicationSinkManager as protected for possible overridings. + */ + protected ReplicationSinkManager createReplicationSinkManager(Connection conn) { + return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(), + this, this.conf); + } + @Override public void init(Context context) throws IOException { super.init(context); @@ -133,12 +154,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. - this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf); + Connection connection = createConnection(this.conf); + //Since createConnection method may be overridden by extending classes, we need to make sure + //it's indeed returning a ClusterConnection instance. + Preconditions.checkState(connection instanceof ClusterConnection); + this.conn = (ClusterConnection) connection; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); // ReplicationQueueInfo parses the peerId out of the znode for us - this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf); + this.replicationSinkMgr = createReplicationSinkManager(conn); // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);