HBASE-19633 Clean up the replication queues in the postPeerModification stage when removing a peer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7bb1768d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7bb1768d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7bb1768d

Branch: refs/heads/HBASE-19397-branch-2
Commit: 7bb1768d7955f44ac7eff8d772532dd3789ece3b
Parents: 3fc2f85
Author: zhangduo <zhang...@apache.org>
Authored: Tue Jan 2 09:57:23 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sun Feb 4 20:41:30 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationPeerConfig.java      |  2 +-
 .../replication/VerifyReplication.java          | 34 ++++++++++-------
 .../hbase/replication/ReplicationPeers.java     | 32 ++++++----------
 .../replication/ZKReplicationQueueStorage.java  |  3 +-
 .../replication/ZKReplicationStorageBase.java   |  4 +-
 .../replication/TestReplicationStateBasic.java  | 10 +----
 .../master/replication/AddPeerProcedure.java    |  5 +--
 .../replication/DisablePeerProcedure.java       |  3 +-
 .../master/replication/EnablePeerProcedure.java |  3 +-
 .../master/replication/ModifyPeerProcedure.java | 34 +++++++++--------
 .../replication/RefreshPeerProcedure.java       | 17 ++++-----
 .../master/replication/RemovePeerProcedure.java |  7 ++--
 .../replication/ReplicationPeerManager.java     | 31 +++++++++++++++-
 .../replication/UpdatePeerConfigProcedure.java  |  3 +-
 .../RemoteProcedureResultReporter.java          |  3 +-
 .../regionserver/RefreshPeerCallable.java       |  5 ++-
 .../regionserver/ReplicationSourceManager.java  | 39 +++++++-------------
 .../TestReplicationAdminUsingProcedure.java     |  7 ++--
 18 files changed, 124 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
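
The functional core of the change, condensed here into a reader's sketch (this is not
extra code in the commit, only a summary of the RemovePeerProcedure hunk further down):
queue and hfile-ref cleanup for the removed peer now happens in postPeerModification,
after the peer has been deleted from peer storage and the region servers have been
refreshed.

    @Override
    protected void postPeerModification(MasterProcedureEnv env)
        throws IOException, ReplicationException {
      // Clean up everything the peer left behind in the replication queue storage.
      env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
      LOG.info("Successfully removed peer {}", peerId);
      MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
      if (cpHost != null) {
        cpHost.postRemoveReplicationPeer(peerId);
      }
    }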


http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index b80ee16..fdae288 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -27,8 +27,8 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A configuration for the replication peer cluster.

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index f0070f0..fe45762 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce.replication;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +44,14 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -66,6 +66,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -333,19 +334,24 @@ public class VerifyReplication extends Configured 
implements Tool {
       final Configuration conf, String peerId) throws IOException {
     ZKWatcher localZKW = null;
     try {
-      localZKW = new ZKWatcher(conf, "VerifyReplication",
-          new Abortable() {
-            @Override public void abort(String why, Throwable e) {}
-            @Override public boolean isAborted() {return false;}
-          });
-
-      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, 
conf);
-      rp.init();
+      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
+        @Override
+        public void abort(String why, Throwable e) {
+        }
 
-      return Pair.newPair(rp.getPeerConfig(peerId), 
rp.getPeerClusterConfiguration(peerId));
+        @Override
+        public boolean isAborted() {
+          return false;
+        }
+      });
+      ReplicationPeerStorage storage =
+          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
+      return Pair.newPair(peerConfig,
+        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
     } catch (ReplicationException e) {
-      throw new IOException(
-          "An error occurred while trying to connect to the remove peer 
cluster", e);
+      throw new IOException("An error occurred while trying to connect to the 
remove peer cluster",
+          e);
     } finally {
       if (localZKW != null) {
         localZKW.close();
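
A sketch of the new lookup path introduced above, using only the calls shown in this
hunk (ReplicationFactory/ReplicationPeers instances are no longer needed; the static
ReplicationPeers.getPeerClusterConfiguration comes from the hbase-replication change
below):

    ReplicationPeerStorage storage =
        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
    ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
    // May throw ReplicationException, which the caller wraps into an IOException.
    Configuration peerConf =
        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf);
    // The two values are returned together as a Pair.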

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index e58482e..422801b 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -39,20 +37,22 @@ import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public class ReplicationPeers {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationPeers.class);
-
   private final Configuration conf;
 
   // Map of peer clusters keyed by their id
   private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
   private final ReplicationPeerStorage peerStorage;
 
-  protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+  ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
     this.conf = conf;
     this.peerCache = new ConcurrentHashMap<>();
     this.peerStorage = 
ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
   public void init() throws ReplicationException {
     // Loading all existing peerIds into peer cache.
     for (String peerId : this.peerStorage.listPeerIds()) {
@@ -120,22 +120,13 @@ public class ReplicationPeers {
     return peerCache.keySet();
   }
 
-  public ReplicationPeerConfig getPeerConfig(String peerId) {
-    ReplicationPeer replicationPeer = this.peerCache.get(peerId);
-    if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + peerId + " is not 
cached");
-    }
-    return replicationPeer.getPeerConfig();
-  }
-
-  public Configuration getPeerClusterConfiguration(String peerId) throws 
ReplicationException {
-    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
-
+  public static Configuration 
getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
+      Configuration baseConf) throws ReplicationException {
     Configuration otherConf;
     try {
-      otherConf = HBaseConfiguration.createClusterConf(this.conf, 
peerConfig.getClusterKey());
+      otherConf = HBaseConfiguration.createClusterConf(baseConf, 
peerConfig.getClusterKey());
     } catch (IOException e) {
-      throw new ReplicationException("Can't get peer configuration for 
peerId=" + peerId, e);
+      throw new ReplicationException("Can't get peer configuration for peer " 
+ peerConfig, e);
     }
 
     if (!peerConfig.getConfiguration().isEmpty()) {
@@ -179,8 +170,9 @@ public class ReplicationPeers {
 >>>>>>> HBASE-19622 Reimplement ReplicationPeers with the new replication 
 >>>>>>> storage interface
    */
   private ReplicationPeerImpl createPeer(String peerId) throws 
ReplicationException {
-    ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
+    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
     boolean enabled = peerStorage.isPeerEnabled(peerId);
-    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), 
peerId, enabled, peerConf);
+    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, 
conf), peerId, enabled,
+        peerConfig);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 41f50d8..ee237f2 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,7 +49,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * ZK based replication queue storage.

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index d09a56b..2321e4f 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 2589199..07c6c15 100644
--- 
a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ 
b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -238,12 +237,6 @@ public abstract class TestReplicationStateBasic {
     } catch (ReplicationException e) {
     }
 
-    try {
-      assertNull(rp.getPeerClusterConfiguration("bogus"));
-      fail("Should have thrown an ReplicationException when passed a bogus 
peerId");
-    } catch (ReplicationException e) {
-    }
-
     assertNumberOfPeers(0);
 
     // Add some peers
@@ -258,7 +251,8 @@ public abstract class TestReplicationStateBasic {
       fail("There are no connected peers, should have thrown an 
IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertEquals(KEY_ONE, 
ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
+        
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), 
rp.getConf())));
     rp.getPeerStorage().removePeer(ID_ONE);
     rp.removePeer(ID_ONE);
     assertNumberOfPeers(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index a4f9b32..f0f7704 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -74,8 +73,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws 
IOException {
-    LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " 
peer " + peerId +
-      ", config " + peerConfig);
+    LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : 
"DISABLED", peerId,
+      peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, 
peerConfig);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
index 10e35a8..0871575 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class DisablePeerProcedure extends ModifyPeerProcedure 
{
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws 
IOException {
-    LOG.info("Successfully disabled peer " + peerId);
+    LOG.info("Successfully disabled peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postDisableReplicationPeer(peerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
index f2a9f01..890462f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws 
IOException {
-    LOG.info("Successfully enabled peer " + peerId);
+    LOG.info("Successfully enabled peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postEnableReplicationPeer(peerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index a682606..c225619 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -84,10 +83,13 @@ public abstract class ModifyPeerProcedure
    * Called before we finish the procedure. The implementation can do some 
logging work, and also
    * call the coprocessor hook if any.
    * <p>
-   * Notice that, since we have already done the actual work, throwing 
exception here will not fail
-   * this procedure, we will just ignore it and finish the procedure as 
suceeded.
+   * Notice that, since we have already done the actual work, throwing {@code 
IOException} here will
+   * not fail this procedure, we will just ignore it and finish the procedure 
as suceeded. If
+   * {@code ReplicationException} is thrown we will retry since this usually 
means we fails to
+   * update the peer storage.
    */
-  protected abstract void postPeerModification(MasterProcedureEnv env) throws 
IOException;
+  protected abstract void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException;
 
   private void releaseLatch() {
     ProcedurePrepareLatch.releaseLatch(latch, this);
@@ -101,16 +103,14 @@ public abstract class ModifyPeerProcedure
         try {
           prePeerModification(env);
         } catch (IOException e) {
-          LOG.warn(
-            getClass().getName() + " failed to call CP hook or the pre check 
is failed for peer " +
-              peerId + ", mark the procedure as failure and give up",
-            e);
+          LOG.warn("{} failed to call pre CP hook or the pre check is failed 
for peer {}, " +
+            "mark the procedure as failure and give up", getClass().getName(), 
peerId, e);
           setFailure("master-" + getPeerOperationType().name().toLowerCase() + 
"-peer", e);
           releaseLatch();
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
-          LOG.warn(getClass().getName() + " failed to call prePeerModification 
for peer " + peerId +
-            ", retry", e);
+          LOG.warn("{} failed to call prePeerModification for peer {}, retry", 
getClass().getName(),
+            peerId, e);
           throw new ProcedureYieldException();
         }
         setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
@@ -119,8 +119,8 @@ public abstract class ModifyPeerProcedure
         try {
           updatePeerStorage(env);
         } catch (ReplicationException e) {
-          LOG.warn(
-            getClass().getName() + " update peer storage for peer " + peerId + 
" failed, retry", e);
+          LOG.warn("{} update peer storage for peer {} failed, retry", 
getClass().getName(), peerId,
+            e);
           throw new ProcedureYieldException();
         }
         setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
@@ -134,9 +134,13 @@ public abstract class ModifyPeerProcedure
       case POST_PEER_MODIFICATION:
         try {
           postPeerModification(env);
+        } catch (ReplicationException e) {
+          LOG.warn("{} failed to call postPeerModification for peer {}, retry",
+            getClass().getName(), peerId, e);
+          throw new ProcedureYieldException();
         } catch (IOException e) {
-          LOG.warn(getClass().getName() + " failed to call prePeerModification 
for peer " + peerId +
-            ", ignore since the procedure has already done", e);
+          LOG.warn("{} failed to call post CP hook for peer {}, " +
+            "ignore since the procedure has already done", 
getClass().getName(), peerId, e);
         }
         releaseLatch();
         return Flow.NO_MORE_STATE;
@@ -175,7 +179,7 @@ public abstract class ModifyPeerProcedure
       throws IOException, InterruptedException {
     if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
       // actually the peer related operations has no rollback, but if we 
haven't done any
-      // modifications on the peer storage, we can just return.
+      // modifications on the peer storage yet, we can just return.
       return;
     }
     throw new UnsupportedOperationException();
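
Read from the executor's side, the POST_PEER_MODIFICATION handling above boils down to
the following condensation (same behavior as the hunk, shown only to make the two
exception paths explicit):

    try {
      postPeerModification(env);
    } catch (ReplicationException e) {
      // Peer storage cleanup failed (e.g. a queue could not be removed): yield and retry.
      throw new ProcedureYieldException();
    } catch (IOException e) {
      // The post CP hook failed: the real work is already done, so just log and finish.
      LOG.warn("{} failed to call post CP hook for peer {}, ignore", getClass().getName(),
        peerId, e);
    }
    releaseLatch();
    return Flow.NO_MORE_STATE;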

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index ba4285f..1253ef9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
@@ -122,17 +121,15 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
 
   private void complete(MasterProcedureEnv env, Throwable error) {
     if (event == null) {
-      LOG.warn("procedure event for " + getProcId() +
-          " is null, maybe the procedure is created when recovery",
-        new Exception());
+      LOG.warn("procedure event for {} is null, maybe the procedure is created 
when recovery",
+        getProcId());
       return;
     }
     if (error != null) {
-      LOG.warn("Refresh peer " + peerId + " for " + type + " on " + 
targetServer + " failed",
-        error);
+      LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, 
targetServer, error);
       this.succ = false;
     } else {
-      LOG.info("Refresh peer " + peerId + " for " + type + " on " + 
targetServer + " suceeded");
+      LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, 
targetServer);
       this.succ = true;
     }
 
@@ -168,9 +165,9 @@ public class RefreshPeerProcedure extends 
Procedure<MasterProcedureEnv>
       dispatched = false;
     }
     if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
-      LOG.info("Can not add remote operation for refreshing peer " + peerId + 
" for " + type +
-          " to " + targetServer + ", this usually because the server is 
already dead," +
-          " give up and mark the procedure as complete");
+      LOG.info("Can not add remote operation for refreshing peer {} for {} to 
{}, " +
+        "this usually because the server is already dead, " +
+        "give up and mark the procedure as complete", peerId, type, 
targetServer);
       return null;
     }
     dispatched = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 6e9c384..64faf2b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -61,8 +60,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure 
{
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws 
IOException {
-    LOG.info("Successfully removed peer " + peerId);
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
+    LOG.info("Successfully removed peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postRemoveReplicationPeer(peerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index b6732d7..1414d22 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -217,6 +216,36 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
+  private void removeAllQueues0(String peerId) throws ReplicationException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (queueInfo.getPeerId().equals(peerId)) {
+          queueStorage.removeQueue(replicator, queueId);
+        }
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
+    }
+  }
+
+  public void removeAllQueuesAndHFileRefs(String peerId) throws 
ReplicationException {
+    // Here we need two passes to address the problem of claimQueue. Maybe a 
claimQueue is still
+    // on-going when the refresh peer config procedure is done, if a RS which 
has already been
+    // scanned claims the queue of a RS which has not been scanned yet, we 
will miss that queue in
+    // the scan here, and if the RS who has claimed the queue crashed before 
creating recovered
+    // source, then the queue will leave there until the another RS detects 
the crash and helps
+    // removing the queue.
+    // A two pass scan can solve the problem. Anyway, the queue will not 
disappear during the
+    // claiming, it will either under the old RS or under the new RS, and a 
queue can only be
+    // claimed once after the refresh peer procedure done(as the next claim 
queue will just delete
+    // it), so we can make sure that a two pass scan will finally find the 
queue and remove it,
+    // unless it has already been removed by others.
+    removeAllQueues0(peerId);
+    removeAllQueues0(peerId);
+    queueStorage.removePeerFromHFileRefs(peerId);
+  }
+
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws 
DoNotRetryIOException {
     checkClusterKey(peerConfig.getClusterKey());
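
The double call in removeAllQueuesAndHFileRefs above is intentional; a compact
restatement of its comment as a reader's sketch:

    // Pass 1 can miss a queue: a region server that was already scanned may
    // claimQueue a queue from a server that has not been scanned yet.
    removeAllQueues0(peerId);
    // After the refresh-peer step a queue can be claimed at most once more
    // (the next claim simply deletes it), so pass 2 catches whatever pass 1
    // missed, unless another process has already removed it.
    removeAllQueues0(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);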
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index a43532d..3497447 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -70,7 +69,7 @@ public class UpdatePeerConfigProcedure extends 
ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws 
IOException {
-    LOG.info("Successfully updated peer config of " + peerId + " to " + 
peerConfig);
+    LOG.info("Successfully updated peer config of {} to {}", peerId, 
peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
index e4be422..ac3e95a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
@@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index c3f33aa..7ada24b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
 import org.apache.log4j.Logger;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import 
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 12a806b..bd0ec77 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -59,7 +59,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -306,9 +305,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, 
ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getPeer(id);
-    ReplicationSourceInterface src = getReplicationSource(id, peerConfig, 
peer);
+    ReplicationSourceInterface src = getReplicationSource(id, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -503,8 +501,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param peerId the id of the peer cluster
    * @return the created source
    */
-  private ReplicationSourceInterface getReplicationSource(String peerId,
-      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) 
throws IOException {
+  private ReplicationSourceInterface getReplicationSource(String peerId, 
ReplicationPeer peer)
+      throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -516,24 +514,24 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
     ReplicationEndpoint replicationEndpoint = null;
     try {
-      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+      String replicationEndpointImpl = 
peer.getPeerConfig().getReplicationEndpointImpl();
       if (replicationEndpointImpl == null) {
         // Default to HBase inter-cluster replication endpoint
         replicationEndpointImpl = 
HBaseInterClusterReplicationEndpoint.class.getName();
       }
       replicationEndpoint = Class.forName(replicationEndpointImpl)
           .asSubclass(ReplicationEndpoint.class).newInstance();
-      if(rsServerHost != null) {
-        ReplicationEndpoint newReplicationEndPoint = rsServerHost
-            .postCreateReplicationEndPoint(replicationEndpoint);
-        if(newReplicationEndPoint != null) {
+      if (rsServerHost != null) {
+        ReplicationEndpoint newReplicationEndPoint =
+            rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
+        if (newReplicationEndPoint != null) {
           // Override the newly created endpoint from the hook with configured 
end point
           replicationEndpoint = newReplicationEndPoint;
         }
       }
     } catch (Exception e) {
-      LOG.warn("Passed replication endpoint implementation throws errors"
-          + " while initializing ReplicationSource for peer: " + peerId, e);
+      LOG.warn("Passed replication endpoint implementation throws errors" +
+        " while initializing ReplicationSource for peer: " + peerId, e);
       throw new IOException(e);
     }
 
@@ -543,8 +541,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
       replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(),
-      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, 
server));
+    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, 
peer.getConfiguration(), fs,
+        peerId, clusterId, peer, metrics, tableDescriptors, server));
 
     return src;
   }
@@ -740,17 +738,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
             abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), peerId));
             continue;
           }
-
-          ReplicationPeerConfig peerConfig = null;
-          try {
-            peerConfig = replicationPeers.getPeerConfig(actualPeerId);
-          } catch (Exception e) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node 
" + deadRS
-                + ", failed to read peer config", e);
-            abortWhenFail(() -> 
queueStorage.removeQueue(server.getServerName(), peerId));
-            continue;
-          }
-
           // track sources in walsByIdRecoveredQueues
           Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
           walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -765,7 +752,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
           }
 
           // enqueue sources
-          ReplicationSourceInterface src = getReplicationSource(peerId, 
peerConfig, peer);
+          ReplicationSourceInterface src = getReplicationSource(peerId, peer);
           // synchronized on oldsources to avoid adding recovered source for 
the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7bb1768d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
index b09a8a7..1300376 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
@@ -15,19 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.client.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -39,6 +35,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestReplicationAdminUsingProcedure extends TestReplicationBase {
 
