HBASE-19661 Replace ReplicationStateZKBase with ZKReplicationStorageBase

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/69b3fe53
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/69b3fe53
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/69b3fe53

Branch: refs/heads/HBASE-19397-branch-2
Commit: 69b3fe5384995db593bc989841cdf4195badf5df
Parents: bbad78d
Author: huzheng <open...@gmail.com>
Authored: Fri Dec 29 15:55:28 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Jan 12 09:59:39 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationFactory.java   |   5 +-
 .../replication/ReplicationStateZKBase.java     | 153 -------------------
 .../replication/ReplicationTrackerZKImpl.java   |  21 +--
 .../replication/ZKReplicationPeerStorage.java   |  24 ++-
 .../replication/ZKReplicationStorageBase.java   |  13 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |   4 +-
 .../master/ReplicationPeerConfigUpgrader.java   | 128 ++++++++--------
 .../regionserver/DumpReplicationQueues.java     |  18 +--
 .../replication/regionserver/Replication.java   |   3 +-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |   3 +-
 .../TestReplicationTrackerZKImpl.java           |   3 +-
 .../replication/master/TestTableCFsUpdater.java |  41 ++---
 .../TestReplicationSourceManager.java           |   6 +-
 13 files changed, 136 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6c66aff..2a970ba 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -33,9 +33,8 @@ public class ReplicationFactory {
     return new ReplicationPeers(zk, conf);
   }
 
