Revert "HBASE-9465 Push entries to peer clusters serially"

This reverts commit 441bc050b991c14c048617bc443b97f46e21b76f.

 Conflicts:
        hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
        hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
        hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
        hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba7a936f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba7a936f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba7a936f

Branch: refs/heads/branch-1
Commit: ba7a936f74985eb9d974fdc87b0d06cb8cd8473d
Parents: 0a284d2
Author: Sean Busbey <bus...@apache.org>
Authored: Tue Nov 7 23:50:35 2017 -0600
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Feb 23 14:42:15 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  46 +--
 .../apache/hadoop/hbase/MetaTableAccessor.java  | 243 ++---------
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  26 --
 .../src/main/resources/hbase-default.xml        |  14 -
 .../hbase/protobuf/generated/WALProtos.java     |  16 +-
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 -
 .../org/apache/hadoop/hbase/master/HMaster.java |   9 -
 .../hadoop/hbase/master/RegionStateStore.java   |  43 +-
 .../master/cleaner/ReplicationMetaCleaner.java  | 187 ---------
 .../RegionMergeTransactionImpl.java             |   3 +-
 .../hbase/regionserver/ReplicationService.java  |   1 -
 .../regionserver/SplitTransactionImpl.java      |   2 +-
 .../replication/regionserver/Replication.java   |  14 +-
 .../regionserver/ReplicationSource.java         |  68 +---
 .../regionserver/ReplicationSourceManager.java  |  87 +---
 .../ReplicationSourceWALReaderThread.java       |  31 --
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  13 -
 .../hadoop/hbase/TestMetaTableAccessor.java     |  10 +-
 .../hadoop/hbase/client/TestMetaScanner.java    |   2 +-
 .../master/TestAssignmentManagerOnCluster.java  |   2 +-
 .../replication/TestSerialReplication.java      | 401 -------------------
 .../regionserver/TestGlobalThrottler.java       |   2 +-
 23 files changed, 69 insertions(+), 1166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 1fd950a..7f48976 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -34,12 +34,13 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
 
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableComparable;
@@ -1217,18 +1217,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   }
 
   /**
-   * Return true if there are at least one cf whose replication scope is serial.
-   */
-  public boolean hasSerialReplicationScope() {
-    for (HColumnDescriptor column: getFamilies()){
-      if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Returns the configured replicas per region
    */
   public int getRegionReplication() {
@@ -1772,32 +1760,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
          .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
          // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
           .setBloomFilterType(BloomType.NONE)
-            .setCacheDataInL1(true),
-          new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
-              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
-                  HConstants.DEFAULT_HBASE_META_VERSIONS))
-              .setInMemory(true)
-              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
-                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
-              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
-              .setBloomFilterType(BloomType.NONE)
-              // Enable cache of data blocks in L1 if more than one caching tier deployed:
-              // e.g. if using CombinedBlockCache (BucketCache).
-              .setCacheDataInL1(true),
-          new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
-              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
-                  HConstants.DEFAULT_HBASE_META_VERSIONS))
-              .setInMemory(true)
-              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
-                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
-              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
-              .setBloomFilterType(BloomType.NONE)
-              // Enable cache of data blocks in L1 if more than one caching tier deployed:
-              // e.g. if using CombinedBlockCache (BucketCache).
-              .setCacheDataInL1(true),
-      });
+          .setCacheDataInL1(true)
+         });
     metaDescriptor.addCoprocessor(
       "org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
       null, Coprocessor.PRIORITY_SYSTEM, null);

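Note: the hunks above remove HTableDescriptor#hasSerialReplicationScope() and drop the rep_barrier/rep_position families from the meta table descriptor. For callers that still need the old check, a minimal sketch (hypothetical helper; the literal 2 stands in for the deleted HConstants.REPLICATION_SCOPE_SERIAL):

    // Hypothetical stand-in for the deleted hasSerialReplicationScope();
    // 2 was the value of the removed REPLICATION_SCOPE_SERIAL constant.
    static boolean hadSerialScope(HTableDescriptor htd) {
      for (HColumnDescriptor column : htd.getFamilies()) {
        if (column.getScope() == 2) {
          return true;
        }
      }
      return false;
    }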
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index c7e3757..3f11558 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -20,20 +20,6 @@ package org.apache.hadoop.hbase;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +49,18 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Read/write operations on region and assignment information store in
  * <code>hbase:meta</code>.
@@ -109,27 +107,10 @@ public class MetaTableAccessor {
    *
   * The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
   * and should not leak out of it (through Result objects, etc)
-   *
-   * For replication serially, there are two column families "rep_barrier", "rep_position" whose
-   * row key is encodedRegionName.
-   * rep_barrier:{seqid}      => in each time a RS opens a region, it saves the open sequence
-   *                                  id in this region
-   * rep_position:{peerid}    => to save the max sequence id we have pushed for each peer
-   * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
-   *                             we clean old data
-   * rep_position:_DAUGHTER_  => a special cell to present this region is split or merged, in this
-   *                             cell the value is merged encoded name or two split encoded names
-   *                             separated by ","
    */
 
   private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
 
-  // Save its daughter region(s) when split/merge
-  private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
-  // Save its table name because we only know region's encoded name
-  private static final String tableNamePeer = "_TABLENAME_";
-  private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
-
   static final byte [] META_REGION_PREFIX;
   static {
     // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -981,19 +962,6 @@ public class MetaTableAccessor {
     return delete;
   }
 
-  public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
-    byte[] seqBytes = Bytes.toBytes(seq);
-    return new Put(encodedRegionName)
-        .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
-        .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
-  }
-
-
-  public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
-    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
-        daughterNamePosCq, value);
-  }
-
   /**
    * Adds split daughters to the Put
    */
@@ -1010,24 +978,24 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
-   * Non-atomic for multi puts.
+   * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
    * @param connection connection we're using
-   * @param puts Put to add to hbase:meta
+   * @param p Put to add to hbase:meta
    * @throws IOException
    */
-  static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
-    put(getMetaHTable(connection), Arrays.asList(puts));
+  static void putToMetaTable(final Connection connection, final Put p)
+    throws IOException {
+    put(getMetaHTable(connection), p);
   }
 
   /**
    * @param t Table to use (will be closed when done).
-   * @param puts puts to make
+   * @param p put to make
    * @throws IOException
    */
-  private static void put(final Table t, final List<Put> puts) throws IOException {
+  private static void put(final Table t, final Put p) throws IOException {
     try {
-      t.put(puts);
+      t.put(p);
     } finally {
       t.close();
     }
@@ -1153,7 +1121,7 @@ public class MetaTableAccessor {
   * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
   * does not add its daughter's as different rows, but adds information about the daughters
   * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
    * if you want to do that.
    * @param meta the Table for META
    * @param regionInfo region information
@@ -1175,7 +1143,7 @@ public class MetaTableAccessor {
   * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
   * does not add its daughter's as different rows, but adds information about the daughters
   * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
    * if you want to do that.
    * @param connection connection we're using
    * @param regionInfo region information
@@ -1264,7 +1232,7 @@ public class MetaTableAccessor {
    */
  public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
      HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
-      long masterSystemTime, boolean saveBarrier)
+      long masterSystemTime)
           throws IOException {
     Table meta = getMetaHTable(connection);
     try {
@@ -1295,17 +1263,7 @@ public class MetaTableAccessor {
 
       byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
         + HConstants.DELIMITER);
-      Mutation[] mutations;
-      if (saveBarrier) {
-        Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
-            Bytes.toBytes(mergedRegion.getEncodedName()));
-        Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
-            Bytes.toBytes(mergedRegion.getEncodedName()));
-        mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
-      } else {
-        mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
-      }
-      multiMutate(meta, tableRow, mutations);
+      multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
     } finally {
       meta.close();
     }
@@ -1321,11 +1279,10 @@ public class MetaTableAccessor {
    * @param splitA Split daughter region A
    * @param splitB Split daughter region A
    * @param sn the location of the region
-   * @param saveBarrier true if need save replication barrier in meta, used for serial replication
   */
-  public static void splitRegion(final Connection connection, HRegionInfo parent,
-      HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
-      boolean saveBarrier) throws IOException {
+  public static void splitRegion(final Connection connection,
+                                 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
+                                 ServerName sn, int regionReplication) throws IOException {
     Table meta = getMetaHTable(connection);
     try {
       HRegionInfo copyOfParent = new HRegionInfo(parent);
@@ -1350,17 +1307,8 @@ public class MetaTableAccessor {
         addEmptyLocation(putB, i);
       }
 
-      Mutation[] mutations;
-      if (saveBarrier) {
-        Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
-            Bytes
-                .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
-        mutations = new Mutation[]{putParent, putA, putB, putBarrier};
-      } else {
-        mutations = new Mutation[]{putParent, putA, putB};
-      }
      byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, mutations);
+      multiMutate(meta, tableRow, putParent, putA, putB);
     } finally {
       meta.close();
     }
@@ -1418,27 +1366,6 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
-   * @param connection connection we're using
-   * @param peerId the peerId to push
-   * @param positions map that saving positions for each region
-   * @throws IOException
-   */
-  public static void updateReplicationPositions(Connection connection, String peerId,
-      Map<String, Long> positions) throws IOException {
-    List<Put> puts = new ArrayList<>();
-    for (Map.Entry<String, Long> entry : positions.entrySet()) {
-      long value = Math.abs(entry.getValue());
-      Put put = new Put(Bytes.toBytes(entry.getKey()));
-      put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
-          Bytes.toBytes(value));
-      puts.add(put);
-    }
-    getMetaHTable(connection).put(puts);
-  }
-
-
-  /**
    * Updates the location of the specified region to be the specified server.
    * <p>
    * Connects to the specified server which should be hosting the specified
@@ -1623,120 +1550,6 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Get replication position for a peer in a region.
-   * @param connection connection we're using
-   * @return the position of this peer, -1 if no position in meta.
-   */
-  public static long getReplicationPositionForOnePeer(Connection connection,
-      byte[] encodedRegionName, String peerId) throws IOException {
-    Get get = new Get(encodedRegionName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
-    Result r = get(getMetaHTable(connection), get);
-    if (r.isEmpty()) {
-      return -1;
-    }
-    Cell cell = r.rawCells()[0];
-    return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
-  }
-
-  /**
-   * Get replication positions for all peers in a region.
-   * @param connection connection we're using
-   * @param encodedRegionName region's encoded name
-   * @return the map of positions for each peer
-   */
-  public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
-      byte[] encodedRegionName) throws IOException {
-    Get get = new Get(encodedRegionName);
-    get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
-    Result r = get(getMetaHTable(connection), get);
-    Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
-    for (Cell c : r.listCells()) {
-      if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
-          c.getQualifierOffset(), c.getQualifierLength()) &&
-          !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
-              c.getQualifierOffset(), c.getQualifierLength())) {
-        map.put(
-            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
-            Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Get all barriers in all regions.
-   * @return a map of barrier lists in all regions
-   * @throws IOException
-   */
-  public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
-      throws IOException {
-    Get get = new Get(encodedRegionName);
-    get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-    Result r = get(getMetaHTable(connection), get);
-    List<Long> list = new ArrayList<>();
-    if (!r.isEmpty()) {
-      for (Cell cell : r.rawCells()) {
-        list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
-            cell.getQualifierLength()));
-      }
-    }
-    return list;
-  }
-
-  public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
-    Map<String, List<Long>> map = new HashMap<>();
-    Scan scan = new Scan();
-    scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-    try (Table t = getMetaHTable(connection);
-        ResultScanner scanner = t.getScanner(scan)) {
-      Result result;
-      while ((result = scanner.next()) != null) {
-        String key = Bytes.toString(result.getRow());
-        List<Long> list = new ArrayList<>();
-        for (Cell cell : result.rawCells()) {
-          list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
-              cell.getQualifierLength()));
-        }
-        map.put(key, list);
-      }
-    }
-    return map;
-  }
-
-  /**
-   * Get daughter region(s) for a region, only used in serial replication.
-   * @throws IOException
-   */
-  public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
-      throws IOException {
-    Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
-    Result result = get(getMetaHTable(connection), get);
-    if (!result.isEmpty()) {
-      Cell c = result.rawCells()[0];
-      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-    }
-    return null;
-  }
-
-  /**
-   * Get the table name for a region, only used in serial replication.
-   * @throws IOException
-   */
-  public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
-      throws IOException {
-    Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
-    Result result = get(getMetaHTable(connection), get);
-    if (!result.isEmpty()) {
-      Cell c = result.rawCells()[0];
-      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-    }
-    return null;
-  }
-
-  /**
    * Checks whether hbase:meta contains any info:server entry.
    * @param connection connection we're using
    * @return true if hbase:meta contains any info:server entry, false if not

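Note: the deleted accessors above read serial replication state directly from hbase:meta, keyed by encoded region name. A condensed sketch of that read path, reconstructed from the removed getReplicationPositionForOnePeer (family name from the HConstants entries deleted below); it is not a current API:

    // Sketch of the removed meta read path, pre-revert behavior only.
    Get get = new Get(encodedRegionName);
    get.addColumn(Bytes.toBytes("rep_position"), Bytes.toBytes(peerId));
    Result r = metaTable.get(get);
    Cell cell = r.isEmpty() ? null : r.rawCells()[0];
    long pushed = cell == null ? -1
        : Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
    // Barriers lived in rep_barrier:{seqid} cells, one per region open.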
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 7bef9ed..55653d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -91,10 +91,8 @@ public class ReplicationAdmin implements Closeable {
   // only Global for now, can add other type
  // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
  public static final String REPLICATIONTYPE = "replicationType";
-  public static final String REPLICATIONGLOBAL =
-      Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
-  public static final String REPLICATIONSERIAL =
-      Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
+  public static final String REPLICATIONGLOBAL = Integer
+      .toString(HConstants.REPLICATION_SCOPE_GLOBAL);

  private final Connection connection;
  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@@ -488,10 +486,7 @@ public class ReplicationAdmin implements Closeable {
          HashMap<String, String> replicationEntry = new HashMap<String, String>();
           replicationEntry.put(TNAME, tableName);
           replicationEntry.put(CFNAME, column.getNameAsString());
-          replicationEntry.put(REPLICATIONTYPE,
-              column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
-                  REPLICATIONGLOBAL :
-                  REPLICATIONSERIAL);
+          replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
           replicationColFams.add(replicationEntry);
         }
       }
@@ -703,8 +698,7 @@ public class ReplicationAdmin implements Closeable {
     boolean hasDisabled = false;
 
     for (HColumnDescriptor hcd : htd.getFamilies()) {
-      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
-          && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
         hasDisabled = true;
       } else {
         hasEnabled = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dc44c77..e702236 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -449,20 +449,6 @@ public final class HConstants {
   /** The catalog family */
  public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
 
-  /** The replication barrier family as a string*/
-  public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
-
-  /** The replication barrier family */
-  public static final byte [] REPLICATION_BARRIER_FAMILY =
-      Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
-
-  /** The replication barrier family as a string*/
-  public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
-
-  /** The replication barrier family */
-  public static final byte [] REPLICATION_POSITION_FAMILY =
-      Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
-
   /** The RegionInfo qualifier as a string */
   public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
 
@@ -658,12 +644,6 @@ public final class HConstants {
   public static final int REPLICATION_SCOPE_GLOBAL = 1;
 
   /**
-   * Scope tag for serially scoped data
-   * This data will be replicated to all peers by the order of sequence id.
-   */
-  public static final int REPLICATION_SCOPE_SERIAL = 2;
-
-  /**
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */
@@ -914,12 +894,6 @@ public final class HConstants {
   public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
  /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
  public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
-
-  public static final String
-      REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
-  public static final long
-      REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
-
  /**
   * Max total size of buffered entries in all replication peers. It will prevent server getting
   * OOM if there are many peers. Default value is 256MB which is four times to default

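Note: for reference, the serial replication constants deleted from HConstants by the two hunks above, consolidated (values copied from the removed lines):

    public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
    public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
    public static final int REPLICATION_SCOPE_SERIAL = 2;
    public static final String REPLICATION_SERIALLY_WAITING_KEY =
        "hbase.serial.replication.waitingMs";
    public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;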
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5908359..e1ae0ef 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1542,20 +1542,6 @@ possible configurations would overwhelm and obscure the important.
         slave clusters. The default of 10 will rarely need to be changed.
     </description>
   </property>
-  <property>
-    <name>hbase.serial.replication.waitingMs</name>
-    <value>10000</value>
-    <description>
-      By default, in replication we can not make sure the order of operations in slave cluster is
-      same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order
-      of written. This configure is to set how long (in ms) we will wait before next checking if a
-      log can not push right now because there are some logs written before it have not been pushed.
-      A larger waiting will decrease the number of queries on hbase:meta but will enlarge the delay
-      of replication. This feature relies on zk-less assignment, and conflicts with distributed log
-      replay. So users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to
-      false to support it.
-    </description>
-  </property>
   <!-- Static Web User Filter properties. -->
   <property>
     <description>

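Note: with the property and scope gone, a column family either replicates or it does not. A sketch of the only remaining choices (standard HColumnDescriptor API):

    HColumnDescriptor cf = new HColumnDescriptor("cf");
    cf.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); // 1: replicate to peers
    // cf.setScope(HConstants.REPLICATION_SCOPE_LOCAL); // 0: do not replicate
    // Scope 2 (serial) is no longer a valid value after this revert.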
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index a466e6c..e0efab4 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -21,10 +21,6 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     REPLICATION_SCOPE_GLOBAL(1, 1),
-    /**
-     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
-     */
-    REPLICATION_SCOPE_SERIAL(2, 2),
     ;
 
     /**
@@ -35,10 +31,6 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
-    /**
-     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
-     */
-    public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
 
 
     public final int getNumber() { return value; }
@@ -47,7 +39,6 @@ public final class WALProtos {
       switch (value) {
         case 0: return REPLICATION_SCOPE_LOCAL;
         case 1: return REPLICATION_SCOPE_GLOBAL;
-        case 2: return REPLICATION_SCOPE_SERIAL;
         default: return null;
       }
     }
@@ -12023,11 +12014,10 @@ public final class WALProtos {
       "e.pb.StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.h" +
       "base.pb.ServerName\022\023\n\013region_name\030\007 \001(\014\"" +
       ".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_" +
-      "CLOSE\020\001\"\014\n\nWALTrailer*d\n\tScopeType\022\033\n\027RE" +
+      "CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027RE" +
       "PLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_S" +
-      "COPE_GLOBAL\020\001\022\034\n\030REPLICATION_SCOPE_SERIA" +
-      "L\020\002B?\n*org.apache.hadoop.hbase.protobuf." +
-      "generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "COPE_GLOBAL\020\001B?\n*org.apache.hadoop.hbase" +
+      ".protobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
     };
    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 08925f8..a888686 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -77,7 +77,6 @@ message WALKey {
 enum ScopeType {
   REPLICATION_SCOPE_LOCAL = 0;
   REPLICATION_SCOPE_GLOBAL = 1;
-  REPLICATION_SCOPE_SERIAL = 2;
 }
 
 message FamilyScope {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f8bbc65..6951098 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -99,7 +99,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
@@ -331,7 +330,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   CatalogJanitor catalogJanitorChore;
   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
   private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
-  private ReplicationMetaCleaner replicationMetaCleaner;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
 
@@ -1250,12 +1248,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     } catch (Exception e) {
       LOG.error("start replicationZKNodeCleanerChore failed", e);
     }
-    try {
-      replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
-      getChoreService().scheduleChore(replicationMetaCleaner);
-    } catch (Exception e) {
-      LOG.error("start ReplicationMetaCleaner failed", e);
-    }
   }
 
   @Override
@@ -1291,7 +1283,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
     if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
     if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
-    if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 2d445e2..476b4d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -17,15 +17,11 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -49,6 +45,8 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A helper to persist region state in meta. We may change this class
  * to StateStore later if we also use it to store other states in meta
@@ -65,7 +63,7 @@ public class RegionStateStore {
   private volatile boolean initialized;
 
   private final boolean noPersistence;
-  private final MasterServices server;
+  private final Server server;
 
   /**
    * Returns the {@link ServerName} from catalog table {@link Result}
@@ -135,7 +133,7 @@ public class RegionStateStore {
           State.SPLITTING_NEW, State.MERGED));
   }
 
-  RegionStateStore(final MasterServices server) {
+  RegionStateStore(final Server server) {
     Configuration conf = server.getConfiguration();
     // No need to persist if using ZK but not migrating
     noPersistence = ConfigUtil.useZKForAssignment(conf)
@@ -200,41 +198,31 @@ public class RegionStateStore {
     State state = newState.getState();
 
       int replicaId = hri.getReplicaId();
-      Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+      Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
       StringBuilder info = new StringBuilder("Updating hbase:meta row ");
       info.append(hri.getRegionNameAsString()).append(" with 
state=").append(state);
       if (serverName != null && !serverName.equals(oldServer)) {
-        metaPut.addImmutable(HConstants.CATALOG_FAMILY, 
getServerNameColumn(replicaId),
+        put.addImmutable(HConstants.CATALOG_FAMILY, 
getServerNameColumn(replicaId),
           Bytes.toBytes(serverName.getServerName()));
         info.append(", sn=").append(serverName);
       }
       if (openSeqNum >= 0) {
         Preconditions.checkArgument(state == State.OPEN
           && serverName != null, "Open region should be on a server");
-        MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, 
replicaId);
+        MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, 
replicaId);
         info.append(", openSeqNum=").append(openSeqNum);
         info.append(", server=").append(serverName);
       }
-      metaPut.addImmutable(HConstants.CATALOG_FAMILY, 
getStateColumn(replicaId),
+      put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
         Bytes.toBytes(state.name()));
       LOG.info(info);
-      HTableDescriptor descriptor = 
server.getTableDescriptors().get(hri.getTable());
-      boolean serial = false;
-      if (descriptor != null) {
-        serial = 
server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
-      }
-      boolean shouldPutBarrier = serial && state == State.OPEN;
+
       // Persist the state change to meta
       if (metaRegion != null) {
         try {
           // Assume meta is pinned to master.
           // At least, that's what we want.
-          metaRegion.put(metaPut);
-          if (shouldPutBarrier) {
-            Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
-                openSeqNum, hri.getTable().getName());
-            metaRegion.put(barrierPut);
-          }
+          metaRegion.put(put);
           return; // Done here
         } catch (Throwable t) {
           // In unit tests, meta could be moved away by intention
@@ -253,10 +241,7 @@ public class RegionStateStore {
         }
       }
       // Called when meta is not on master
-      List<Put> list = shouldPutBarrier ?
-          Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
-              openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
-      multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
+      multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
 
     } catch (IOException ioe) {
       LOG.error("Failed to persist region state " + newState, ioe);
@@ -266,14 +251,12 @@ public class RegionStateStore {
 
   void splitRegion(HRegionInfo p,
      HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
-    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
-        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
   }
 
   void mergeRegions(HRegionInfo p,
      HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
    MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
-        EnvironmentEdgeManager.currentTime(),
-        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
+               EnvironmentEdgeManager.currentTime());
   }
 }

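Note: the removed branch above wrote a barrier row on every OPEN of a region whose table had a serially scoped family. A sketch of the Put it produced, pieced together from the deleted MetaTableAccessor.makeBarrierPut (family and qualifier names from the deleted constants):

    // Pre-revert only: barrier recorded under the encoded region name, with
    // the open sequence id as both qualifier and value, plus the table name.
    byte[] seqBytes = Bytes.toBytes(openSeqNum);
    Put barrier = new Put(hri.getEncodedNameAsBytes())
        .addImmutable(Bytes.toBytes("rep_barrier"), seqBytes, seqBytes)
        .addImmutable(Bytes.toBytes("rep_position"), Bytes.toBytes("_TABLENAME_"),
            hri.getTable().getName());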
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
deleted file mode 100644
index 41864b9..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * This chore is to clean up the useless data in hbase:meta which is used by serial replication.
- */
-@InterfaceAudience.Private
-public class ReplicationMetaCleaner extends ScheduledChore {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
-
-  private ReplicationAdmin replicationAdmin;
-  private MasterServices master;
-
-  public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
-      throws IOException {
-    super("ReplicationMetaCleaner", stoppable, period);
-    this.master = master;
-    replicationAdmin = new ReplicationAdmin(master.getConfiguration());
-  }
-
-  @Override
-  protected void chore() {
-    try {
-      Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAll();
-      Map<String, Set<String>> serialTables = new HashMap<>();
-      for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
-        boolean hasSerialScope = false;
-        for (HColumnDescriptor column : entry.getValue().getFamilies()) {
-          if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
-            hasSerialScope = true;
-            break;
-          }
-        }
-        if (hasSerialScope) {
-          serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
-        }
-      }
-      if (serialTables.isEmpty()){
-        return;
-      }
-
-      Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
-      for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
-        for (Map.Entry<byte[], byte[]> map : entry.getValue().getPeerData()
-            .entrySet()) {
-          String tableName = Bytes.toString(map.getKey());
-          if (serialTables.containsKey(tableName)) {
-            serialTables.get(tableName).add(entry.getKey());
-            break;
-          }
-        }
-      }
-
-      Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
-      for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
-        String encodedName = entry.getKey();
-        byte[] encodedBytes = Bytes.toBytes(encodedName);
-        boolean canClearRegion = false;
-        Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
-            master.getConnection(), encodedBytes);
-        if (posMap.isEmpty()) {
-          continue;
-        }
-
-        String tableName = MetaTableAccessor.getSerialReplicationTableName(
-            master.getConnection(), encodedBytes);
-        Set<String> confPeers = serialTables.get(tableName);
-        if (confPeers == null) {
-          // This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
-          canClearRegion = true;
-        } else {
-          if (!allPeersHavePosition(confPeers, posMap)) {
-            continue;
-          }
-
-          String daughterValue = MetaTableAccessor
-              .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
-          if (daughterValue != null) {
-            //this region is merged or split
-            boolean allDaughterStart = true;
-            String[] daughterRegions = daughterValue.split(",");
-            for (String daughter : daughterRegions) {
-              byte[] region = Bytes.toBytes(daughter);
-              if (!MetaTableAccessor.getReplicationBarriers(
-                  master.getConnection(), region).isEmpty() &&
-                  !allPeersHavePosition(confPeers,
-                      MetaTableAccessor
-                          .getReplicationPositionForAllPeer(master.getConnection(), region))) {
-                allDaughterStart = false;
-                break;
-              }
-            }
-            if (allDaughterStart) {
-              canClearRegion = true;
-            }
-          }
-        }
-        if (canClearRegion) {
-          Delete delete = new Delete(encodedBytes);
-          delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
-          delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
-          try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
-            metaTable.delete(delete);
-          }
-        } else {
-
-          // Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
-          // is smaller than min pos should be kept. All other barriers can be deleted.
-
-          long minPos = Long.MAX_VALUE;
-          for (Map.Entry<String, Long> pos : posMap.entrySet()) {
-            minPos = Math.min(minPos, pos.getValue());
-          }
-          List<Long> barriers = entry.getValue();
-          int index = Collections.binarySearch(barriers, minPos);
-          if (index < 0) {
-            index = -index - 1;
-          }
-          Delete delete = new Delete(encodedBytes);
-          for (int i = 0; i < index - 1; i++) {
-            delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
-          }
-          try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
-            metaTable.delete(delete);
-          }
-        }
-
-      }
-
-    } catch (IOException e) {
-      LOG.error("Exception during cleaning up.", e);
-    }
-
-  }
-
-  private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
-      throws IOException {
-    for(String peer:peers){
-      if (!posMap.containsKey(peer)){
-        return false;
-      }
-    }
-    return true;
-  }
-}

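Note: the heart of the deleted cleaner is its barrier trimming rule: keep every barrier at or above the minimum position pushed by all peers, plus the newest barrier below that minimum; everything older is deletable. Distilled from the removed chore() body:

    long minPos = Long.MAX_VALUE;
    for (Long pos : posMap.values()) {
      minPos = Math.min(minPos, pos);
    }
    int index = Collections.binarySearch(barriers, minPos);
    if (index < 0) {
      index = -index - 1; // insertion point when minPos is not itself a barrier
    }
    // Barriers at positions 0 .. index-2 get deleted; index-1 is the last
    // barrier below minPos and is kept.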
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
index 9e31fb0..03aa059 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
@@ -434,8 +434,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
       if (metaEntries.isEmpty()) {
         MetaTableAccessor.mergeRegions(server.getConnection(),
           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
-          server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime,
-            false);
+          server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
       } else {
         mergeRegionsAndPutMetaEntries(server.getConnection(),
           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 95da92a..25a27a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
index a3eea6d..f9a5d31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
@@ -338,7 +338,7 @@ public class SplitTransactionImpl implements SplitTransaction {
         MetaTableAccessor.splitRegion(server.getConnection(),
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
           daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
-          parent.getTableDesc().getRegionReplication(), false);
+          parent.getTableDesc().getRegionReplication());
       } else {
         offlineParentInMetaAndputMetaEntries(server.getConnection(),
          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b2b403b..d6f48b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -296,19 +295,8 @@ public class Replication extends WALActionsListener.Base implements
         if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
           scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
         } else {
-          WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell);
-          if (maybeEvent != null && (maybeEvent.getEventType() ==
-              WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
-            // In serially replication, we use scopes when reading close marker.
-            for (HColumnDescriptor cf : families) {
-              if (cf.getScope() != REPLICATION_SCOPE_LOCAL) {
-                scopes.put(cf.getName(), cf.getScope());
-              }
-            }
-          }
-          // Skip the flush/compaction
+          // Skip the flush/compaction/region events
           continue;
-
         }
       } else if (hasReplication) {
         byte[] family = CellUtil.cloneFamily(cell);

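Note: after this change REGION_CLOSE markers receive no replication scopes; flush, compaction and region event cells are all skipped, so close markers are never shipped to peers. The deleted special case, condensed from the hunk above:

    // Pre-revert only: scope a REGION_CLOSE marker so the shipper could use it
    // as a "stop skipping" signal when ordering edits serially.
    WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell);
    if (maybeEvent != null && maybeEvent.getEventType()
        == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
      for (HColumnDescriptor cf : families) {
        if (cf.getScope() != REPLICATION_SCOPE_LOCAL) {
          scopes.put(cf.getName(), cf.getScope());
        }
      }
    }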
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10f2e7b..add1043 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,9 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -49,7 +46,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -76,6 +72,10 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -105,8 +105,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
-
-  String actualPeerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
@@ -191,8 +189,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-    this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
 
@@ -523,16 +519,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     // Current state of the worker thread
     private WorkerState state;
     ReplicationSourceWALReaderThread entryReader;
-    // Use guava cache to set ttl for each key
-    private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
-        .expireAfterAccess(1, TimeUnit.DAYS).build(
-            new CacheLoader<String, Boolean>() {
-              @Override
-              public Boolean load(String key) throws Exception {
-                return false;
-              }
-            }
-        );
 
     public ReplicationSourceShipperThread(String walGroupId,
         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
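The canSkipWaitingSet field deleted above is a small Guava pattern worth noting: a LoadingCache used as a TTL-bounded flag set, where absent keys load as false and entries expire a day after last access. A hedged, standalone sketch of the same pattern (class and method names are illustrative, not the original code):

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class TtlFlagCacheSketch {
      // Unknown keys load as FALSE; a key marked TRUE is remembered until it
      // goes unread for a day, then expires back to the FALSE default.
      private final LoadingCache<String, Boolean> canSkip = CacheBuilder.newBuilder()
          .expireAfterAccess(1, TimeUnit.DAYS)
          .build(new CacheLoader<String, Boolean>() {
            @Override
            public Boolean load(String key) {
              return Boolean.FALSE;
            }
          });

      boolean shouldWait(String regionKey) {   // first sight of a key => wait
        return !canSkip.getUnchecked(regionKey);
      }

      void markWaited(String regionKey) {      // skip further waits for this key
        canSkip.put(regionKey, Boolean.TRUE);
      }

      void rearm(String regionKey) {           // e.g. after a REGION_CLOSE marker
        canSkip.invalidate(regionKey);
      }
    }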
@@ -568,9 +554,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
         try {
           WALEntryBatch entryBatch = entryReader.take();
-          for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
-            waitingUntilCanPush(entry);
-          }
           shipEdits(entryBatch);
           releaseBufferQuota((int) entryBatch.getHeapSize());
           if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
@@ -611,33 +594,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       }
     }
 
-    private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
-      String key = entry.getKey();
-      long seq = entry.getValue();
-      boolean deleteKey = false;
-      if (seq <= 0) {
-        // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
-        deleteKey = true;
-        seq = -seq;
-      }
-
-      if (!canSkipWaitingSet.getUnchecked(key)) {
-        try {
-          manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
-        } catch (IOException e) {
-          LOG.error("waitUntilCanBePushed fail", e);
-          stopper.stop("waitUntilCanBePushed fail");
-        } catch (InterruptedException e) {
-          LOG.warn("waitUntilCanBePushed interrupted", e);
-          Thread.currentThread().interrupt();
-        }
-        canSkipWaitingSet.put(key, true);
-      }
-      if (deleteKey) {
-        canSkipWaitingSet.invalidate(key);
-      }
-    }
-
     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
       String peerId = peerClusterZnode;
       if (peerId.contains("-")) {
@@ -682,8 +638,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
         if (lastLoggedPosition != lastReadPosition) {
-          // Save positions to meta table before zk.
-          updateSerialRepPositions(entryBatch.getLastSeqIds());
           updateLogPosition(lastReadPosition);
           // if there was nothing to ship and it's not an error
           // set "ageOfLastShippedOp" to <now> to indicate that we're current
@@ -738,10 +692,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
             for (int i = 0; i < size; i++) {
               cleanUpHFileRefs(entries.get(i).getEdit());
             }
-
-            // Save positions to meta table before zk.
-            updateSerialRepPositions(entryBatch.getLastSeqIds());
-
             //Log and clean up WAL logs
             updateLogPosition(lastReadPosition);
           }
@@ -770,16 +720,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       }
     }
 
-    private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
-      try {
-        MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
-          lastPositionsForSerialScope);
-      } catch (IOException e) {
-        LOG.error("updateReplicationPositions fail", e);
-        stopper.stop("updateReplicationPositions fail");
-      }
-    }
-
     private void updateLogPosition(long lastReadPosition) {
       manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);

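Taken together, the hunks above remove the shipper-side gate: before shipping a batch the thread waited per region via waitingUntilCanPush, and saved positions to the meta table before updating the ZooKeeper offset (hence the deleted "Save positions to meta table before zk" comments). The wait logic relied on a sign convention for sequence ids; a small sketch of just that decode step, with illustrative names rather than the original signatures:

    public class CloseMarkerDecodeSketch {
      // The reverted reader negated the sequence id of a REGION_CLOSE entry, so
      // a value <= 0 told the shipper: wait for this entry as usual, then forget
      // the region key so the next section waits again from scratch.
      static void handle(String encodedRegion, long signedSeq) {
        boolean closeMarker = signedSeq <= 0;
        long seq = closeMarker ? -signedSeq : signedSeq;
        System.out.println("region=" + encodedRegion + " waits at seq=" + seq
            + (closeMarker ? " (REGION_CLOSE: re-arm waiting afterwards)" : ""));
      }

      public static void main(String[] args) {
        handle("abc123", 42L);  // ordinary entry
        handle("abc123", -57L); // close marker at sequence 57
      }
    }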
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6ec30de..63bba8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,13 +49,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -70,7 +66,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 
 /**
@@ -125,8 +120,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
-  private Connection connection;
-  private long replicationWaitTime;
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
@@ -145,7 +138,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) throws IOException {
+      final Path oldLogDir, final UUID clusterId) {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -182,9 +175,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
-          HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
-    connection = ConnectionFactory.createConnection(conf);
   }
 
   /**
@@ -830,10 +820,6 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
 
-  public Connection getConnection() {
-    return this.connection;
-  }
-
   /**
    * Get a string representation of all the sources' metrics
    */
@@ -860,75 +846,4 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void cleanUpHFileRefs(String peerId, List<String> files) {
     this.replicationQueues.removeHFileRefs(peerId, files);
   }
-
-  /**
-   * Whether an entry can be pushed to the peer or not right now.
-   * If we enable serial replication, we can not push the entry until all entries in its region
-   * whose sequence numbers are smaller than this entry have been pushed.
-   * For each ReplicationSource, we need only check the first entry in each region, as long as it
-   * can be pushed, we can push all in this ReplicationSource.
-   * This method will be blocked until we can push.
-   * @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
-   *         prevent saving positions in the region of no barrier.
-   */
-  void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
-      throws IOException, InterruptedException {
-
-    /**
-     * There are barriers for this region and position for this peer. N barriers form N intervals,
-     * (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
-     * the first barrier and the last interval is start from the last barrier.
-     *
-     * There are several conditions that we can push now, otherwise we should block:
-     * 1) "Serial replication" is not enabled, we can push all logs just like before. This case
-     *    should not call this method.
-     * 2) There is no barriers for this region, or the seq id is smaller than the first barrier.
-     *    It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
-     *    order of logs that is written before altering.
-     * 3) This entry is in the first interval of barriers. We can push them because it is the
-     *    start of a region. Splitting/merging regions are also ok because the first section of
-     *    daughter region is in same region of parents and the order in one RS is guaranteed.
-     * 4) If the entry's seq id and the position are in same section, or the pos is the last
-     *    number of previous section. Because when open a region we put a barrier the number
-     *    is the last log's id + 1.
-     * 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes
-     *    after save replication meta and before save zk offset.
-     */
-    List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
-    if (barriers.isEmpty() || seq <= barriers.get(0)) {
-      // Case 2
-      return;
-    }
-    int interval = Collections.binarySearch(barriers, seq);
-    if (interval < 0) {
-      interval = -interval - 1;// get the insert position if negative
-    }
-    if (interval == 1) {
-      // Case 3
-      return;
-    }
-
-    while (true) {
-      long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
-      if (seq <= pos) {
-        // Case 5
-      }
-      if (pos >= 0) {
-        // Case 4
-        int posInterval = Collections.binarySearch(barriers, pos);
-        if (posInterval < 0) {
-          posInterval = -posInterval - 1;// get the insert position if negative
-        }
-        if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
-          return;
-        }
-      }
-
-      LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
-          + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
-          + " barriers=" + Arrays.toString(barriers.toArray()));
-      Thread.sleep(replicationWaitTime);
-    }
-  }
-
 }

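The removed waitUntilCanBePushed is the heart of the feature being reverted: it gates shipping on region "barriers" stored in meta. A hedged sketch of the interval test it performed, following the five cases in the removed javadoc (the meta lookups are replaced by plain arguments; note the reverted code left its case-5 branch empty, while this sketch treats case 5 as pushable, per the javadoc):

    import java.util.Collections;
    import java.util.List;

    public class BarrierGateSketch {
      // barriers: sorted region-open barriers from meta; seq: the entry's
      // sequence id; pos: the last pushed position for this peer. Returns true
      // when the entry may ship now; the removed code looped with a sleep when
      // this was false.
      static boolean canPush(List<Long> barriers, long seq, long pos) {
        if (barriers.isEmpty() || seq <= barriers.get(0)) {
          return true; // case 2: no barriers, or the entry predates the first
        }
        int interval = intervalOf(barriers, seq);
        if (interval == 1) {
          return true; // case 3: first interval, nothing earlier to wait for
        }
        if (seq <= pos) {
          return true; // case 5: already pushed once, this is a retry
        }
        if (pos >= 0) {
          int posInterval = intervalOf(barriers, pos);
          // case 4: same section, or pos is the last id of the previous section
          return posInterval == interval || pos == barriers.get(interval - 1) - 1;
        }
        return false; // otherwise the caller blocks and retries later
      }

      // Index of the barrier interval containing id: binarySearch returns the
      // insertion point for misses, which is exactly the interval number.
      private static int intervalOf(List<Long> barriers, long id) {
        int i = Collections.binarySearch(barriers, id);
        return i < 0 ? -i - 1 : i;
      }
    }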
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 40828b7..306ba8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -141,10 +141,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
               batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
             }
             Entry entry = entryStream.next();
-            if (updateSerialReplPos(batch, entry)) {
-              batch.lastWalPosition = entryStream.getPosition();
-              break;
-            }
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
@@ -246,33 +242,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
   }
 
   /**
-   * @return true if we should stop reading because we're at REGION_CLOSE
-   */
-  private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
-    if (entry.hasSerialReplicationScope()) {
-      String key = Bytes.toString(entry.getKey().getEncodedRegionName());
-      batch.setLastPosition(key, entry.getKey().getSequenceId());
-      if (!entry.getEdit().getCells().isEmpty()) {
-        WALProtos.RegionEventDescriptor maybeEvent =
-            WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
-        if (maybeEvent != null && maybeEvent
-            .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
-          // In serial replication, if we move a region to another RS and move it back, we may
-          // read logs crossing two sections. We should break at REGION_CLOSE and push the first
-          // section first in case of missing the middle section belonging to the other RS.
-          // In a worker thread, if we can push the first log of a region, we can push all logs
-          // in the same region without waiting until we read a close marker because next time
-          // we read logs in this region, it must be a new section and not adjacent with this
-          // region. Mark it negative.
-          batch.setLastPosition(key, -entry.getKey().getSequenceId());
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
    * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
    * batch to become available
    * @return A batch of entries, along with the position in the log after reading the batch

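For context, the removed updateSerialReplPos did per-batch bookkeeping: it recorded the newest sequence id per encoded region and negated it at a REGION_CLOSE so the shipper could tell where a section ended. A minimal sketch of that bookkeeping, with illustrative names standing in for WALEntryBatch:

    import java.util.HashMap;
    import java.util.Map;

    public class SerialBatchPositionSketch {
      // Remember the newest sequence id seen per encoded region, negating it
      // on a REGION_CLOSE, in the spirit of the reverted method.
      final Map<String, Long> lastSeqIds = new HashMap<String, Long>();

      // Returns true when the caller should stop filling the batch, so the
      // current section ships before any later section of the same region.
      boolean record(String encodedRegion, long seqId, boolean isRegionClose) {
        lastSeqIds.put(encodedRegion, Long.valueOf(isRegionClose ? -seqId : seqId));
        return isRegionClose;
      }
    }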
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 2e34b64..cc2f42a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -271,18 +270,6 @@ public interface WAL extends Closeable {
       key.setCompressionContext(compressionContext);
     }
 
-    public boolean hasSerialReplicationScope () {
-      if (getKey().getScopes() == null || getKey().getScopes().isEmpty()) {
-        return false;
-      }
-      for (Map.Entry<byte[], Integer> e:getKey().getScopes().entrySet()) {
-        if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
-          return true;
-        }
-      }
-      return false;
-    }
-
     @Override
     public String toString() {
       return this.key + "=" + this.edit;

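The removed hasSerialReplicationScope was a straightforward scan of the WAL key's scope map. An equivalent standalone sketch; since this revert also deletes HConstants.REPLICATION_SCOPE_SERIAL, the constant's value (2, per the reverted javadoc) is inlined here as an assumption:

    import java.util.Map;

    public class SerialScopeCheckSketch {
      // Value of HConstants.REPLICATION_SCOPE_SERIAL in the reverted change;
      // the constant itself is removed by this revert, so it is inlined here.
      private static final int REPLICATION_SCOPE_SERIAL = 2;

      // Equivalent of the removed WAL.Entry#hasSerialReplicationScope: true if
      // any family in the key's scope map is marked serial.
      static boolean hasSerialScope(Map<byte[], Integer> scopes) {
        if (scopes == null || scopes.isEmpty()) {
          return false;
        }
        for (Integer scope : scopes.values()) {
          if (scope.intValue() == REPLICATION_SCOPE_SERIAL) {
            return true;
          }
        }
        return false;
      }
    }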
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 98fff27..cb2494b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -443,7 +443,7 @@ public class TestMetaTableAccessor {
       List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
 
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -472,7 +472,7 @@ public class TestMetaTableAccessor {
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
       MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
-          HConstants.LATEST_TIMESTAMP, false);
+          HConstants.LATEST_TIMESTAMP);
 
       assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
       assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -556,7 +556,7 @@ public class TestMetaTableAccessor {
 
       // now merge the regions, effectively deleting the rows for region a and b.
       MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
-        regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
+        regionInfoA, regionInfoB, sn, 1, masterSystemTime);
 
       result = meta.get(get);
       serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -639,7 +639,7 @@ public class TestMetaTableAccessor {
       }
       SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
       long prevCalls = scheduler.numPriorityCalls;
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
 
       assertTrue(prevCalls < scheduler.numPriorityCalls);
     }
@@ -661,7 +661,7 @@ public class TestMetaTableAccessor {
       List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
       Get get1 = new Get(splitA.getRegionName());
       Result resultA = meta.get(get1);
       Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
index bca8cf3..a91560e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
@@ -166,7 +166,7 @@ public class TestMetaScanner {
               end);
 
             MetaTableAccessor.splitRegion(connection,
-              parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1, false);
+              parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1);
 
             Threads.sleep(random.nextInt(200));
           } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba7a936f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 78b23c0..69dfa40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1317,7 +1317,7 @@ public class TestAssignmentManagerOnCluster {
     }
     conf.setInt("hbase.regionstatestore.meta.connection", 3);
     final RegionStateStore rss =
-        new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
+        new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
     rss.start();
     // Create 10 threads and make each do 10 puts related to region state 
update
     Thread[] th = new Thread[10];
