HBASE-20285 Delete all last pushed sequence ids when removing a peer or removing the serial flag for a peer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ead569c9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ead569c9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ead569c9

Branch: refs/heads/HBASE-20046-branch-2
Commit: ead569c9515368c2c7e1932561fd05c77ccd9482
Parents: 83488b8
Author: zhangduo <zhang...@apache.org>
Authored: Mon Mar 26 22:17:00 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/MasterProcedure.proto     | 10 +++
 .../replication/ReplicationQueueStorage.java    |  5 ++
 .../replication/ZKReplicationQueueStorage.java  | 37 ++++++++++-
 .../TestZKReplicationQueueStorage.java          | 31 ++++++++-
 .../replication/DisablePeerProcedure.java       | 15 +++++
 .../master/replication/EnablePeerProcedure.java | 15 +++++
 .../master/replication/RemovePeerProcedure.java | 31 ++++++++-
 .../replication/ReplicationPeerManager.java     |  8 ++-
 .../replication/UpdatePeerConfigProcedure.java  |  3 +
 .../replication/SerialReplicationTestBase.java  | 19 +++++-
 .../TestAddToSerialReplicationPeer.java         | 28 ++------
 .../replication/TestSerialReplication.java      | 68 ++++++++++++++++----
 12 files changed, 227 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
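
For readers skimming the patch, the user-visible effect is that two Admin operations now also clear a peer's last pushed sequence ids: removing the peer, and updating the peer config so that the serial flag is dropped. Below is a minimal illustrative sketch of those two calls; the helper names and the admin/peerId variables are assumptions, not part of this commit, and only Admin methods exercised by the new tests below are used.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

// Scenario 1: keep the peer but drop its serial flag; the master-side
// UpdatePeerConfigProcedure now deletes the last pushed sequence ids.
void clearSerialFlag(Admin admin, String peerId) throws IOException {
  ReplicationPeerConfig current = admin.getReplicationPeerConfig(peerId);
  admin.updateReplicationPeerConfig(peerId,
    ReplicationPeerConfig.newBuilder(current).setSerial(false).build());
}

// Scenario 2: remove the peer entirely; RemovePeerProcedure performs the same
// cleanup when the removed peer was serial.
void removePeer(Admin admin, String peerId) throws IOException {
  admin.removeReplicationPeer(peerId);
}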


http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f710759..b37557c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -421,3 +421,13 @@ message UpdatePeerConfigStateData {
   required ReplicationPeer peer_config = 1;
   optional ReplicationPeer old_peer_config = 2;
 }
+
+message RemovePeerStateData {
+  optional ReplicationPeer peer_config = 1;
+}
+
+message EnablePeerStateData {
+}
+
+message DisablePeerStateData {
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 99a1e97..cd37ac2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -87,6 +87,11 @@ public interface ReplicationQueueStorage {
   void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
 
   /**
+   * Remove all the max sequence id records for the given peer.
+   * @param peerId peer id
+   */
+  void removeLastSequenceIds(String peerId) throws ReplicationException;
+  /**
    * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver
    * @param queueId a String that identifies the queue
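
To make the new method concrete, here is a minimal lifecycle sketch against this interface, assuming an existing ReplicationQueueStorage instance; the helper name and the literal sequence id are illustrative only, not part of the patch.

import java.util.Collections;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

void lastSeqIdLifecycle(ReplicationQueueStorage storage, String encodedRegionName, String peerId)
    throws ReplicationException {
  // record a last pushed sequence id for one region under the peer
  storage.setLastSequenceIds(peerId, Collections.singletonMap(encodedRegionName, 100L));
  // when the peer is removed, or its serial flag is cleared, wipe every record for the peer
  storage.removeLastSequenceIds(peerId);
  // readers now fall back to HConstants.NO_SEQNUM for this peer
  long seqId = storage.getLastSequenceId(encodedRegionName, peerId); // == HConstants.NO_SEQNUM
}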

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 2e7a012..96b0b91 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -102,7 +102,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
    */
   private final String hfileRefsZNode;
 
-  private final String regionsZNode;
+  @VisibleForTesting
+  final String regionsZNode;
 
   public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
     super(zookeeper, conf);
@@ -312,6 +313,40 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   @Override
+  public void removeLastSequenceIds(String peerId) throws ReplicationException {
+    String suffix = "-" + peerId;
+    try {
+      StringBuilder sb = new StringBuilder(regionsZNode);
+      int regionsZNodeLength = regionsZNode.length();
+      int levelOneLength = regionsZNodeLength + 3;
+      int levelTwoLength = levelOneLength + 3;
+      List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
+      // it is possible that levelOneDirs is null if we haven't written any last pushed sequence ids
+      // yet, so we need an extra check here.
+      if (CollectionUtils.isEmpty(levelOneDirs)) {
+        return;
+      }
+      for (String levelOne : levelOneDirs) {
+        sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
+        for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
+          sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
+          for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
+            if (znode.endsWith(suffix)) {
+              sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
+              ZKUtil.deleteNode(zookeeper, sb.toString());
+              sb.setLength(levelTwoLength);
+            }
+          }
+          sb.setLength(levelOneLength);
+        }
+        sb.setLength(regionsZNodeLength);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
+    }
+  }
+
+  @Override
   public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
     byte[] bytes;
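
For orientation, the znodes swept by removeLastSequenceIds follow the layout asserted in TestZKReplicationQueueStorage below: the encoded region name is split into two 2-character levels under regionsZNode, and the leaf name ends with "-" plus the peer id, which is why levelOneLength and levelTwoLength above are the base length plus 3 (one separator plus two characters). A simplified sketch of the path shape, not the actual getSerialReplicationRegionPeerNode implementation:

// <regionsZNode>/<first 2 chars>/<next 2 chars>/<rest of encoded region name>-<peerId>
static String regionPeerNode(String regionsZNode, String encodedRegionName, String peerId) {
  return regionsZNode + "/" + encodedRegionName.substring(0, 2) + "/"
      + encodedRegionName.substring(2, 4) + "/" + encodedRegionName.substring(4) + "-" + peerId;
}
// e.g. regionPeerNode("/hbase/replication/regions", "31d9792f4435b99d9fb1016f6fbc8dc7", "1")
//   returns "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-1"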

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 5821271..74a24ac 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -32,15 +32,17 @@ import java.util.SortedSet;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -71,7 +73,7 @@ public class TestZKReplicationQueueStorage {
   }
 
   @After
-  public void tearDownAfterTest() throws ReplicationException {
+  public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
     for (ServerName serverName : STORAGE.getListOfReplicators()) {
       for (String queue : STORAGE.getAllQueues(serverName)) {
         STORAGE.removeQueue(serverName, queue);
@@ -301,6 +303,29 @@ public class TestZKReplicationQueueStorage {
     String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
     String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
     String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
-    Assert.assertEquals(expectedPath, path);
+    assertEquals(expectedPath, path);
+  }
+
+  @Test
+  public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
+    String peerId = "1";
+    String peerIdToDelete = "2";
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
+      STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
+    }
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
+      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
+    }
+    STORAGE.removeLastSequenceIds(peerIdToDelete);
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
+      assertEquals(HConstants.NO_SEQNUM,
+        STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
index 0871575..7bda1d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
 import java.io.IOException;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisablePeerStateData;
+
 /**
  * The procedure for disabling a replication peer.
  */
@@ -67,4 +70,16 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
       cpHost.postDisableReplicationPeer(peerId);
     }
   }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    serializer.serialize(DisablePeerStateData.getDefaultInstance());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    serializer.deserialize(DisablePeerStateData.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
index 890462f..530d4cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
 import java.io.IOException;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnablePeerStateData;
+
 /**
  * The procedure for enabling a replication peer.
  */
@@ -67,4 +70,16 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
       cpHost.postEnableReplicationPeer(peerId);
     }
   }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    serializer.serialize(EnablePeerStateData.getDefaultInstance());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    serializer.deserialize(EnablePeerStateData.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 64faf2b..82dc07e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -18,13 +18,18 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData;
+
 /**
  * The procedure for removing a replication peer.
  */
@@ -33,6 +38,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class);
 
+  private ReplicationPeerConfig peerConfig;
+
   public RemovePeerProcedure() {
   }
 
@@ -51,7 +58,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
     if (cpHost != null) {
       cpHost.preRemoveReplicationPeer(peerId);
     }
-    env.getReplicationPeerManager().preRemovePeer(peerId);
+    peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId);
   }
 
   @Override
@@ -63,10 +70,32 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
   protected void postPeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException {
     env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
+    if (peerConfig.isSerial()) {
+      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+    }
     LOG.info("Successfully removed peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postRemoveReplicationPeer(peerId);
     }
   }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder();
+    if (peerConfig != null) {
+      builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class);
+    if (data.hasPeerConfig()) {
+      this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index a0e01e0..87d0111 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -109,8 +109,8 @@ public class ReplicationPeerManager {
     return desc;
   }
 
-  void preRemovePeer(String peerId) throws DoNotRetryIOException {
-    checkPeerExists(peerId);
+  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
+    return checkPeerExists(peerId).getPeerConfig();
   }
 
   void preEnablePeer(String peerId) throws DoNotRetryIOException {
@@ -220,6 +220,10 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
+  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
+    queueStorage.removeLastSequenceIds(peerId);
+  }
+
   void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
     // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
     // on-going when the refresh peer config procedure is done, if a RS which has already been

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index b7e670a..ccfd4a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -107,6 +107,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
   protected void postPeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException {
+    if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) {
+      env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+    }
     LOG.info("Successfully updated peer config of {} to {}", peerId, 
peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index b5aae85..4b7fa87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -26,8 +26,13 @@ import java.util.UUID;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -129,7 +134,10 @@ public class SerialReplicationTestBase {
 
   @After
   public void tearDown() throws Exception {
-    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    Admin admin = UTIL.getAdmin();
+    for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
+      admin.removeReplicationPeer(pd.getPeerId());
+    }
     rollAllWALs();
     if (WRITER != null) {
       WRITER.close();
@@ -233,4 +241,13 @@ public class SerialReplicationTestBase {
       assertEquals(expectedEntries, count);
     }
   }
+
+  protected final TableName createTable() throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    return tableName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
index 64b5bb1..317c120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -21,14 +21,11 @@ import java.io.IOException;
 import java.util.Collections;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -88,11 +85,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
 
   @Test
   public void testAddPeer() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -118,12 +111,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
         .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
     UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
 
-    TableName tableName = TableName.valueOf(name.getMethodName());
-
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -159,11 +147,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
         .setReplicateAllUserTables(false).setSerial(true).build();
     UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
 
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -190,11 +174,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
 
   @Test
   public void testDisabledTable() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));

http://git-wip-us.apache.org/repos/asf/hbase/blob/ead569c9/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index bedb2ec..07e626b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -65,11 +65,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
 
   @Test
   public void testRegionMove() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -89,11 +85,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
 
   @Test
   public void testRegionSplit() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    UTIL.getAdmin().createTable(
-      TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
-        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
-    UTIL.waitTableAvailable(tableName);
+    TableName tableName = createTable();
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       for (int i = 0; i < 100; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -148,7 +140,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
     TableName tableName = TableName.valueOf(name.getMethodName());
     UTIL.getAdmin().createTable(
       TableDescriptorBuilder.newBuilder(tableName)
-        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
           .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
         .build(),
       new byte[][] { splitKey });
@@ -204,4 +196,58 @@ public class TestSerialReplication extends SerialReplicationTestBase {
       assertEquals(200, count);
     }
   }
+
+  @Test
+  public void testRemovePeerNothingReplicated() throws Exception {
+    TableName tableName = createTable();
+    String encodedRegionName =
+      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+  }
+
+  @Test
+  public void testRemovePeer() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    enablePeerAndWaitUntilReplicationDone(100);
+    checkOrder(100);
+    String encodedRegionName =
+      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
+    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    // confirm that we delete the last pushed sequence id
+    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+  }
+
+  @Test
+  public void testRemoveSerialFlag() throws Exception {
+    TableName tableName = createTable();
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    enablePeerAndWaitUntilReplicationDone(100);
+    checkOrder(100);
+    String encodedRegionName =
+      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
+    ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
+    // confirm that we delete the last pushed sequence id
+    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+  }
 }
