This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 877564c  HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible… (#1047)
877564c is described below

commit 877564c1aa77ca4389e80b6c28b8517709752408
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Wed Jan 22 09:19:14 2020 +0000

    HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible… (#1047)
    
    Signed-off-by: Bharath Vissapragada <bhara...@apache.org>
    Signed-off-by: binlijin <binli...@gmail.com>
    (cherry picked from commit 62e340901fa60afeb164a1ff22e6092483b0ac48)
---
 .../HBaseInterClusterReplicationEndpoint.java      | 29 ++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cc9d90e..1c1e053 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -64,8 +64,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 
 /**
@@ -114,6 +116,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private boolean dropOnDeletedTables;
   private boolean isSerial = false;
 
+  /*
+   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
+   * different Connection implementations, or initialize it in a different way,
+   * so defining createConnection as protected for possible overridings.
+   */
+  protected Connection createConnection(Configuration conf) throws IOException {
+    return ConnectionFactory.createConnection(conf);
+  }
+
+  /*
+   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
+   * different ReplicationSinkManager implementations, or initialize it in a different way,
+   * so defining createReplicationSinkManager as protected for possible overridings.
+   */
+  protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
+    return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
+      this, this.conf);
+  }
+
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
@@ -133,12 +154,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
-    this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
+    Connection connection = createConnection(this.conf);
+    //Since createConnection method may be overridden by extending classes, we need to make sure
+    //it's indeed returning a ClusterConnection instance.
+    Preconditions.checkState(connection instanceof ClusterConnection);
+    this.conn = (ClusterConnection) connection;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    this.replicationSinkMgr = createReplicationSinkManager(conn);
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);

Reply via email to