HBASE-20115 Reimplement serial replication based on the new replication storage layer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f29bf1d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f29bf1d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f29bf1d7

Branch: refs/heads/HBASE-20046-branch-2
Commit: f29bf1d7786fb9fedb3b735f4a8134b61283e1d4
Parents: 1d11cdb
Author: zhangduo <zhang...@apache.org>
Authored: Mon Mar 5 16:47:03 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |   8 +
 .../apache/hadoop/hbase/MetaTableAccessor.java  | 211 ++++++++++++---
 .../hadoop/hbase/client/TableDescriptor.java    |   8 +-
 .../hbase/client/TableDescriptorBuilder.java    |   9 +
 .../org/apache/hadoop/hbase/HConstants.java     |  12 +
 .../hadoop/hbase/master/MasterFileSystem.java   |  11 +-
 .../master/assignment/AssignmentManager.java    |   3 +-
 .../master/assignment/RegionStateStore.java     |  60 +++--
 .../assignment/SplitTableRegionProcedure.java   |   4 +-
 .../AbstractStateMachineTableProcedure.java     |   8 +-
 .../hbase/regionserver/HRegionFileSystem.java   |  11 +-
 .../NamespaceTableCfWALEntryFilter.java         |   8 +-
 .../hbase/replication/ScopeWALEntryFilter.java  |  34 ++-
 .../RecoveredReplicationSource.java             |   5 +
 .../RecoveredReplicationSourceShipper.java      |  12 +-
 .../RecoveredReplicationSourceWALReader.java    |   9 +-
 .../regionserver/ReplicationSource.java         |   8 +
 .../ReplicationSourceInterface.java             |   7 +
 .../regionserver/ReplicationSourceManager.java  |   4 +-
 .../regionserver/ReplicationSourceShipper.java  |  17 +-
 .../ReplicationSourceWALActionListener.java     |  39 ++-
 .../ReplicationSourceWALReader.java             | 188 +++++---------
 .../regionserver/SerialReplicationChecker.java  | 255 +++++++++++++++++++
 .../replication/regionserver/WALEntryBatch.java | 138 ++++++++++
 .../regionserver/WALEntryStream.java            |  29 +--
 .../hadoop/hbase/util/FSTableDescriptors.java   |   8 +
 .../org/apache/hadoop/hbase/util/FSUtils.java   |  28 +-
 .../org/apache/hadoop/hbase/wal/WALKeyImpl.java |  12 +-
 .../hadoop/hbase/TestMetaTableAccessor.java     |  14 +-
 .../regionserver/TestHRegionFileSystem.java     |  14 +-
 .../regionserver/TestRegionServerMetrics.java   |   4 +-
 .../TestReplicationDroppedTables.java           |   8 +-
 .../replication/TestSerialReplication.java      | 234 +++++++++++++++++
 .../TestReplicationSourceManager.java           |   2 +-
 .../TestSerialReplicationChecker.java           | 176 +++++++++++++
 .../regionserver/TestWALEntryStream.java        |  19 +-
 36 files changed, 1279 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 960b91f..ca0cb91 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -539,6 +539,14 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
   }
 
   /**
+   * Return true if there is at least one cf whose replication scope is serial.
+   */
+  @Override
+  public boolean hasSerialReplicationScope() {
+    return delegatee.hasSerialReplicationScope();
+  }
+
+  /**
    * Returns the configured replicas per region
    */
   @Override

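For context, a minimal usage sketch (illustrative, not part of this patch) of how a table picks up the new serial scope and how hasSerialReplicationScope() reports it; it assumes only the public descriptor builder API plus the REPLICATION_SCOPE_SERIAL constant added below in HConstants:

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class SerialScopeExample {
  public static void main(String[] args) {
    // One family replicated serially (scope 2), one kept local (scope 0).
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf1"))
        .setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf2"))
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).build())
      .build();
    // Prints true: at least one family is scoped REPLICATION_SCOPE_SERIAL.
    System.out.println(td.hasSerialReplicationScope());
  }
}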
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index d6bbf53..109f2d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -34,6 +34,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.client.Connection;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -137,7 +140,7 @@ public class MetaTableAccessor {
  private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class);
  private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META");
 
-  static final byte [] META_REGION_PREFIX;
+  private static final byte[] META_REGION_PREFIX;
   static {
     // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
    // FIRST_META_REGIONINFO == 'hbase:meta,,1'.  META_REGION_PREFIX == 'hbase:meta,'
@@ -147,6 +150,11 @@ public class MetaTableAccessor {
       META_REGION_PREFIX, 0, len);
   }
 
+  private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
+
+  private static final String REPLICATION_PARENT_SEPARATOR = "|";
+
+  private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
   /**
    * Lists all of the table regions currently in META.
   * Deprecated, kept here until some tests stop using it.
@@ -838,7 +846,7 @@ public class MetaTableAccessor {
 
   /**
    * Returns the column qualifier for serialized region state
-   * @return HConstants.TABLE_STATE_QUALIFIER
+   * @return HConstants.STATE_QUALIFIER
    */
   private static byte[] getRegionStateColumn() {
     return HConstants.STATE_QUALIFIER;
@@ -1266,7 +1274,6 @@ public class MetaTableAccessor {
   ////////////////////////
   // Editing operations //
   ////////////////////////
-
   /**
   * Generates and returns a Put containing the region info for the catalog table
    */
@@ -1438,7 +1445,7 @@ public class MetaTableAccessor {
   * Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
   * add its daughters as different rows, but adds information about the daughters in the same row
   * as the parent. Use
-   * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int)}
+   * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
    * if you want to do that.
    * @param connection connection we're using
    * @param regionInfo RegionInfo of parent region
@@ -1464,7 +1471,7 @@ public class MetaTableAccessor {
   * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
   * does not add its daughters as different rows, but adds information about the daughters
   * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName, int)}
+   * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
    * if you want to do that.
    * @param connection connection we're using
    * @param regionInfo region information
@@ -1519,20 +1526,37 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Merge the two regions into one in an atomic operation. Deletes the two
-   * merging regions in hbase:meta and adds the merged region with the information of
-   * two merging regions.
+   * Merge the two regions into one in an atomic operation. Deletes the two merging regions in
+   * hbase:meta and adds the merged region with the information of two merging regions.
    * @param connection connection we're using
    * @param mergedRegion the merged region
    * @param regionA merge parent region A
+   * @param regionAOpenSeqNum the next open sequence id for region A, used by serial replication. -1
+   *          if not necessary.
    * @param regionB merge parent region B
+   * @param regionBOpenSeqNum the next open sequence id for region B, used by serial replication. -1
+   *          if not necessary.
    * @param sn the location of the region
    */
-  public static void mergeRegions(final Connection connection, RegionInfo mergedRegion,
-      RegionInfo regionA, RegionInfo regionB, ServerName sn, int regionReplication)
-      throws IOException {
+  public static void mergeRegions(Connection connection, RegionInfo mergedRegion,
+      RegionInfo regionA, long regionAOpenSeqNum, RegionInfo regionB, long regionBOpenSeqNum,
+      ServerName sn, int regionReplication) throws IOException {
     try (Table meta = getMetaHTable(connection)) {
       long time = EnvironmentEdgeManager.currentTime();
+      List<Mutation> mutations = new ArrayList<>();
+
+      List<RegionInfo> replicationParents = new ArrayList<>(2);
+      // Deletes for merging regions
+      mutations.add(makeDeleteFromRegionInfo(regionA, time));
+      if (regionAOpenSeqNum > 0) {
+        mutations.add(makePutForReplicationBarrier(regionA, regionAOpenSeqNum, time));
+        replicationParents.add(regionA);
+      }
+      mutations.add(makeDeleteFromRegionInfo(regionB, time));
+      if (regionBOpenSeqNum > 0) {
+        mutations.add(makePutForReplicationBarrier(regionB, regionBOpenSeqNum, time));
+        replicationParents.add(regionB);
+      }
 
       // Put for parent
       Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
@@ -1552,18 +1576,13 @@ public class MetaTableAccessor {
               .setType(Type.Put)
               .setValue(RegionInfo.toByteArray(regionB))
               .build());
-
       // Set initial state to CLOSED
      // NOTE: If initial state is not set to CLOSED then merged region gets added with the
      // default OFFLINE state. If Master gets restarted after this step, start up sequence of
      // master tries to assign this offline region. This is followed by re-assignments of the
      // merged region from resumed {@link MergeTableRegionsProcedure}
       addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
-
-      // Deletes for merging regions
-      Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
-      Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
-
+      mutations.add(putOfMerged);
       // The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
       // if crash after merge happened but before we got to here.. means in-memory
       // locations of offlined merged, now-closed, regions is lost. Should be ok. We
@@ -1577,26 +1596,30 @@ public class MetaTableAccessor {
       for (int i = 1; i < regionReplication; i++) {
         addEmptyLocation(putOfMerged, i);
       }
-
-      byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
-        + HConstants.DELIMITER);
-      multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB);
+      // add parent reference for serial replication
+      if (!replicationParents.isEmpty()) {
+        addReplicationParent(putOfMerged, replicationParents);
+      }
+      byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
+      multiMutate(connection, meta, tableRow, mutations);
     }
   }
 
   /**
-   * Splits the region into two in an atomic operation. Offlines the parent
-   * region with the information that it is split into two, and also adds
-   * the daughter regions. Does not add the location information to the daughter
-   * regions since they are not open yet.
+   * Splits the region into two in an atomic operation. Offlines the parent region with the
+   * information that it is split into two, and also adds the daughter regions. Does not add the
+   * location information to the daughter regions since they are not open yet.
    * @param connection connection we're using
    * @param parent the parent region which is split
+   * @param parentOpenSeqNum the next open sequence id for parent region, used by serial
+   *          replication. -1 if not necessary.
    * @param splitA Split daughter region A
    * @param splitB Split daughter region B
    * @param sn the location of the region
    */
-  public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA,
-      RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
+  public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
+      RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication)
+      throws IOException {
     try (Table meta = getMetaHTable(connection)) {
       long time = EnvironmentEdgeManager.currentTime();
       // Put for parent
@@ -1608,7 +1631,11 @@ public class MetaTableAccessor {
       // Puts for daughters
       Put putA = makePutFromRegionInfo(splitA, time);
       Put putB = makePutFromRegionInfo(splitB, time);
-
+      if (parentOpenSeqNum > 0) {
+        addReplicationBarrier(putParent, parentOpenSeqNum);
+        addReplicationParent(putA, Collections.singletonList(parent));
+        addReplicationParent(putB, Collections.singletonList(parent));
+      }
       // Set initial state to CLOSED
       // NOTE: If initial state is not set to CLOSED then daughter regions get added with the
       // default OFFLINE state. If Master gets restarted after this step, start up sequence of
@@ -1668,20 +1695,15 @@ public class MetaTableAccessor {
   }
 
   private static void multiMutate(Connection connection, Table table, byte[] row,
-      Mutation... mutations)
-  throws IOException {
+      Mutation... mutations) throws IOException {
     multiMutate(connection, table, row, Arrays.asList(mutations));
   }
 
   /**
    * Performs an atomic multi-mutate operation against the given table.
    */
-  // Used by the RSGroup Coprocessor Endpoint. It had a copy/paste of the below. Need to reveal
-  // this facility for CPEP use or at least those CPEPs that are on their way to becoming part of
-  // core as is the intent for RSGroup eventually.
-  public static void multiMutate(Connection connection, final Table table, byte[] row,
-      final List<Mutation> mutations)
-  throws IOException {
+  private static void multiMutate(Connection connection, final Table table, byte[] row,
+      final List<Mutation> mutations) throws IOException {
     debugLogMutations(mutations);
     // TODO: Need rollback!!!!
     // TODO: Need Retry!!!
@@ -1782,9 +1804,7 @@ public class MetaTableAccessor {
    * @param regionInfo region to be deleted from META
    * @throws IOException
    */
-  public static void deleteRegion(Connection connection,
-                                  RegionInfo regionInfo)
-    throws IOException {
+  public static void deleteRegion(Connection connection, RegionInfo regionInfo) throws IOException {
     long time = EnvironmentEdgeManager.currentTime();
     Delete delete = new Delete(regionInfo.getRegionName());
     delete.addFamily(getCatalogFamily(), time);
@@ -1901,6 +1921,33 @@ public class MetaTableAccessor {
               .build());
   }
 
+  private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
+    byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
+      .map(RegionInfo::getRegionNameAsString).collect(Collectors
+        .collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
+    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
+      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
+      .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+  }
+
+  private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
+      throws IOException {
+    Put put = new Put(regionInfo.getRegionName(), ts);
+    addReplicationBarrier(put, openSeqNum);
+    return put;
+  }
+
+  public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
+    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+      .setRow(put.getRow())
+      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
+      .setQualifier(HConstants.SEQNUM_QUALIFIER)
+      .setTimestamp(put.getTimeStamp())
+      .setType(Type.Put)
+      .setValue(Bytes.toBytes(openSeqNum))
+      .build());
+  }
+
  private static Put addEmptyLocation(Put p, int replicaId) throws IOException {
    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
     return p.add(builder.clear()
@@ -1926,6 +1973,92 @@ public class MetaTableAccessor {
                 .build());
   }
 
+  public static final class ReplicationBarrierResult {
+    private final long[] barriers;
+    private final RegionState.State state;
+    private final List<byte[]> parentRegionNames;
+
+    public ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
+      this.barriers = barriers;
+      this.state = state;
+      this.parentRegionNames = parentRegionNames;
+    }
+
+    public long[] getBarriers() {
+      return barriers;
+    }
+
+    public RegionState.State getState() {
+      return state;
+    }
+
+    public List<byte[]> getParentRegionNames() {
+      return parentRegionNames;
+    }
+  }
+
+  private static long getReplicationBarrier(Cell c) {
+    return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+  }
+
+  private static long[] getReplicationBarriers(Result result) {
+    return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
+      .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
+  }
+
+  private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
+    long[] barriers = getReplicationBarriers(result);
+    byte[] stateBytes = result.getValue(getCatalogFamily(), getRegionStateColumn());
+    RegionState.State state =
+      stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
+    byte[] parentRegionsBytes =
+      result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
+    List<byte[]> parentRegionNames =
+      parentRegionsBytes != null
+        ? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
+          .map(Bytes::toBytes).collect(Collectors.toList())
+        : Collections.emptyList();
+    return new ReplicationBarrierResult(barriers, state, parentRegionNames);
+  }
+
+  public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
+      TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
+    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
+    byte[] metaStopKey =
+      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
+      .addColumn(getCatalogFamily(), getRegionStateColumn())
+      .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
+      .setCaching(10);
+    try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) {
+      for (Result result;;) {
+        result = scanner.next();
+        if (result == null) {
+          return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
+        }
+        byte[] regionName = result.getRow();
+        // TODO: we may look up a region which has already been split or merged so we need to check
+        // whether the encoded name matches. Need to find a way to quit earlier when there is no
+        // record for the given region, for now it will scan to the end of the table.
+        if (!Bytes.equals(encodedRegionName,
+          Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
+          continue;
+        }
+        return getReplicationBarrierResult(result);
+      }
+    }
+  }
+
+  public static long[] getReplicationBarrier(Connection conn, byte[] regionName)
+      throws IOException {
+    try (Table table = getMetaHTable(conn)) {
+      Result result = table.get(new Get(regionName)
+        .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
+        .readAllVersions());
+      return getReplicationBarriers(result);
+    }
+  }
+
  private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
     if (!METALOG.isDebugEnabled()) {
       return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 9456fd4..13ad0e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -245,6 +245,11 @@ public interface TableDescriptor {
   boolean hasRegionMemStoreReplication();
 
   /**
+   * @return true if there is at least one cf whose replication scope is serial.
+   */
+  boolean hasSerialReplicationScope();
+
+  /**
    * Check if the compaction enable flag of the table is true. If flag is false
    * then no minor/major compactions will be done in real.
    *
@@ -292,7 +297,8 @@ public interface TableDescriptor {
     boolean hasDisabled = false;
 
     for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
-      if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
+      if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
+        cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
         hasDisabled = true;
       } else {
         hasEnabled = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 02901ac..2d6bfaf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -1128,6 +1128,15 @@ public class TableDescriptorBuilder {
     }
 
     /**
+     * Return true if there is at least one cf whose replication scope is serial.
+     */
+    @Override
+    public boolean hasSerialReplicationScope() {
+      return families.values().stream()
+        .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
+    }
+
+    /**
      * Returns the configured replicas per region
      */
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ac56ce5..edf8f9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -535,6 +535,12 @@ public final class HConstants {
   /** The serialized table state qualifier */
   public static final byte[] TABLE_STATE_QUALIFIER = Bytes.toBytes("state");
 
+  /** The replication barrier family as a string */
+  public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
+
+  /** The replication barrier family */
+  public static final byte[] REPLICATION_BARRIER_FAMILY =
+      Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
 
   /**
    * The meta table version column qualifier.
@@ -676,6 +682,12 @@ public final class HConstants {
   public static final int REPLICATION_SCOPE_GLOBAL = 1;
 
   /**
+   * Scope tag for serially scoped data.
+   * This data will be replicated to all peers by the order of sequence id.
+   */
+  public static final int REPLICATION_SCOPE_SERIAL = 2;
+
+  /**
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index a37fd4e..864be02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -208,7 +208,16 @@ public class MasterFileSystem {
   /**
    * @return HBase root log dir.
    */
-  public Path getWALRootDir() { return this.walRootDir; }
+  public Path getWALRootDir() {
+    return this.walRootDir;
+  }
+
+  /**
+   * @return the directory for a given {@code region}.
+   */
+  public Path getRegionDir(RegionInfo region) {
+    return FSUtils.getRegionDir(FSUtils.getTableDir(getRootDir(), region.getTable()), region);
+  }
 
   /**
    * @return HBase temp dir.

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 754731b..0e47065 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1571,8 +1571,7 @@ public class AssignmentManager implements ServerListener {
   }
 
   public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
-      final RegionInfo daughterA, final RegionInfo daughterB)
-  throws IOException {
+      final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
     // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
     // The parent stays in regionStates until cleared when removed by CatalogJanitor.
     // Update its state in regionStates to it shows as offline and split when read

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 1eaa4c6..c98a2d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.master.assignment;
 
 import java.io.IOException;
@@ -36,11 +34,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -163,6 +163,11 @@ public class RegionStateStore {
       Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
           "Open region should be on a server");
       MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
+      // only update replication barrier for default replica
+      if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
+        hasSerialReplicationScope(regionInfo.getTable())) {
+        MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
+      }
       info.append(", openSeqNum=").append(openSeqNum);
       info.append(", regionLocation=").append(regionLocation);
     } else if (regionLocation != null && !regionLocation.equals(lastHost)) {
@@ -205,24 +210,41 @@ public class RegionStateStore {
     }
   }
 
+  private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
+    MasterFileSystem mfs = master.getMasterFileSystem();
+    long maxSeqId =
+        WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+    return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
+  }
+
   // ============================================================================================
   //  Update Region Splitting State helpers
   // ============================================================================================
-  public void splitRegion(final RegionInfo parent, final RegionInfo hriA,
-      final RegionInfo hriB, final ServerName serverName)  throws IOException {
-    final TableDescriptor htd = getTableDescriptor(parent.getTable());
-    MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
-        getRegionReplication(htd));
+  public void splitRegion(RegionInfo parent, RegionInfo hriA, RegionInfo hriB,
+      ServerName serverName) throws IOException {
+    TableDescriptor htd = getTableDescriptor(parent.getTable());
+    long parentOpenSeqNum = HConstants.NO_SEQNUM;
+    if (htd.hasSerialReplicationScope()) {
+      parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
+    }
+    MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
+      serverName, getRegionReplication(htd));
   }
 
   // ============================================================================================
   //  Update Region Merging State helpers
   // ============================================================================================
-  public void mergeRegions(final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB,
-      final ServerName serverName) throws IOException {
-    final TableDescriptor htd = getTableDescriptor(parent.getTable());
-    MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
-      getRegionReplication(htd));
+  public void mergeRegions(RegionInfo child, RegionInfo hriA, RegionInfo hriB,
+      ServerName serverName) throws IOException {
+    TableDescriptor htd = getTableDescriptor(child.getTable());
+    long regionAOpenSeqNum = -1L;
+    long regionBOpenSeqNum = -1L;
+    if (htd.hasSerialReplicationScope()) {
+      regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
+      regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
+    }
+    MetaTableAccessor.mergeRegions(master.getConnection(), child, hriA, regionAOpenSeqNum, hriB,
+      regionBOpenSeqNum, serverName, getRegionReplication(htd));
   }
 
   // ============================================================================================
@@ -239,11 +261,19 @@ public class RegionStateStore {
   // ==========================================================================
   //  Table Descriptors helpers
   // ==========================================================================
-  private int getRegionReplication(final TableDescriptor htd) {
-    return (htd != null) ? htd.getRegionReplication() : 1;
+  private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
+    return hasSerialReplicationScope(getTableDescriptor(tableName));
+  }
+
+  private boolean hasSerialReplicationScope(TableDescriptor htd) {
+    return htd != null ? htd.hasSerialReplicationScope() : false;
+  }
+
+  private int getRegionReplication(TableDescriptor htd) {
+    return htd != null ? htd.getRegionReplication() : 1;
   }
 
-  private TableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
+  private TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
     return master.getTableDescriptors().get(tableName);
   }
 

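The open sequence number computed by getOpenSeqNumForParentRegion above is written to meta as a replication barrier; the sorted, distinct barrier array read back later splits a region's sequence-id space into ranges that must be replicated strictly in order. A rough sketch of that range lookup (a simplified stand-in for the SerialReplicationChecker this commit adds, not its actual code):

import java.util.Arrays;

public class BarrierRanges {
  // Barriers [b0, b1, ...] split sequence ids into ranges
  // [0, b0), [b0, b1), ...; return the index of the range containing seqId.
  static int rangeIndex(long[] barriers, long seqId) {
    int pos = Arrays.binarySearch(barriers, seqId);
    // An exact hit on b[i] starts range i + 1; otherwise binarySearch
    // returns -(insertionPoint) - 1 and the range index is the insertion point.
    return pos >= 0 ? pos + 1 : -pos - 1;
  }

  public static void main(String[] args) {
    long[] barriers = { 5L, 12L, 20L };
    System.out.println(rangeIndex(barriers, 3));  // 0 -> [0, 5)
    System.out.println(rangeIndex(barriers, 12)); // 2 -> [12, 20)
    System.out.println(rangeIndex(barriers, 25)); // 3 -> open-ended [20, ...)
  }
}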
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 994983f..341affb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -253,7 +253,7 @@ public class SplitTableRegionProcedure
           setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
           break;
         case SPLIT_TABLE_REGION_UPDATE_META:
-          updateMetaForDaughterRegions(env);
+          updateMeta(env);
           setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META);
           break;
         case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META:
@@ -762,7 +762,7 @@ public class SplitTableRegionProcedure
    * Add daughter regions to META
    * @param env MasterProcedureEnv
    */
-  private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
+  private void updateMeta(final MasterProcedureEnv env) throws IOException {
     env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env),
       daughter_1_RI, daughter_2_RI);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
index 60436c2..d296828 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
@@ -31,15 +30,12 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionOfflineException;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -131,9 +127,7 @@ public abstract class AbstractStateMachineTableProcedure<TState>
   }
 
   protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
-    MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
-    Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName());
-    return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+    return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 37a4309..9666aa5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.FileNotFoundException;
@@ -25,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
@@ -84,6 +83,7 @@ public class HRegionFileSystem {
   private final Configuration conf;
   private final Path tableDir;
   private final FileSystem fs;
+  private final Path regionDir;
 
   /**
   * In order to handle NN connectivity hiccups, one needs to retry non-idempotent operations at the
@@ -105,9 +105,10 @@ public class HRegionFileSystem {
       final RegionInfo regionInfo) {
     this.fs = fs;
     this.conf = conf;
-    this.tableDir = tableDir;
-    this.regionInfo = regionInfo;
+    this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null");
+    this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null");
     this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
+    this.regionDir = FSUtils.getRegionDir(tableDir, regionInfo);
     this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
       DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
     this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
@@ -135,7 +136,7 @@ public class HRegionFileSystem {
 
   /** @return {@link Path} to the region directory. */
   public Path getRegionDir() {
-    return new Path(this.tableDir, this.regionInfoForFs.getEncodedName());
+    return regionDir;
   }
 
   // ===========================================================================

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index ad6e5a6..08c9f37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
@@ -47,7 +44,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 @InterfaceAudience.Private
 public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
 
-  private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class);
   private final ReplicationPeer peer;
   private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 5cde40c..6a2fbcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -15,17 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.replication;
 
 import java.util.NavigableMap;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 
@@ -35,7 +33,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 @InterfaceAudience.Private
 public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
 
-  BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
+  private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
 
   @Override
   public Entry filter(Entry entry) {
@@ -49,21 +47,21 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
   @Override
   public Cell filterCell(Entry entry, Cell cell) {
     final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
-      // The scope will be null or empty if
-      // there's nothing to replicate in that WALEdit
-      byte[] fam = CellUtil.cloneFamily(cell);
-      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-        cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
-          @Override
-          public boolean apply(byte[] fam) {
-            return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
-          }
-        });
-      } else {
-        if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-          return null;
+    // The scope will be null or empty if
+    // there's nothing to replicate in that WALEdit
+    byte[] fam = CellUtil.cloneFamily(cell);
+    if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+        @Override
+        public boolean apply(byte[] fam) {
+          return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
         }
+      });
+    } else {
+      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+        return null;
       }
+    }
     return cell;
   }
 }

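Note that the filter above only drops families scoped REPLICATION_SCOPE_LOCAL (0), so GLOBAL (1) and the new SERIAL (2) both pass through it; the ordering guarantee for serial scope is enforced later in the shipping path, not here. A condensed sketch of the per-family decision (hypothetical helper; String keys for brevity, where the real code uses a byte[]-keyed NavigableMap):

import java.util.Collections;
import java.util.Map;

public class ScopeDecision {
  static final int REPLICATION_SCOPE_LOCAL = 0;  // values mirror HConstants
  static final int REPLICATION_SCOPE_SERIAL = 2;

  // Keep a cell when its family carries a non-local scope; a missing entry
  // means the family is not replicated at all.
  static boolean keep(Map<String, Integer> scopes, String family) {
    Integer scope = scopes == null ? null : scopes.get(family);
    return scope != null && scope != REPLICATION_SCOPE_LOCAL;
  }

  public static void main(String[] args) {
    System.out.println(keep(Collections.singletonMap("cf", REPLICATION_SCOPE_SERIAL), "cf")); // true
    System.out.println(keep(Collections.singletonMap("cf", REPLICATION_SCOPE_LOCAL), "cf"));  // false
  }
}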
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 3cae0f2..d9506c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -194,4 +194,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
   public ServerName getServerWALsBelongTo() {
     return this.replicationQueueInfo.getDeadRegionServers().get(0);
   }
+
+  @Override
+  public boolean isRecovered() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 38bbb48..9c36497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,12 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -127,13 +124,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
     return startPosition;
   }
 
-  @Override
-  protected void updateLogPosition(long lastReadPosition) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
-      lastReadPosition, true);
-    lastLoggedPosition = lastReadPosition;
-  }
-
   private void terminate(String reason, Exception cause) {
     if (cause == null) {
      LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
index 0af3f5c..114f139 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
@@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
+
   private static final Logger LOG =
-      LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
+    LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
 
   public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
       PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
@@ -45,13 +46,11 @@ public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALRea
   }
 
   @Override
-  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
-      throws InterruptedException {
+  protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
     // we're done with queue recovery, shut ourself down
     setReaderRunning(false);
     // shuts down shipper thread immediately
-    entryBatchQueue.put(batch != null ? batch
-        : new WALEntryBatch(replicationBatchCountCapacity, currentPath));
+    entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 73d4652..3b65b25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -607,4 +607,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
   public ServerName getServerWALsBelongTo() {
     return server.getServerName();
   }
+
+  Server getServer() {
+    return server;
+  }
+
+  ReplicationQueueStorage getQueueStorage() {
+    return queueStorage;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index d7cf9a3..090b465 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -166,4 +166,11 @@ public interface ReplicationSourceInterface {
    * @return the server name which all WALs belong to
    */
   ServerName getServerWALsBelongTo();
+
+  /**
+   * @return whether this is a replication source for recovery.
+   */
+  default boolean isRecovered() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index eb9dba2..06fe977 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -480,10 +480,10 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered indicates if this queue comes from another region server
    */
   public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
-      boolean queueRecovered) {
+      Map<String, Long> lastSeqIds, boolean queueRecovered) {
     String fileName = log.getName();
     abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
-      position, null));
+      position, lastSeqIds));
     cleanOldLogs(fileName, queueId, queueRecovered);
   }
 

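The new Map<String, Long> parameter carries, per encoded region name, the highest sequence id already shipped, persisted together with the WAL position so serial replication can tell where each region's push has reached. A hedged sketch of how such a map might be accumulated while shipping a batch (hypothetical helper, not the actual WALEntryBatch bookkeeping):

import java.util.HashMap;
import java.util.Map;

public class LastSeqIdTracker {
  private final Map<String, Long> lastSeqIds = new HashMap<>();

  // Record the sequence id of an entry just shipped; keeping the max per
  // encoded region name means the stored value only ever moves forward.
  void onShipped(String encodedRegionName, long seqId) {
    lastSeqIds.merge(encodedRegionName, seqId, Math::max);
  }

  // Snapshot handed to logPositionAndCleanOldLogs(...) with the position.
  Map<String, Long> snapshot() {
    return new HashMap<>(lastSeqIds);
  }
}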
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 959f676..d207d77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,13 +19,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -128,7 +127,7 @@ public class ReplicationSourceShipper extends Thread {
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
       if (lastLoggedPosition != lastReadPosition) {
-        updateLogPosition(lastReadPosition);
+        updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
         // if there was nothing to ship and it's not an error
         // set "ageOfLastShippedOp" to <now> to indicate that we're current
        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@@ -168,13 +167,13 @@ public class ReplicationSourceShipper extends Thread {
         }
 
         if (this.lastLoggedPosition != lastReadPosition) {
-          //Clean up hfile references
+          // Clean up hfile references
           int size = entries.size();
           for (int i = 0; i < size; i++) {
             cleanUpHFileRefs(entries.get(i).getEdit());
           }
-          //Log and clean up WAL logs
-          updateLogPosition(lastReadPosition);
+          // Log and clean up WAL logs
+          updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
         }
 
         source.postShipEdits(entries, currentSize);
@@ -222,9 +221,9 @@ public class ReplicationSourceShipper extends Thread {
     }
   }
 
-  protected void updateLogPosition(long lastReadPosition) {
+  private void updateLogPosition(long lastReadPosition, Map<String, Long> 
lastSeqIds) {
     source.getSourceManager().logPositionAndCleanOldLogs(currentPath, 
source.getQueueId(),
-      lastReadPosition, false);
+      lastReadPosition, lastSeqIds, source.isRecovered());
     lastLoggedPosition = lastReadPosition;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index eb12614..95fc6a0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -31,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
 /**
  * Used to receive new wals.
  */
@@ -68,31 +65,25 @@ class ReplicationSourceWALActionListener implements 
WALActionsListener {
    * compaction WAL edits and if the scope is local.
    * @param logKey Key that may get scoped according to its edits
    * @param logEdit Edits used to lookup the scopes
-   * @throws IOException If failed to parse the WALEdit
    */
   @VisibleForTesting
-  static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration 
conf) throws IOException {
-    boolean replicationForBulkLoadEnabled =
-        ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
-    boolean foundOtherEdits = false;
-    for (Cell cell : logEdit.getCells()) {
-      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-        foundOtherEdits = true;
-        break;
-      }
+  static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration 
conf) {
+      // For bulk load replication we need the meta family to know the file we want to replicate.
+    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
+      return;
     }
-
-    if (!foundOtherEdits && logEdit.getCells().size() > 0) {
-      WALProtos.RegionEventDescriptor maybeEvent =
-          WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
-      if (maybeEvent != null &&
-        (maybeEvent.getEventType() == 
WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
-        // In serially replication, we use scopes when reading close marker.
-        foundOtherEdits = true;
-      }
+    WALKeyImpl keyImpl = (WALKeyImpl) logKey;
+    // For serial replication we need to count all the sequence ids even for markers, so here we
+    // always need to retain the replication scopes to let the replication WAL reader know that
+    // we need serial replication. The ScopeWALEntryFilter will help filter out the cells for
+    // WALEdit.METAFAMILY.
+    if (keyImpl.hasSerialReplicationScope()) {
+      return;
     }
-    if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || 
logEdit.isReplay()) {
-      ((WALKeyImpl) logKey).serializeReplicationScope(false);
+    // For replay, or if all the cells are markers, we do not need to store the replication scope.
+    if (logEdit.isReplay() ||
+      logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, 
WALEdit.METAFAMILY))) {
+      keyImpl.clearReplicationScope();
     }
   }
 }

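The rewritten scopeWALEdits applies three ordered rules. A compact restatement as a boolean
predicate, as a sketch only (keepScopes is an invented name; the patch mutates the key in
place rather than returning a value):

    // Sketch: whether a WAL key should keep its replication scopes.
    static boolean keepScopes(WALKeyImpl key, WALEdit edit, Configuration conf) {
      if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
        return true; // bulk load needs the METAFAMILY cells to find store files
      }
      if (key.hasSerialReplicationScope()) {
        return true; // serial replication must count every sequence id, markers included
      }
      // replay edits and marker-only edits carry nothing worth replicating
      return !edit.isReplay() && !edit.getCells().stream()
          .allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY));
    }
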
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 579d20f..fe87aec 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -46,8 +46,8 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
 /**
- * Reads and filters WAL entries, groups the filtered entries into batches, 
and puts the batches onto a queue
- *
+ * Reads and filters WAL entries, groups the filtered entries into batches, 
and puts the batches
+ * onto a queue
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -77,6 +77,8 @@ public class ReplicationSourceWALReader extends Thread {
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
+  private final SerialReplicationChecker serialReplicationChecker;
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a 
given queue, batches the
    * entries, and puts them on a batch queue.
@@ -111,6 +113,7 @@ public class ReplicationSourceWALReader extends Thread {
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 
minutes @ 1 sec per
     this.eofAutoRecovery = 
conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+    this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
     LOG.info("peerClusterZnode=" + source.getQueueId()
         + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + 
replicationBatchSizeCapacity
@@ -131,15 +134,14 @@ public class ReplicationSourceWALReader extends Thread {
             continue;
           }
           WALEntryBatch batch = readWALEntries(entryStream);
-          if (batch != null && batch.getNbEntries() > 0) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for 
replication",
-                batch.getNbEntries()));
-            }
+          if (batch != null) {
+            // need to propagate the batch even if it has no entries, since it may carry the last
+            // sequence id information for serial replication.
+            LOG.trace("Read {} WAL entries eligible for replication", 
batch.getNbEntries());
             entryBatchQueue.put(batch);
             sleepMultiplier = 1;
           } else { // got no entries and didn't advance position in WAL
-            handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
+            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
           }
           currentPosition = entryStream.getPosition();
           entryStream.reset(); // reuse stream
@@ -160,34 +162,66 @@ public class ReplicationSourceWALReader extends Thread {
     }
   }
 
-  private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws 
IOException {
-    WALEntryBatch batch = null;
-    while (entryStream.hasNext()) {
-      if (batch == null) {
-        batch = new WALEntryBatch(replicationBatchCountCapacity, 
entryStream.getCurrentPath());
+  private WALEntryBatch readWALEntries(WALEntryStream entryStream)
+      throws IOException, InterruptedException {
+    if (!entryStream.hasNext()) {
+      return null;
+    }
+    WALEntryBatch batch =
+      new WALEntryBatch(replicationBatchCountCapacity, 
entryStream.getCurrentPath());
+    do {
+      Entry entry = entryStream.peek();
+      batch.setLastWalPosition(entryStream.getPosition());
+      boolean hasSerialReplicationScope = 
entry.getKey().hasSerialReplicationScope();
+      // Used to locate the region record in the meta table. In the WAL we only have the table
+      // name and the encoded region name, which can not be mapped back to the region name
+      // without scanning all the records for the table, so we need a start key, just like what
+      // we do at the client side when locating a region. For the markers, we will use the start
+      // key of the region as the row key for the edit. And we need to do this before filtering,
+      // since all the cells may be filtered out, especially those for the markers.
+      Cell firstCellInEdit = null;
+      if (hasSerialReplicationScope) {
+        assert !entry.getEdit().isEmpty() : "should not write empty edits";
+        firstCellInEdit = entry.getEdit().getCells().get(0);
       }
-      Entry entry = entryStream.next();
       entry = filterEntry(entry);
       if (entry != null) {
+        if (hasSerialReplicationScope) {
+          if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
+            if (batch.getNbEntries() > 0) {
+              // we have something that can push, break
+              break;
+            } else {
+              serialReplicationChecker.waitUntilCanPush(entry, 
firstCellInEdit);
+            }
+          }
+          // arriving here means we can push the entry, so record the last sequence id
+          
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+            entry.getKey().getSequenceId());
+        }
+        // actually remove the entry.
+        entryStream.next();
         WALEdit edit = entry.getEdit();
         if (edit != null && !edit.isEmpty()) {
           long entrySize = getEntrySize(entry);
           batch.addEntry(entry);
-          updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+          updateBatchStats(batch, entry, entrySize);
           boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
           // Stop if too many entries or too big
-          if (totalBufferTooLarge || batch.getHeapSize() >= 
replicationBatchSizeCapacity
-              || batch.getNbEntries() >= replicationBatchCountCapacity) {
+          if (totalBufferTooLarge || batch.getHeapSize() >= 
replicationBatchSizeCapacity ||
+            batch.getNbEntries() >= replicationBatchCountCapacity) {
             break;
           }
         }
+      } else {
+        // actually remove the entry.
+        entryStream.next();
       }
-    }
+    } while (entryStream.hasNext());
     return batch;
   }
 
-  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path 
currentPath)
-      throws InterruptedException {
+  protected void handleEmptyWALEntryBatch(Path currentPath) throws 
InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
     Thread.sleep(sleepForRetries);
   }
@@ -214,7 +248,7 @@ public class ReplicationSourceWALReader extends Thread {
     // if we've read some WAL entries, get the Path we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
     if (batchQueueHead != null) {
-      return batchQueueHead.lastWalPath;
+      return batchQueueHead.getLastWalPath();
     }
     // otherwise, we must be currently reading from the head of the log queue
     return logQueue.peek();
@@ -253,15 +287,12 @@ public class ReplicationSourceWALReader extends Thread {
     return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
   }
 
-  private void updateBatchStats(WALEntryBatch batch, Entry entry, long 
entryPosition, long entrySize) {
+  private void updateBatchStats(WALEntryBatch batch, Entry entry, long 
entrySize) {
     WALEdit edit = entry.getEdit();
-    if (edit != null && !edit.isEmpty()) {
-      batch.incrementHeapSize(entrySize);
-      Pair<Integer, Integer> nbRowsAndHFiles = 
countDistinctRowKeysAndHFiles(edit);
-      batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
-      batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
-    }
-    batch.lastWalPosition = entryPosition;
+    batch.incrementHeapSize(entrySize);
+    Pair<Integer, Integer> nbRowsAndHFiles = 
countDistinctRowKeysAndHFiles(edit);
+    batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
+    batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
   }
 
   /**
@@ -355,101 +386,4 @@ public class ReplicationSourceWALReader extends Thread {
   public void setReaderRunning(boolean readerRunning) {
     this.isReaderRunning = readerRunning;
   }
-
-  /**
-   * Holds a batch of WAL entries to replicate, along with some statistics
-   *
-   */
-  static class WALEntryBatch {
-    private List<Entry> walEntries;
-    // last WAL that was read
-    private Path lastWalPath;
-    // position in WAL of last entry in this batch
-    private long lastWalPosition = 0;
-    // number of distinct row keys in this batch
-    private int nbRowKeys = 0;
-    // number of HFiles
-    private int nbHFiles = 0;
-    // heap size of data we need to replicate
-    private long heapSize = 0;
-
-    /**
-     * @param walEntries
-     * @param lastWalPath Path of the WAL the last entry in this batch was 
read from
-     * @param lastWalPosition Position in the WAL the last entry in this batch 
was read from
-     */
-    WALEntryBatch(int maxNbEntries, Path lastWalPath) {
-      this.walEntries = new ArrayList<>(maxNbEntries);
-      this.lastWalPath = lastWalPath;
-    }
-
-    public void addEntry(Entry entry) {
-      walEntries.add(entry);
-    }
-
-    /**
-     * @return the WAL Entries.
-     */
-    public List<Entry> getWalEntries() {
-      return walEntries;
-    }
-
-    /**
-     * @return the path of the last WAL that was read.
-     */
-    public Path getLastWalPath() {
-      return lastWalPath;
-    }
-
-    /**
-     * @return the position in the last WAL that was read.
-     */
-    public long getLastWalPosition() {
-      return lastWalPosition;
-    }
-
-    public int getNbEntries() {
-      return walEntries.size();
-    }
-
-    /**
-     * @return the number of distinct row keys in this batch
-     */
-    public int getNbRowKeys() {
-      return nbRowKeys;
-    }
-
-    /**
-     * @return the number of HFiles in this batch
-     */
-    public int getNbHFiles() {
-      return nbHFiles;
-    }
-
-    /**
-     * @return total number of operations in this batch
-     */
-    public int getNbOperations() {
-      return getNbRowKeys() + getNbHFiles();
-    }
-
-    /**
-     * @return the heap size of this batch
-     */
-    public long getHeapSize() {
-      return heapSize;
-    }
-
-    private void incrementNbRowKeys(int increment) {
-      nbRowKeys += increment;
-    }
-
-    private void incrementNbHFiles(int increment) {
-      nbHFiles += increment;
-    }
-
-    private void incrementHeapSize(long increment) {
-      heapSize += increment;
-    }
-  }
 }

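Note that readWALEntries can now hand back a batch with zero entries, and the run loop still
enqueues it. A sketch of what the consumer side gains from that (the accessors come from the
new WALEntryBatch class added by this commit; the surrounding loop is illustrative):

    // Illustrative consumer: even an empty batch advances the stored WAL
    // position and the per-region last sequence ids, which is exactly what
    // serial replication needs to mark the previous range as finished.
    WALEntryBatch batch = entryBatchQueue.take();
    if (batch.getWalEntries().isEmpty()) {
      updateLogPosition(batch.getLastWalPosition(), batch.getLastSeqIds());
    }
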
http://git-wip-us.apache.org/repos/asf/hbase/blob/f29bf1d7/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
new file mode 100644
index 0000000..95f3868
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
+/**
+ * <p>
+ * Helper class to determine whether we can push a given WAL entry without breaking the
+ * replication order. The class is designed to be used per {@link ReplicationSourceWALReader},
+ * so it is not thread safe.
+ * </p>
+ * <p>
+ * We record all the open sequence numbers for a region in a special family in meta, which is
+ * called 'barrier', so there will be a sequence of open sequence numbers (b1, b2, b3, ...). We
+ * call [bn, bn+1) a range, and it is obvious that a region will always be on the same RS within
+ * a range.
+ * </p>
+ * <p>
+ * When a region is split or merged, we will also record the parent region(s) for the generated
+ * region(s) in the special family in meta. And we will also write an extra 'open sequence
+ * number' for the parent region(s), which is the max sequence id of the region plus one.
+ * </p>
+ * <p>
+ * For each peer, we record the last pushed sequence id for each region. It is managed by the
+ * replication storage.
+ * </p>
+ * <p>
+ * The algorithm works like this:
+ * <ol>
+ * <li>Locate the sequence id we want to push in the barriers</li>
+ * <li>If it is before the first barrier, we are safe to push. This is usually because we
+ * enabled serial replication for this table after we created it and wrote data into it.</li>
+ * <li>In general, if the previous range is finished, then we are safe to push. Determining
+ * whether a range is finished is straightforward: check whether the last pushed sequence id is
+ * equal to the end barrier of the range minus 1. There are several exceptions:
+ * <ul>
+ * <li>If it is in the first range, we need to check whether there are parent regions. If so, we
+ * need to make sure that the data for the parent regions has all been pushed.</li>
+ * <li>If it is in the last range, we need to check the region state. If the state is OPENING,
+ * then we are not safe to push. This is because, before we call reportRIT to the master (which
+ * updates the open sequence number in the meta table), we will first write an open region event
+ * marker to the WAL, and its sequence id is greater than the newest open sequence number (which
+ * has not been updated to the meta table yet, so we do not know it). For this scenario, the WAL
+ * entry for this open region event marker actually belongs to the range after the 'last' range,
+ * so we are not safe to push it. Otherwise the last pushed sequence id would be updated to this
+ * value, and we would wrongly conclude that the previous range had already been finished.</li>
+ * <li>Notice that the above two exceptions do not conflict, since the first range can also be
+ * the last range if we only have one range.</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * </p>
+ * <p>
+ * For performance reasons, we do not want to check meta for every WAL entry, so we introduce
+ * two in-memory maps. The idea is simple:
+ * <ul>
+ * <li>If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.</li>
+ * <li>Before accessing meta, first check the sequence id stored in the {@code canPushUnder}
+ * map. If the sequence id of the WAL entry is less than the one stored in the
+ * {@code canPushUnder} map, then we are safe to push.</li>
+ * </ul>
+ * And for the last range, we do not have an end barrier, so we use the continuity of sequence
+ * ids to determine whether we can push. The rule is:
+ * <ul>
+ * <li>When an entry can be pushed, put its sequence id into the {@code pushed} map.</li>
+ * <li>Check whether the sequence id of the WAL entry equals the one stored in the {@code
+ * pushed} map plus one. If so, we are safe to push, and also update the {@code pushed} map with
+ * the sequence id of the WAL entry.</li>
+ * </ul>
+ * </p>
+ */
+@InterfaceAudience.Private
+class SerialReplicationChecker {
+
+  public static final String REPLICATION_SERIALLY_WAITING_KEY =
+    "hbase.serial.replication.waiting.ms";
+  public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
+
+  private final String peerId;
+
+  private final ReplicationQueueStorage storage;
+
+  private final Connection conn;
+
+  private final long waitTimeMs;
+
+  private final LoadingCache<String, MutableLong> pushed = 
CacheBuilder.newBuilder()
+    .expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, 
MutableLong>() {
+
+      @Override
+      public MutableLong load(String key) throws Exception {
+        return new MutableLong(HConstants.NO_SEQNUM);
+      }
+    });
+
+  // Use guava cache to set ttl for each key
+  private final Cache<String, Long> canPushUnder =
+    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
+
+  public SerialReplicationChecker(Configuration conf, ReplicationSource 
source) {
+    this.peerId = source.getPeerId();
+    this.storage = source.getQueueStorage();
+    this.conn = source.getServer().getConnection();
+    this.waitTimeMs =
+      conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, 
REPLICATION_SERIALLY_WAITING_DEFAULT);
+  }
+
+  private boolean isRangeFinished(long endBarrier, String encodedRegionName) 
throws IOException {
+    long pushedSeqId;
+    try {
+      pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId);
+    } catch (ReplicationException e) {
+      throw new IOException(
+        "Failed to get pushed sequence id for " + encodedRegionName + ", peer 
" + peerId, e);
+    }
+    // endBarrier is the open sequence number. When opening a region, the open sequence number
+    // will be set to the old max sequence id plus one, so here we need to subtract one.
+    return pushedSeqId >= endBarrier - 1;
+  }
+
+  private boolean isParentFinished(byte[] regionName) throws IOException {
+    long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, 
regionName);
+    if (barriers.length == 0) {
+      return true;
+    }
+    return isRangeFinished(barriers[barriers.length - 1], 
RegionInfo.encodeRegionName(regionName));
+  }
+
+  // We may write an open region marker to the WAL before we write the open sequence number to
+  // meta, so if a region is in OPENING state and we are in the last range, it is not safe to
+  // say we can push even if the previous range is finished.
+  private boolean isLastRangeAndOpening(ReplicationBarrierResult 
barrierResult, int index) {
+    return index == barrierResult.getBarriers().length &&
+      barrierResult.getState() == RegionState.State.OPENING;
+  }
+
+  private void recordCanPush(String encodedNameAsString, long seqId, long[] 
barriers, int index) {
+    if (barriers.length > index) {
+      canPushUnder.put(encodedNameAsString, barriers[index]);
+    }
+    pushed.getUnchecked(encodedNameAsString).setValue(seqId);
+  }
+
+  private boolean canPush(Entry entry, byte[] row) throws IOException {
+    String encodedNameAsString = 
Bytes.toString(entry.getKey().getEncodedRegionName());
+    long seqId = entry.getKey().getSequenceId();
+    ReplicationBarrierResult barrierResult = 
MetaTableAccessor.getReplicationBarrierResult(conn,
+      entry.getKey().getTableName(), row, 
entry.getKey().getEncodedRegionName());
+    long[] barriers = barrierResult.getBarriers();
+    int index = Arrays.binarySearch(barriers, seqId);
+    if (index == -1) {
+      // This means we are in the range before the first recorded openSeqNum. This is usually
+      // because the WAL was written before we enabled serial replication for this table, so
+      // just return true since we can not guarantee the order anyway.
+      pushed.getUnchecked(encodedNameAsString).setValue(seqId);
+      return true;
+    }
+    // The sequence id ranges are left-closed and right-open, so either we decrease the missed
+    // insertion point to make the index start from 0, or increase the hit index to make it
+    // start from 1. Here we choose the latter.
+    if (index < 0) {
+      index = -index - 1;
+    } else {
+      index++;
+    }
+    if (index == 1) {
+      // we are in the first range, check whether we have parents
+      for (byte[] regionName : barrierResult.getParentRegionNames()) {
+        if (!isParentFinished(regionName)) {
+          return false;
+        }
+      }
+      if (isLastRangeAndOpening(barrierResult, index)) {
+        return false;
+      }
+      recordCanPush(encodedNameAsString, seqId, barriers, 1);
+      return true;
+    }
+    // check whether the previous range is finished
+    if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
+      return false;
+    }
+    if (isLastRangeAndOpening(barrierResult, index)) {
+      return false;
+    }
+    recordCanPush(encodedNameAsString, seqId, barriers, index);
+    return true;
+  }
+
+  public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException 
{
+    String encodedNameAsString = 
Bytes.toString(entry.getKey().getEncodedRegionName());
+    long seqId = entry.getKey().getSequenceId();
+    Long canReplicateUnderSeqId = 
canPushUnder.getIfPresent(encodedNameAsString);
+    if (canReplicateUnderSeqId != null) {
+      if (seqId < canReplicateUnderSeqId.longValue()) {
+        return true;
+      }
+      // we are already beyond the last safe point, remove it from the map
+      canPushUnder.invalidate(encodedNameAsString);
+    }
+    // This is for the case where the region is currently open on us: if the sequence ids are
+    // continuous then we are safe to replicate. If there is a gap, then maybe the region has
+    // been moved to another RS and then back, so we need to check the barriers.
+    MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
+    if (seqId == previousPushedSeqId.longValue() + 1) {
+      previousPushedSeqId.increment();
+      return true;
+    }
+    return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
+  }
+
+  public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
+      throws IOException, InterruptedException {
+    byte[] row = CellUtil.cloneRow(firstCellInEdit);
+    while (!canPush(entry, row)) {
+      Thread.sleep(waitTimeMs);
+    }
+  }
+}
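To make the barrier arithmetic in canPush concrete, here is a worked example with made-up
numbers (all values and names below are illustrative only):

    // The recorded open sequence numbers of one region induce the ranges
    // [10,25), [25,40) and [40, +inf).
    long[] barriers = { 10, 25, 40 };
    long seqId = 30;
    int index = Arrays.binarySearch(barriers, seqId); // miss: returns -3
    if (index == -1) {
      // seqId < barriers[0]: written before serial replication was enabled,
      // always safe to push
    } else {
      index = index < 0 ? -index - 1 : index + 1; // here: 2, i.e. range [25,40)
      // The previous range [10,25) is finished once the last pushed sequence
      // id for this peer reaches barriers[index - 1] - 1 == 24; then seqId 30
      // may be pushed and recordCanPush caches barriers[index] == 40 in
      // canPushUnder, so seqIds 31..39 skip the meta lookup entirely.
    }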