-  public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
-      final ReplicationPeers replicationPeers, Configuration conf, Abortable 
abortable,
+  public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, 
Abortable abortable,
       Stoppable stopper) {
-    return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, 
abortable, stopper);
+    return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
deleted file mode 100644
index f49537c..0000000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This is a base class for maintaining replication state in zookeeper.
- */
-@InterfaceAudience.Private
-public abstract class ReplicationStateZKBase {
-
-  /**
-   * The name of the znode that contains the replication status of a remote 
slave (i.e. peer)
-   * cluster.
-   */
-  protected final String peerStateNodeName;
-  /** The name of the base znode that contains all replication state. */
-  protected final String replicationZNode;
-  /** The name of the znode that contains a list of all remote slave (i.e. 
peer) clusters. */
-  protected final String peersZNode;
-  /** The name of the znode that contains all replication queues */
-  protected final String queuesZNode;
-  /** The name of the znode that contains queues of hfile references to be 
replicated */
-  protected final String hfileRefsZNode;
-  /** The cluster key of the local cluster */
-  protected final String ourClusterKey;
-  /** The name of the znode that contains tableCFs */
-  protected final String tableCFsNodeName;
-
-  protected final ZKWatcher zookeeper;
-  protected final Configuration conf;
-  protected final Abortable abortable;
-
-  public static final byte[] ENABLED_ZNODE_BYTES =
-      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
-  public static final byte[] DISABLED_ZNODE_BYTES =
-      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
-  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
-      "zookeeper.znode.replication.hfile.refs";
-  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = 
"hfile-refs";
-
-  public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf,
-                                Abortable abortable) {
-    this.zookeeper = zookeeper;
-    this.conf = conf;
-    this.abortable = abortable;
-
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", 
"replication");
-    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", 
"peers");
-    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
-    String hfileRefsZNodeName = 
conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
-    this.peerStateNodeName = 
conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    this.tableCFsNodeName = 
conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
-    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
-    this.replicationZNode = 
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
-      replicationZNodeName);
-    this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
-    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
-    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, 
hfileRefsZNodeName);
-  }
-
-  public List<String> getListOfReplicators() {
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of replicators", e);
-    }
-    return result;
-  }
-
-  /**
-   * @param state
-   * @return Serialized protobuf of <code>state</code> with pb magic prefix 
prepended suitable for
-   *         use as content of a peer-state znode under a peer cluster id as in
-   *         /hbase/replication/peers/PEER_ID/peer-state.
-   */
-  protected static byte[] toByteArray(final 
ReplicationProtos.ReplicationState.State state) {
-    ReplicationProtos.ReplicationState msg =
-        
ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
-    // There is no toByteArray on this pb Message?
-    // 32 bytes is default which seems fair enough here.
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
-      msg.writeTo(cos);
-      cos.flush();
-      baos.flush();
-      return ProtobufUtil.prependPBMagic(baos.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected boolean peerExists(String id) throws KeeperException {
-    return ZKUtil.checkExists(this.zookeeper, 
ZNodePaths.joinZNode(this.peersZNode, id)) >= 0;
-  }
-
-  /**
-   * Determine if a ZK path points to a peer node.
-   * @param path path to be checked
-   * @return true if the path points to a peer node, otherwise false
-   */
-  protected boolean isPeerPath(String path) {
-    return path.split("/").length == peersZNode.split("/").length + 1;
-  }
-
-  @VisibleForTesting
-  protected String getTableCFsNode(String id) {
-    return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, 
this.tableCFsNodeName));
-  }
-
-  @VisibleForTesting
-  protected String getPeerStateNode(String id) {
-    return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, 
this.peerStateNodeName));
-  }
-  @VisibleForTesting
-  protected String getPeerNode(String id) {
-    return ZNodePaths.joinZNode(this.peersZNode, id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
index 5659e4b..16a1668 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
@@ -20,14 +20,12 @@ package org.apache.hadoop.hbase.replication;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,9 +36,14 @@ import org.slf4j.LoggerFactory;
  * interface.
  */
 @InterfaceAudience.Private
-public class ReplicationTrackerZKImpl extends ReplicationStateZKBase 
implements ReplicationTracker {
+public class ReplicationTrackerZKImpl implements ReplicationTracker {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
+
+  // Zookeeper
+  private final ZKWatcher zookeeper;
+  // Server to abort.
+  private final Abortable abortable;
   // All about stopping
   private final Stoppable stopper;
   // listeners to be notified
@@ -48,9 +51,9 @@ public class ReplicationTrackerZKImpl extends 
ReplicationStateZKBase implements
   // List of all the other region servers in this cluster
   private final ArrayList<String> otherRegionServers = new ArrayList<>();
 
-  public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers 
replicationPeers,
-      Configuration conf, Abortable abortable, Stoppable stopper) {
-    super(zookeeper, conf, abortable);
+  public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, 
Stoppable stopper) {
+    this.zookeeper = zookeeper;
+    this.abortable = abortable;
     this.stopper = stopper;
     this.zookeeper.registerListener(new 
OtherRegionServerWatcher(this.zookeeper));
     // watch the changes

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 42d4b3f..a53500a 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 
@@ -36,7 +37,14 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
  * ZK based replication peer storage.
  */
 @InterfaceAudience.Private
-class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements 
ReplicationPeerStorage {
+public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
+    implements ReplicationPeerStorage {
+
+  public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers";
+  public static final String PEERS_ZNODE_DEFAULT = "peers";
+
+  public static final String PEERS_STATE_ZNODE = 
"zookeeper.znode.replication.peers.state";
+  public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
 
   public static final byte[] ENABLED_ZNODE_BYTES =
     toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
@@ -56,16 +64,18 @@ class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase implements Repli
 
   public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
     super(zookeeper, conf);
-    this.peerStateNodeName = 
conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", 
"peers");
+    this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, 
PEERS_STATE_ZNODE_DEFAULT);
+    String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT);
     this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
   }
 
-  private String getPeerStateNode(String peerId) {
+  @VisibleForTesting
+  public String getPeerStateNode(String peerId) {
     return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
   }
 
-  private String getPeerNode(String peerId) {
+  @VisibleForTesting
+  public String getPeerNode(String peerId) {
     return ZNodePaths.joinZNode(peersZNode, peerId);
   }
 
@@ -82,8 +92,8 @@ class ZKReplicationPeerStorage extends 
ZKReplicationStorageBase implements Repli
             enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
         false);
     } catch (KeeperException e) {
-      throw new ReplicationException("Could not add peer with id=" + peerId + 
", peerConfif=>" +
-        peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
+      throw new ReplicationException("Could not add peer with id=" + peerId + 
", peerConfif=>"
+          + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index 2321e4f..7190aeb 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -34,7 +35,10 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
  * zookeeper.
  */
 @InterfaceAudience.Private
-class ZKReplicationStorageBase {
+public class ZKReplicationStorageBase {
+
+  public static final String REPLICATION_ZNODE = "zookeeper.znode.replication";
+  public static final String REPLICATION_ZNODE_DEFAULT = "replication";
 
   /** The name of the base znode that contains all replication state. */
   protected final String replicationZNode;
@@ -45,10 +49,9 @@ class ZKReplicationStorageBase {
   protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) {
     this.zookeeper = zookeeper;
     this.conf = conf;
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", 
"replication");
 
-    this.replicationZNode =
-        ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, 
replicationZNodeName);
+    this.replicationZNode = 
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
+      conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
   }
 
   /**
@@ -58,7 +61,7 @@ class ZKReplicationStorageBase {
    */
   protected static byte[] toByteArray(final 
ReplicationProtos.ReplicationState.State state) {
     ReplicationProtos.ReplicationState msg =
-      ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
+        
ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
     // There is no toByteArray on this pb Message?
     // 32 bytes is default which seems fair enough here.
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 00eb592..64522c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -817,8 +817,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     // This is for backwards compatibility
     // See HBASE-11393
     status.setStatus("Update TableCFs node in ZNode");
-    ReplicationPeerConfigUpgrader tableCFsUpdater = new 
ReplicationPeerConfigUpgrader(zooKeeper,
-            conf, this.clusterConnection);
+    ReplicationPeerConfigUpgrader tableCFsUpdater =
+        new ReplicationPeerConfigUpgrader(zooKeeper, conf);
     tableCFsUpdater.copyTableCFs();
 
     // Add the Observer to delete space quotas on table deletion before 
starting all CPs by

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
index ea5509f..b6e8862 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
@@ -18,96 +18,107 @@
  */
 package org.apache.hadoop.hbase.replication.master;
 
+import static 
org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE;
+import static 
org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT;
+import static 
org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE;
+import static 
org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT;
+
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
- * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to 
HBase 1.4 or 2.x.
- * It will be removed in HBase 3.x. See HBASE-11393
+ * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to 
HBase 1.4 or 2.x. It will
+ * be removed in HBase 3.x. See HBASE-11393
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
+public class ReplicationPeerConfigUpgrader{
+
+  private static final String TABLE_CFS_ZNODE = 
"zookeeper.znode.replication.peers.tableCFs";
+  private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs";
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
+  private final Configuration conf;
+  private final ZKWatcher zookeeper;
+  private final ReplicationPeerStorage peerStorage;
 
-  public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper,
-                         Configuration conf, Abortable abortable) {
-    super(zookeeper, conf, abortable);
+  public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration 
conf) {
+    this.zookeeper = zookeeper;
+    this.conf = conf;
+    this.peerStorage = 
ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
   }
 
   public void upgrade() throws Exception {
     try (Connection conn = ConnectionFactory.createConnection(conf)) {
       Admin admin = conn.getAdmin();
-      admin.listReplicationPeers().forEach(
-        (peerDesc) -> {
-          String peerId = peerDesc.getPeerId();
-          ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
-          if ((peerConfig.getNamespaces() != null && 
!peerConfig.getNamespaces().isEmpty())
-              || (peerConfig.getTableCFsMap() != null && 
!peerConfig.getTableCFsMap().isEmpty())) {
-            peerConfig.setReplicateAllUserTables(false);
-            try {
-              admin.updateReplicationPeerConfig(peerId, peerConfig);
-            } catch (Exception e) {
-              LOG.error("Failed to upgrade replication peer config for 
peerId=" + peerId, e);
-            }
+      admin.listReplicationPeers().forEach((peerDesc) -> {
+        String peerId = peerDesc.getPeerId();
+        ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
+        if ((peerConfig.getNamespaces() != null && 
!peerConfig.getNamespaces().isEmpty())
+            || (peerConfig.getTableCFsMap() != null && 
!peerConfig.getTableCFsMap().isEmpty())) {
+          peerConfig.setReplicateAllUserTables(false);
+          try {
+            admin.updateReplicationPeerConfig(peerId, peerConfig);
+          } catch (Exception e) {
+            LOG.error("Failed to upgrade replication peer config for peerId=" 
+ peerId, e);
           }
-        });
+        }
+      });
     }
   }
 
-  public void copyTableCFs() {
-    List<String> znodes = null;
-    try {
-      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      LOG.error("Failed to get peers znode", e);
-    }
-    if (znodes != null) {
-      for (String peerId : znodes) {
-        if (!copyTableCFs(peerId)) {
-          LOG.error("upgrade tableCFs failed for peerId=" + peerId);
-        }
+  public void copyTableCFs() throws ReplicationException {
+    for (String peerId : peerStorage.listPeerIds()) {
+      if (!copyTableCFs(peerId)) {
+        LOG.error("upgrade tableCFs failed for peerId=" + peerId);
       }
     }
   }
 
-  public boolean copyTableCFs(String peerId) {
+  @VisibleForTesting
+  protected String getTableCFsNode(String peerId) {
+    String replicationZNode = 
ZNodePaths.joinZNode(zookeeper.znodePaths.baseZNode,
+      conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
+    String peersZNode =
+        ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, 
PEERS_ZNODE_DEFAULT));
+    return ZNodePaths.joinZNode(peersZNode,
+      ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, 
TABLE_CFS_ZNODE_DEFAULT)));
+  }
+
+  public boolean copyTableCFs(String peerId) throws ReplicationException {
     String tableCFsNode = getTableCFsNode(peerId);
     try {
       if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
-        String peerNode = getPeerNode(peerId);
-        ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
+        ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId);
         // We only need to copy data from tableCFs node to rpc Node the first 
time hmaster start.
         if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
           // we copy TableCFs node into PeerNode
           LOG.info("copy tableCFs into peerNode:" + peerId);
           ReplicationProtos.TableCF[] tableCFs =
-                  ReplicationPeerConfigUtil.parseTableCFs(
-                          ZKUtil.getData(this.zookeeper, tableCFsNode));
+              
ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, 
tableCFsNode));
           if (tableCFs != null && tableCFs.length > 0) {
             
rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
-            ZKUtil.setData(this.zookeeper, peerNode,
-              ReplicationPeerConfigUtil.toByteArray(rpc));
+            peerStorage.updatePeerConfig(peerId, rpc);
           }
         } else {
           LOG.info("No tableCFs in peerNode:" + peerId);
@@ -126,23 +137,6 @@ public class ReplicationPeerConfigUpgrader extends 
ReplicationStateZKBase {
     return true;
   }
 
-  private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
-          throws KeeperException, InterruptedException {
-    byte[] data = null;
-    data = ZKUtil.getData(this.zookeeper, peerNode);
-    if (data == null) {
-      LOG.error("Could not get configuration for " +
-              "peer because it doesn't exist. peer=" + peerNode);
-      return null;
-    }
-    try {
-      return ReplicationPeerConfigUtil.parsePeerFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed to parse cluster key from peer=" + peerNode);
-      return null;
-    }
-  }
-
   private static void printUsageAndExit() {
     System.err.printf(
       "Usage: hbase 
org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
@@ -163,19 +157,17 @@ public class ReplicationPeerConfigUpgrader extends 
ReplicationStateZKBase {
       printUsageAndExit();
     } else if (args[0].equals("copyTableCFs")) {
       Configuration conf = HBaseConfiguration.create();
-      ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", 
null);
-      try {
-        ReplicationPeerConfigUpgrader tableCFsUpdater = new 
ReplicationPeerConfigUpgrader(zkw,
-            conf, null);
+      try (ZKWatcher zkw = new ZKWatcher(conf, 
"ReplicationPeerConfigUpgrader", null)) {
+        ReplicationPeerConfigUpgrader tableCFsUpdater =
+            new ReplicationPeerConfigUpgrader(zkw, conf);
         tableCFsUpdater.copyTableCFs();
-      } finally {
-        zkw.close();
       }
     } else if (args[0].equals("upgrade")) {
       Configuration conf = HBaseConfiguration.create();
-      ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", 
null);
-      ReplicationPeerConfigUpgrader upgrader = new 
ReplicationPeerConfigUpgrader(zkw, conf, null);
-      upgrader.upgrade();
+      try (ZKWatcher zkw = new ZKWatcher(conf, 
"ReplicationPeerConfigUpgrader", null)) {
+        ReplicationPeerConfigUpgrader upgrader = new 
ReplicationPeerConfigUpgrader(zkw, conf);
+        upgrader.upgrade();
+      }
     } else {
       printUsageAndExit();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 27bda2d..22e8628 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
@@ -237,7 +236,7 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
         LOG.info("Found [--distributed], will poll each RegionServer.");
         Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId())
             .collect(Collectors.toSet());
-        System.out.println(dumpQueues(connection, zkw, peerIds, 
opts.isHdfs()));
+        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
         System.out.println(dumpReplicationSummary());
       } else {
         // use ZK instead
@@ -301,18 +300,15 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
     return sb.toString();
   }
 
-  public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, 
Set<String> peerIds,
+  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
       boolean hdfs) throws Exception {
     ReplicationQueueStorage queueStorage;
-    ReplicationPeers replicationPeers;
     ReplicationTracker replicationTracker;
     StringBuilder sb = new StringBuilder();
 
     queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, 
getConf());
-    replicationPeers =
-        ReplicationFactory.getReplicationPeers(zkw, getConf());
-    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, 
replicationPeers, getConf(),
-      new WarnOnlyAbortable(), new WarnOnlyStoppable());
+    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new 
WarnOnlyAbortable(),
+      new WarnOnlyStoppable());
     Set<String> liveRegionServers = new 
HashSet<>(replicationTracker.getListOfRegionServers());
 
     // Loops each peer on each RS and dumps the queues
@@ -330,11 +326,9 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
         List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
         if (!peerIds.contains(queueInfo.getPeerId())) {
           deletedQueues.add(regionserver + "/" + queueId);
-          sb.append(
-            formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, 
true, hdfs));
+          sb.append(formatQueue(regionserver, queueStorage, queueInfo, 
queueId, wals, true, hdfs));
         } else {
-          sb.append(
-            formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, 
false, hdfs));
+          sb.append(formatQueue(regionserver, queueStorage, queueInfo, 
queueId, wals, false, hdfs));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index f985f90..dca2439 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -132,8 +132,7 @@ public class Replication implements
           ReplicationFactory.getReplicationPeers(server.getZooKeeper(), 
this.conf);
       this.replicationPeers.init();
       this.replicationTracker =
-          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), 
this.replicationPeers,
-            this.conf, this.server, this.server);
+          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), 
this.server, this.server);
     } catch (Exception e) {
       throw new IOException("Failed replication handler create", e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 8b5a4e6..40ad2bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -330,8 +330,7 @@ public class HBaseFsck extends Configured implements 
Closeable {
    * @throws MasterNotRunningException if the master is not running
    * @throws ZooKeeperConnectionException if unable to connect to ZooKeeper
    */
-  public HBaseFsck(Configuration conf) throws MasterNotRunningException,
-      ZooKeeperConnectionException, IOException, ClassNotFoundException {
+  public HBaseFsck(Configuration conf) throws IOException, 
ClassNotFoundException {
     this(conf, createThreadPool(conf));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index fdfa6b7..757d9a9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -90,7 +90,8 @@ public class TestReplicationTrackerZKImpl {
       ZKClusterId.setClusterId(zkw, new ClusterId());
       rp = ReplicationFactory.getReplicationPeers(zkw, conf);
       rp.init();
-      rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new 
DummyServer(fakeRs1));
+      rt = ReplicationFactory.getReplicationTracker(zkw, new 
DummyServer(fakeRs1),
+        new DummyServer(fakeRs1));
     } catch (Exception e) {
       fail("Exception during test setup: " + e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
index 2993043..19acc75 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -57,12 +56,19 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
 
   private static ZKWatcher zkw = null;
   private static Abortable abortable = null;
+  private static ZKStorageUtil zkStorageUtil = null;
+
+  private static class ZKStorageUtil extends ZKReplicationPeerStorage {
+    public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) {
+      super(zookeeper, conf);
+    }
+  }
 
   @Rule
   public TestName name = new TestName();
 
   public TestTableCFsUpdater() {
-    super(zkw, TEST_UTIL.getConfiguration(), abortable);
+    super(zkw, TEST_UTIL.getConfiguration());
   }
 
   @BeforeClass
@@ -81,6 +87,7 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
       }
     };
     zkw = new ZKWatcher(conf, "TableCFs", abortable, true);
+    zkStorageUtil = new ZKStorageUtil(zkw, conf);
   }
 
   @AfterClass
@@ -89,8 +96,7 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
   }
 
   @Test
-  public void testUpgrade() throws KeeperException, InterruptedException,
-      DeserializationException {
+  public void testUpgrade() throws Exception {
     String peerId = "1";
     final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
     final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
@@ -98,13 +104,13 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
-    String peerNode = getPeerNode(peerId);
+    String peerNode = zkStorageUtil.getPeerNode(peerId);
     ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + 
tableName3;
     String tableCFsNode = getTableCFsNode(peerId);
     LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
-    ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+    ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
 
     ReplicationPeerConfig actualRpc =
         ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
@@ -117,13 +123,13 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
     peerId = "2";
     rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
     tableCFsNode = getTableCFsNode(peerId);
     LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
-    ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+    ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
 
     actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
@@ -135,13 +141,13 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
     peerId = "3";
     rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     tableCFs = "";
     tableCFsNode = getTableCFsNode(peerId);
     LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
-    ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+    ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
 
     actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
@@ -153,7 +159,7 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
     peerId = "4";
     rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(zkw.getQuorum());
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     ZKUtil.createWithParents(zkw, peerNode, 
ReplicationPeerConfigUtil.toByteArray(rpc));
 
     tableCFsNode = getTableCFsNode(peerId);
@@ -167,7 +173,7 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
     copyTableCFs();
 
     peerId = "1";
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
@@ -182,9 +188,8 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
     assertEquals("cf3", tableNameListMap.get(tableName2).get(0));
     assertNull(tableNameListMap.get(tableName3));
 
-
     peerId = "2";
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     tableNameListMap = actualRpc.getTableCFsMap();
@@ -198,19 +203,17 @@ public class TestTableCFsUpdater extends 
ReplicationPeerConfigUpgrader {
     assertEquals("cf2", tableNameListMap.get(tableName2).get(0));
 
     peerId = "3";
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     tableNameListMap = actualRpc.getTableCFsMap();
     assertNull(tableNameListMap);
 
     peerId = "4";
-    peerNode = getPeerNode(peerId);
+    peerNode = zkStorageUtil.getPeerNode(peerId);
     actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, 
peerNode));
     assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
     tableNameListMap = actualRpc.getTableCFsMap();
     assertNull(tableNameListMap);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69b3fe53/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f4d3901..1001aa5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -68,8 +68,8 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -163,9 +163,9 @@ public abstract class TestReplicationSourceManager {
             + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
     ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
-      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+      ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
-    ZKUtil.setData(zkw, "/hbase/replication/state", 
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.setData(zkw, "/hbase/replication/state", 
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
 
     ZKClusterId.setClusterId(zkw, new ClusterId());
     FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());

Reply via email to