HBASE-20147 Serial replication will be stuck if we create a table with serial replication but add it to a peer after there are region moves
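
The stuck scenario: a table is created with REPLICATION_SCOPE_GLOBAL and one of its regions
moves, which writes a replication barrier to meta, before the table is covered by any serial
peer. Since no last-pushed sequence id was ever recorded for that region, a serial peer added
afterwards can never get past the barrier. A condensed reproduction sketch, assuming a running
HBaseTestingUtility cluster and the helpers of the new SerialReplicationTestBase below (it
mirrors the new testAddPeer):

    TableName tableName = TableName.valueOf("t");
    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    // The move records a new replication barrier for the region in meta.
    moveRegion(region, UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)));
    // Only now is the table added to a serial peer; before this fix the peer
    // stayed stuck because no last pushed sequence id existed for the region.
    addPeer(true);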


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d5c0e22
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d5c0e22
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d5c0e22

Branch: refs/heads/HBASE-20046-branch-2
Commit: 2d5c0e22c094456ae3e35af26c169ee726aa0939
Parents: 8f379fc
Author: zhangduo <zhang...@apache.org>
Authored: Wed Mar 21 21:03:14 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sun Apr 8 11:21:36 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/AsyncMetaTableAccessor.java    |  50 ++--
 .../apache/hadoop/hbase/MetaTableAccessor.java  |  26 ++-
 .../src/main/protobuf/MasterProcedure.proto     |   7 +-
 .../replication/ReplicationQueueStorage.java    |   8 +
 .../hbase/replication/ReplicationUtils.java     |   6 +-
 .../replication/ZKReplicationQueueStorage.java  |  50 ++--
 .../master/replication/AddPeerProcedure.java    |  21 +-
 .../master/replication/ModifyPeerProcedure.java | 166 +++++++++++++-
 .../replication/ReplicationPeerManager.java     |  32 +--
 .../replication/UpdatePeerConfigProcedure.java  |  59 ++++-
 .../regionserver/PeerProcedureHandlerImpl.java  |  17 +-
 .../regionserver/ReplicationSourceManager.java  |   4 +-
 .../replication/regionserver/WALEntryBatch.java |   8 +
 .../replication/SerialReplicationTestBase.java  | 229 +++++++++++++++++++
 .../TestAddToSerialReplicationPeer.java         | 215 +++++++++++++++++
 .../replication/TestSerialReplication.java      | 191 +---------------
 16 files changed, 825 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
index 05e60d4..13245d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
@@ -489,16 +489,17 @@ public class AsyncMetaTableAccessor {
       QueryType type) {
     return tableName.map((table) -> {
       switch (type) {
-      case REGION:
-        byte[] startRow = new byte[table.getName().length + 2];
-        System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
-        startRow[startRow.length - 2] = HConstants.DELIMITER;
-        startRow[startRow.length - 1] = HConstants.DELIMITER;
-        return startRow;
-      case ALL:
-      case TABLE:
-      default:
-        return table.getName();
+        case REGION:
+        case REPLICATION:
+          byte[] startRow = new byte[table.getName().length + 2];
+          System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
+          startRow[startRow.length - 2] = HConstants.DELIMITER;
+          startRow[startRow.length - 1] = HConstants.DELIMITER;
+          return startRow;
+        case ALL:
+        case TABLE:
+        default:
+          return table.getName();
       }
     });
   }
@@ -512,20 +513,21 @@ public class AsyncMetaTableAccessor {
     return tableName.map((table) -> {
       final byte[] stopRow;
       switch (type) {
-      case REGION:
-        stopRow = new byte[table.getName().length + 3];
-        System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
-        stopRow[stopRow.length - 3] = ' ';
-        stopRow[stopRow.length - 2] = HConstants.DELIMITER;
-        stopRow[stopRow.length - 1] = HConstants.DELIMITER;
-        break;
-      case ALL:
-      case TABLE:
-      default:
-        stopRow = new byte[table.getName().length + 1];
-        System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
-        stopRow[stopRow.length - 1] = ' ';
-        break;
+        case REGION:
+        case REPLICATION:
+          stopRow = new byte[table.getName().length + 3];
+          System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
+          stopRow[stopRow.length - 3] = ' ';
+          stopRow[stopRow.length - 2] = HConstants.DELIMITER;
+          stopRow[stopRow.length - 1] = HConstants.DELIMITER;
+          break;
+        case ALL:
+        case TABLE:
+        default:
+          stopRow = new byte[table.getName().length + 1];
+          System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
+          stopRow[stopRow.length - 1] = ' ';
+          break;
       }
       return stopRow;
     });
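
A note on the bounds above: for a table named "t" the shared REGION/REPLICATION range is
start = "t" + ',' + ',' ("t,,") and stop = "t" + ' ' + ',' + ',' ("t ,,"). hbase:meta rows
are ordered by the meta comparator, which compares the table-name portion before the first
',' delimiter first, so every region row "t,&lt;startKey&gt;,&lt;regionId&gt;..." of this table falls
inside the range while rows of other tables fall outside it.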

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index a800c1c..4cc46c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -192,7 +192,8 @@ public class MetaTableAccessor {
   public enum QueryType {
     ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY),
     REGION(HConstants.CATALOG_FAMILY),
-    TABLE(HConstants.TABLE_FAMILY);
+    TABLE(HConstants.TABLE_FAMILY),
+    REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
 
     private final byte[][] families;
 
@@ -1168,8 +1169,9 @@ public class MetaTableAccessor {
     final List<T> results = new ArrayList<>();
     @Override
     public boolean visit(Result r) throws IOException {
-      if (r ==  null || r.isEmpty()) return true;
-      add(r);
+      if (r != null && !r.isEmpty()) {
+        add(r);
+      }
       return true;
     }
 
@@ -2108,6 +2110,24 @@ public class MetaTableAccessor {
     }
   }
 
+  public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn,
+      TableName tableName) throws IOException {
+    List<Pair<String, Long>> list = new ArrayList<>();
+    scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
+      getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION, r -> {
+        byte[] value =
+          r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER);
+        if (value == null) {
+          return true;
+        }
+        long lastBarrier = Bytes.toLong(value);
+        String encodedRegionName = RegionInfo.encodeRegionName(r.getRow());
+        list.add(Pair.newPair(encodedRegionName, lastBarrier));
+        return true;
+      });
+    return list;
+  }
+
  private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;
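
A minimal usage sketch for the new helper (the Connection setup and conf are hypothetical;
the helper itself is the one added above, and getSecond() is the region's last barrier):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Pair;

    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      for (Pair<String, Long> regionAndBarrier : MetaTableAccessor
          .getTableEncodedRegionNameAndLastBarrier(conn, TableName.valueOf("t"))) {
        System.out.println(regionAndBarrier.getFirst() + " last barrier: " +
          regionAndBarrier.getSecond());
      }
    }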

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index fa6fa75..f710759 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -377,7 +377,11 @@ enum PeerModificationState {
   PRE_PEER_MODIFICATION = 1;
   UPDATE_PEER_STORAGE = 2;
   REFRESH_PEER_ON_RS = 3;
-  POST_PEER_MODIFICATION = 4;
+  SERIAL_PEER_REOPEN_REGIONS = 4;
+  SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID = 5;
+  SERIAL_PEER_SET_PEER_ENABLED = 6;
+  SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS = 7;
+  POST_PEER_MODIFICATION = 8;
 }
 
 message PeerModificationStateData {
@@ -415,4 +419,5 @@ message AddPeerStateData {
 
 message UpdatePeerConfigStateData {
   required ReplicationPeer peer_config = 1;
+  optional ReplicationPeer old_peer_config = 2;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cfe9c9c..99a1e97 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -79,6 +79,14 @@ public interface ReplicationQueueStorage {
  long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
 
   /**
+   * Set the max sequence id of a set of regions for a given peer. Will be called when setting up
+   * a serial replication peer.
+   * @param peerId peer id
+   * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
+   */
+  void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
+
+  /**
   * Get the current position for a specific WAL in a given queue for a given regionserver.
    * @param serverName the name of the regionserver
    * @param queueId a String that identifies the queue
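
A sketch of how the new hook is meant to be driven (queueStorage would come from
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); the region name and
sequence id are placeholders). ModifyPeerProcedure below batches such calls 1000
regions at a time:

    Map<String, Long> lastSeqIds = new HashMap<>();
    // One entry per region: the highest sequence id already pushed to this peer.
    lastSeqIds.put(encodedRegionName, lastPushedSeqId);
    queueStorage.setLastSequenceIds("1", lastSeqIds); // "1" is the peer id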

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index e2479e0..1c42de4 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -111,13 +111,11 @@ public final class ReplicationUtils {
     return true;
   }
 
-  public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+  public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
+      ReplicationPeerConfig rpc2) {
     if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
       return false;
     }
-    if (rpc1.isSerial() != rpc2.isSerial()) {
-      return false;
-    }
     if (rpc1.replicateAllUserTables()) {
      return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
        isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 1a5749e..2ab08ae 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -202,6 +202,24 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
     }
   }
 
+  private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
+      List<ZKUtilOp> listOfOps) throws KeeperException {
+    for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+      String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+      String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+      /*
+       * Make sure the path
+       * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc
+       * of multiOrSequential() says, if a NodeExistsException is received, all operations will
+       * fail, so create the path up front. There is no need to add the creation to listOfOps,
+       * because we only need the file position and sequence id updates to be atomic.
+       */
+      ZKUtil.createWithParents(zookeeper, path);
+      // Persist the max sequence id of region to zookeeper.
+      listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+    }
+  }
+
   @Override
  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
       Map<String, Long> lastSeqIds) throws ReplicationException {
@@ -212,23 +230,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
           ZKUtil.positionToByteArray(position)));
       }
      // Persist the max sequence id(s) of regions for serial replication atomically.
-      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-        String peerId = new ReplicationQueueInfo(queueId).getPeerId();
-        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
-        /*
-         * Make sure the existence of path
-         * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
-         * multiOrSequential() method said, if received a NodeExistsException, all operations will
-         * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
-         * because only need to make sure that update file position and sequence id atomically.
-         */
-        ZKUtil.createWithParents(zookeeper, path);
-        // Persist the max sequence id of region to zookeeper.
-        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
-      }
-      if (!listOfOps.isEmpty()) {
-        ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
-      }
+      addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
     } catch (KeeperException e) {
      throw new ReplicationException("Failed to set log position (serverName=" + serverName
          + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
@@ -256,6 +259,19 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   @Override
+  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+      throws ReplicationException {
+    try {
+      List<ZKUtilOp> listOfOps = new ArrayList<>();
+      addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
+      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
+        ", lastSeqIds.size=" + lastSeqIds.size(), e);
+    }
+  }
+
+  @Override
  public long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException {
     byte[] bytes;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index f0f7704..72228f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -57,6 +57,21 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
+  protected boolean reopenRegionsAfterRefresh() {
+    return true;
+  }
+
+  @Override
+  protected boolean enablePeerBeforeFinish() {
+    return enabled;
+  }
+
+  @Override
+  protected ReplicationPeerConfig getNewPeerConfig() {
+    return peerConfig;
+  }
+
+  @Override
   protected void prePeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
@@ -68,11 +83,13 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
 
   @Override
  protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
-    env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled);
+    env.getReplicationPeerManager().addPeer(peerId, peerConfig,
+      peerConfig.isSerial() ? false : enabled);
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws IOException {
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
    LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId,
       peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 83c5134..2b76487 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,11 +18,28 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +55,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
 
  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
 
+  private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
+
   protected ModifyPeerProcedure() {
   }
 
@@ -73,6 +92,114 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     ProcedurePrepareLatch.releaseLatch(latch, this);
   }
 
+  /**
+   * Implementation classes can override this method. The default return value is false, which
+   * means we will jump to POST_PEER_MODIFICATION and finish the procedure. If it returns true, we
+   * will jump to SERIAL_PEER_REOPEN_REGIONS instead.
+   */
+  protected boolean reopenRegionsAfterRefresh() {
+    return false;
+  }
+
+  /**
+   * The implementation class should override this method if the procedure may enter the serial
+   * related states.
+   */
+  protected boolean enablePeerBeforeFinish() {
+    throw new UnsupportedOperationException();
+  }
+
+  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
+    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+      .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
+      .toArray(RefreshPeerProcedure[]::new));
+  }
+
+  protected ReplicationPeerConfig getOldPeerConfig() {
+    return null;
+  }
+
+  protected ReplicationPeerConfig getNewPeerConfig() {
+    throw new UnsupportedOperationException();
+  }
+
+  private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
+    ReplicationPeerConfig peerConfig = getNewPeerConfig();
+    Stream<TableDescriptor> stream = env.getMasterServices().getTableDescriptors().getAll().values()
+      .stream().filter(TableDescriptor::hasGlobalReplicationScope)
+      .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
+    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
+    if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
+      stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
+    }
+    return stream;
+  }
+
+  private void reopenRegions(MasterProcedureEnv env) throws IOException {
+    Stream<TableDescriptor> stream = getTables(env);
+    TableStateManager tsm = env.getMasterServices().getTableStateManager();
+    stream.filter(td -> {
+      try {
+        return tsm.getTableState(td.getTableName()).isEnabled();
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }).forEach(td -> {
+      try {
+        addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+          env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+  }
+
+  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
+      ReplicationQueueStorage queueStorage) throws ReplicationException {
+    if (barrier >= 0) {
+      lastSeqIds.put(encodedRegionName, barrier);
+      if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
+        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+        lastSeqIds.clear();
+      }
+    }
+  }
+
+  private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    Stream<TableDescriptor> stream = getTables(env);
+    TableStateManager tsm = env.getMasterServices().getTableStateManager();
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    Connection conn = env.getMasterServices().getConnection();
+    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    stream.forEach(td -> {
+      try {
+        if (tsm.getTableState(td.getTableName()).isEnabled()) {
+          for (Pair<String, Long> name2Barrier : MetaTableAccessor
+            .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
+            addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+              queueStorage);
+          }
+        } else {
+          for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
+            long maxSequenceId =
+              WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+            addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
+          }
+        }
+      } catch (IOException | ReplicationException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    if (!lastSeqIds.isEmpty()) {
+      queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }
+
   @Override
  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
@@ -104,9 +231,42 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
       case REFRESH_PEER_ON_RS:
-        addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
-            .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn))
-            .toArray(RefreshPeerProcedure[]::new));
+        refreshPeer(env, getPeerOperationType());
+        setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
+          : PeerModificationState.POST_PEER_MODIFICATION);
+        return Flow.HAS_MORE_STATE;
+      case SERIAL_PEER_REOPEN_REGIONS:
+        try {
+          reopenRegions(env);
+        } catch (Exception e) {
+          LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
+        return Flow.HAS_MORE_STATE;
+      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
+        try {
+          setLastSequenceIdForSerialPeer(env);
+        } catch (Exception e) {
+          LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
+            peerId, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
+          : PeerModificationState.POST_PEER_MODIFICATION);
+        return Flow.HAS_MORE_STATE;
+      case SERIAL_PEER_SET_PEER_ENABLED:
+        try {
+          env.getReplicationPeerManager().enablePeer(peerId);
+        } catch (ReplicationException e) {
+          LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
+            peerId, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
+        return Flow.HAS_MORE_STATE;
+      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
+        refreshPeer(env, PeerOperationType.ENABLE);
         setNextState(PeerModificationState.POST_PEER_MODIFICATION);
         return Flow.HAS_MORE_STATE;
       case POST_PEER_MODIFICATION:
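
For readers following the state machine, the happy path for a serial peer now runs:

  PRE_PEER_MODIFICATION -> UPDATE_PEER_STORAGE -> REFRESH_PEER_ON_RS
    -> SERIAL_PEER_REOPEN_REGIONS -> SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
    -> SERIAL_PEER_SET_PEER_ENABLED -> SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
    -> POST_PEER_MODIFICATION

Non-serial procedures still jump from REFRESH_PEER_ON_RS straight to POST_PEER_MODIFICATION
(reopenRegionsAfterRefresh() returns false), and the two enable states are skipped when
enablePeerBeforeFinish() returns false.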

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 1e93373..a0e01e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -85,7 +85,7 @@ public class ReplicationPeerManager {
     }
   }
 
-  public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
+  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
       throws DoNotRetryIOException, ReplicationException {
     if (peerId.contains("-")) {
       throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
@@ -109,43 +109,47 @@ public class ReplicationPeerManager {
     return desc;
   }
 
-  public void preRemovePeer(String peerId) throws DoNotRetryIOException {
+  void preRemovePeer(String peerId) throws DoNotRetryIOException {
     checkPeerExists(peerId);
   }
 
-  public void preEnablePeer(String peerId) throws DoNotRetryIOException {
+  void preEnablePeer(String peerId) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
     }
   }
 
-  public void preDisablePeer(String peerId) throws DoNotRetryIOException {
+  void preDisablePeer(String peerId) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
     }
   }
 
-  public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+  /**
+   * Return the old peer description. Can never be null.
+   */
+  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
       throws DoNotRetryIOException {
     checkPeerConfig(peerConfig);
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
       throw new DoNotRetryIOException(
-          "Changing the cluster key on an existing peer is not allowed. Existing key '" +
-              oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
-              peerConfig.getClusterKey() + "'");
+        "Changing the cluster key on an existing peer is not allowed. Existing key '" +
+          oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
+          peerConfig.getClusterKey() + "'");
     }
 
     if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
       oldPeerConfig.getReplicationEndpointImpl())) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
-          "on an existing peer is not allowed. Existing class '" +
-          oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
-          " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
+        "on an existing peer is not allowed. Existing class '" +
+        oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
+        " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
     }
+    return desc;
   }
 
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -216,7 +220,7 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
-  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+  void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
    // on-going when the refresh peer config procedure is done, if a RS which has already been
    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@@ -340,7 +344,7 @@ public class ReplicationPeerManager {
   public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =
-        ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
     for (String peerId : peerStorage.listPeerIds()) {
       ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
@@ -348,7 +352,7 @@ public class ReplicationPeerManager {
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
     }
     return new ReplicationPeerManager(peerStorage,
-        ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
+      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index 3497447..b7e670a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +42,10 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
 
   private ReplicationPeerConfig peerConfig;
 
+  private ReplicationPeerConfig oldPeerConfig;
+
+  private boolean enabled;
+
   public UpdatePeerConfigProcedure() {
   }
 
@@ -54,21 +60,53 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   }
 
   @Override
+  protected boolean reopenRegionsAfterRefresh() {
+    // If we remove some tables from the peer config then we do not need to enter the extra states
+    // for serial replication. Could try to optimize later since it is not easy to determine this...
+    return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
+      !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
+  }
+
+  @Override
+  protected boolean enablePeerBeforeFinish() {
+    // do not need to test reopenRegionsAfterRefresh since we can only enter here if
+    // reopenRegionsAfterRefresh returns true.
+    return enabled;
+  }
+
+  @Override
+  protected ReplicationPeerConfig getOldPeerConfig() {
+    return oldPeerConfig;
+  }
+
+  @Override
+  protected ReplicationPeerConfig getNewPeerConfig() {
+    return peerConfig;
+  }
+
+  @Override
  protected void prePeerModification(MasterProcedureEnv env) throws IOException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
     }
-    env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
+    ReplicationPeerDescription desc =
+      env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
+    oldPeerConfig = desc.getPeerConfig();
+    enabled = desc.isEnabled();
   }
 
   @Override
  protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
     env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
+    if (enabled && reopenRegionsAfterRefresh()) {
+      env.getReplicationPeerManager().disablePeer(peerId);
+    }
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws IOException {
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
    LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
@@ -79,14 +117,23 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
-    serializer.serialize(UpdatePeerConfigStateData.newBuilder()
-        .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build());
+    UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder()
+      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
+    if (oldPeerConfig != null) {
+      builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig));
+    }
+    serializer.serialize(builder.build());
   }
 
   @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.deserializeStateData(serializer);
-    peerConfig = ReplicationPeerConfigUtil
-        .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig());
+    UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class);
+    peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
+    if (data.hasOldPeerConfig()) {
+      oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig());
+    } else {
+      oldPeerConfig = null;
+    }
   }
 }
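
From the client's point of view, converting an existing peer to serial replication is a plain
config update; the new testChangeToSerial below drives it like this (admin is an
org.apache.hadoop.hbase.client.Admin):

    admin.disableReplicationPeer(peerId);
    admin.updateReplicationPeerConfig(peerId,
      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
    admin.enableReplicationPeer(peerId);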

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index a02d181..78c1977 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -99,19 +100,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
   @Override
  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
+    ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
     ReplicationPeerImpl peer = null;
     ReplicationPeerConfig oldConfig = null;
+    PeerState oldState = null;
     boolean success = false;
     try {
-      peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+      peer = peers.getPeer(peerId);
       if (peer == null) {
        throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
       }
       oldConfig = peer.getPeerConfig();
-      ReplicationPeerConfig newConfig =
-          replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+      oldState = peer.getPeerState();
+      ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
+      // also need to refresh peer state here. When updating a serial replication peer we may
+      // disable it first and then enable it.
+      PeerState newState = peers.refreshPeerState(peerId);
       // RS need to start work with the new replication config change
-      if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
+        oldConfig.isSerial() != newConfig.isSerial() ||
+        (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) {
         replicationSourceManager.refreshSources(peerId);
       }
       success = true;
@@ -119,6 +127,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
       if (!success && peer != null) {
         // Reset peer config if refresh source failed
         peer.setPeerConfig(oldConfig);
+        peer.setPeerState(oldState.equals(PeerState.ENABLED));
       }
       peerLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 23e1115..3ecc50a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -510,7 +510,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
         NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
-        if (wals != null && !wals.first().equals(log)) {
+        if (wals != null) {
           cleanOldLogs(wals, log, inclusive, queueId);
         }
       }
@@ -755,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, NavigableSet<String>>> getWALs() {
+  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 960d473..22b2de7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -157,4 +157,12 @@ class WALEntryBatch {
   public void setLastSeqId(String region, long sequenceId) {
     lastSeqIds.put(region, sequenceId);
   }
+
+  @Override
+  public String toString() {
+    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+      ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
+      nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
+      endOfFile + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
new file mode 100644
index 0000000..83afd81
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for testing serial replication.
+ */
+public class SerialReplicationTestBase {
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  protected static String PEER_ID = "1";
+
+  protected static byte[] CF = Bytes.toBytes("CF");
+
+  protected static byte[] CQ = Bytes.toBytes("CQ");
+
+  protected static FileSystem FS;
+
+  protected static Path LOG_DIR;
+
+  protected static WALProvider.Writer WRITER;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  protected Path logPath;
+
+  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private static final UUID PEER_UUID = UUID.randomUUID();
+
+    @Override
+    public UUID getPeerUUID() {
+      return PEER_UUID;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      synchronized (WRITER) {
+        try {
+          for (Entry entry : replicateContext.getEntries()) {
+            WRITER.append(entry);
+          }
+          WRITER.sync(false);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
+    UTIL.startMiniCluster(3);
+    // disable balancer
+    UTIL.getAdmin().balancerSwitch(false, true);
+    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
+    FS = UTIL.getTestFileSystem();
+    FS.mkdirs(LOG_DIR);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+    rollAllWALs();
+    if (WRITER != null) {
+      WRITER.close();
+      WRITER = null;
+    }
+  }
+
+  protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
+    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+      Bytes.toBytes(rs.getServerName().getServerName()));
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return rs.getRegion(region.getEncodedName()) != null;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return region + " is still not on " + rs;
+      }
+    });
+  }
+
+  protected static void rollAllWALs() throws Exception {
+    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+      t.getRegionServer().getWalRoller().requestRollAll();
+    }
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+          .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Log roll has not finished yet";
+      }
+    });
+  }
+
+  protected final void setupWALWriter() throws IOException {
+    logPath = new Path(LOG_DIR, name.getMethodName());
+    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+  }
+
+  protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+          int count = 0;
+          while (reader.next() != null) {
+            count++;
+          }
+          return count >= expectedEntries;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Not enough entries replicated";
+      }
+    });
+  }
+
+  protected final void addPeer(boolean enabled) throws IOException {
+    UTIL.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
+        .build(),
+      enabled);
+  }
+
+  protected final void checkOrder(int expectedEntries) throws IOException {
+    try (WAL.Reader reader =
+      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+      long seqId = -1L;
+      int count = 0;
+      for (Entry entry;;) {
+        entry = reader.next();
+        if (entry == null) {
+          break;
+        }
+        assertTrue(
+          "Sequence id goes backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
+          entry.getKey().getSequenceId() >= seqId);
+        // Track the previous sequence id; without this the ordering assertion is vacuous.
+        seqId = entry.getKey().getSequenceId();
+        count++;
+      }
+      assertEquals(expectedEntries, count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
new file mode 100644
index 0000000..64b5bb1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Testcase for HBASE-20147.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
+
+  @Before
+  public void setUp() throws IOException, StreamLacksCapabilityException {
+    setupWALWriter();
+  }
+
+  // make sure that we will start replication for the sequence id after move, that's what we want to
+  // test here.
+  private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
+    moveRegion(region, rs);
+    rollAllWALs();
+  }
+
+  private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs) throws Exception {
+    Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        ReplicationSourceManager manager =
+          ((Replication) rs.getReplicationSourceService()).getReplicationManager();
+        return manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Still not replicated to the current WAL file yet";
+      }
+    });
+  }
+
+  @Test
+  public void testAddPeer() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+    moveRegionAndArchiveOldWals(region, rs);
+    addPeer(true);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+
+  @Test
+  public void testChangeToSerial() throws Exception {
+    ReplicationPeerConfig peerConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+
+    TableName tableName = TableName.valueOf(name.getMethodName());
+
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
+    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
+    moveRegionAndArchiveOldWals(region, rs);
+    waitUntilReplicationDone(100);
+    waitUntilReplicatedToTheCurrentWALFile(srcRs);
+
+    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
+    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(200);
+    checkOrder(200);
+  }
+
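+  // The peer is serial from the start but does not replicate this table; after a region move the
+  // table is added to the peer's table-cfs map, and the writes made afterwards must arrive in
+  // order.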
+  @Test
+  public void testAddToSerialPeer() throws Exception {
+    ReplicationPeerConfig peerConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
+        .setReplicateAllUserTables(false).setSerial(true).build();
+    UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+    HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
+    HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
+    moveRegionAndArchiveOldWals(region, rs);
+    waitUntilReplicatedToTheCurrentWALFile(rs);
+    UTIL.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+      ReplicationPeerConfig.newBuilder(peerConfig)
+        .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
+    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+
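+  // Add the serial peer while the table is disabled and its WALs have been rolled; once the table
+  // is re-enabled, the new writes should replicate in order.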
+  @Test
+  public void testDisabledTable() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    UTIL.getAdmin().disableTable(tableName);
+    rollAllWALs();
+    addPeer(true);
+    UTIL.getAdmin().enableTable(tableName);
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    waitUntilReplicationDone(100);
+    checkOrder(100);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d5c0e22/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 37f11f6..94b79d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -23,211 +23,49 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSerialReplication {
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplication extends SerialReplicationTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestSerialReplication.class);
 
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static String PEER_ID = "1";
-
-  private static byte[] CF = Bytes.toBytes("CF");
-
-  private static byte[] CQ = Bytes.toBytes("CQ");
-
-  private static FileSystem FS;
-
-  private static Path LOG_DIR;
-
-  private static WALProvider.Writer WRITER;
-
-  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
-
-    private static final UUID PEER_UUID = UUID.randomUUID();
-
-    @Override
-    public UUID getPeerUUID() {
-      return PEER_UUID;
-    }
-
-    @Override
-    public boolean replicate(ReplicateContext replicateContext) {
-      synchronized (WRITER) {
-        try {
-          for (Entry entry : replicateContext.getEntries()) {
-            WRITER.append(entry);
-          }
-          WRITER.sync(false);
-        } catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public void start() {
-      startAsync();
-    }
-
-    @Override
-    public void stop() {
-      stopAsync();
-    }
-
-    @Override
-    protected void doStart() {
-      notifyStarted();
-    }
-
-    @Override
-    protected void doStop() {
-      notifyStopped();
-    }
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
-    UTIL.startMiniCluster(3);
-    // disable balancer
-    UTIL.getAdmin().balancerSwitch(false, true);
-    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
-    FS = UTIL.getTestFileSystem();
-    FS.mkdirs(LOG_DIR);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    UTIL.shutdownMiniCluster();
-  }
-
-  @Rule
-  public final TestName name = new TestName();
-
-  private Path logPath;
-
   @Before
   public void setUp() throws IOException, StreamLacksCapabilityException {
-    logPath = new Path(LOG_DIR, name.getMethodName());
-    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+    setupWALWriter();
     // add in disable state, so later when enabling it all sources will start push together.
-    UTIL.getAdmin().addReplicationPeer(PEER_ID,
-      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
-        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
-        .build(),
-      false);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
-    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
-      t.getRegionServer().getWalRoller().requestRollAll();
-    }
-    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
-          .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return "Log roll has not finished yet";
-      }
-    });
-    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
-      t.getRegionServer().getWalRoller().requestRollAll();
-    }
-    if (WRITER != null) {
-      WRITER.close();
-      WRITER = null;
-    }
-  }
-
-  private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
-    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
-      Bytes.toBytes(rs.getServerName().getServerName()));
-    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return rs.getRegion(region.getEncodedName()) != null;
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return region + " is still not on " + rs;
-      }
-    });
+    addPeer(false);
   }
 
   private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
     UTIL.getAdmin().enableReplicationPeer(PEER_ID);
-    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
-          int count = 0;
-          while (reader.next() != null) {
-            count++;
-          }
-          return count >= expectedEntries;
-        } catch (IOException e) {
-          return false;
-        }
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return "Not enough entries replicated";
-      }
-    });
+    waitUntilReplicationDone(expectedEntries);
   }
 
   @Test
@@ -251,22 +89,7 @@ public class TestSerialReplication {
       }
     }
     enablePeerAndWaitUntilReplicationDone(200);
-    try (WAL.Reader reader =
-      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
-      long seqId = -1L;
-      int count = 0;
-      for (Entry entry;;) {
-        entry = reader.next();
-        if (entry == null) {
-          break;
-        }
-        assertTrue(
-          "Sequence id go backwards from " + seqId + " to " + 
entry.getKey().getSequenceId(),
-          entry.getKey().getSequenceId() >= seqId);
-        count++;
-      }
-      assertEquals(200, count);
-    }
+    checkOrder(200);
   }
 
   @Test
