Repository: hbase
Updated Branches:
  refs/heads/master 5147fb12a -> 61ff6ced5


HBASE-16018 Refactored the ReplicationPeers interface to clear up what some 
methods do and move away from a ZooKeeper-specific implementation.

Also added some documentation for undocumented methods.

Signed-off-by: Elliott Clark <ecl...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61ff6ced
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61ff6ced
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61ff6ced

Branch: refs/heads/master
Commit: 61ff6ced5bea3586129fb0844bbf64b122775b42
Parents: 5147fb1
Author: Joseph Hwang <j...@fb.com>
Authored: Tue Jun 14 08:58:49 2016 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Fri Jun 17 13:04:21 2016 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  8 ++--
 .../hbase/replication/ReplicationPeers.java     | 41 +++++++++++++++-----
 .../replication/ReplicationPeersZKImpl.java     | 17 ++++----
 .../regionserver/ReplicationSource.java         |  2 +-
 .../regionserver/ReplicationSourceManager.java  | 14 +++----
 .../cleaner/TestReplicationHFileCleaner.java    |  4 +-
 .../replication/TestReplicationStateBasic.java  | 40 +++++++++----------
 .../TestReplicationStateHBaseImpl.java          |  6 +--
 .../TestReplicationTrackerZKImpl.java           | 18 ++++-----
 9 files changed, 85 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index e0985bd..b04d317 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -183,7 +183,7 @@ public class ReplicationAdmin implements Closeable {
     if (tableCfs != null) {
       peerConfig.setTableCFsMap(tableCfs);
     }
-    this.replicationPeers.addPeer(id, peerConfig);
+    this.replicationPeers.registerPeer(id, peerConfig);
   }
 
   /**
@@ -192,7 +192,7 @@ public class ReplicationAdmin implements Closeable {
    * @param peerConfig configuration for the replication slave cluster
    */
   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws 
ReplicationException {
-    this.replicationPeers.addPeer(id, peerConfig);
+    this.replicationPeers.registerPeer(id, peerConfig);
   }
 
   /**
@@ -212,7 +212,7 @@ public class ReplicationAdmin implements Closeable {
    * @param id a short name that identifies the cluster
    */
   public void removePeer(String id) throws ReplicationException {
-    this.replicationPeers.removePeer(id);
+    this.replicationPeers.unregisterPeer(id);
   }
 
   /**
@@ -556,7 +556,7 @@ public class ReplicationAdmin implements Closeable {
 
   @VisibleForTesting
   public void peerAdded(String id) throws ReplicationException {
-    this.replicationPeers.peerAdded(id);
+    this.replicationPeers.peerConnected(id);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 9f70d95..2a7963a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -51,18 +51,30 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
    */
-  void addPeer(String peerId, ReplicationPeerConfig peerConfig)
+  void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
       throws ReplicationException;
 
   /**
    * Removes a remote slave cluster and stops the replication to it.
    * @param peerId a short that identifies the cluster
    */
-  void removePeer(String peerId) throws ReplicationException;
+  void unregisterPeer(String peerId) throws ReplicationException;
 
-  boolean peerAdded(String peerId) throws ReplicationException;
+  /**
+   * Method called after a peer has been connected. It will create a 
ReplicationPeer to track the
+   * newly connected cluster.
+   * @param peerId a short that identifies the cluster
+   * @return whether a ReplicationPeer was successfully created
+   * @throws ReplicationException
+   */
+  boolean peerConnected(String peerId) throws ReplicationException;
 
-  void peerRemoved(String peerId);
+  /**
+   * Method called after a peer has been disconnected. It will remove the 
ReplicationPeer that
+   * tracked the disconnected cluster.
+   * @param peerId a short that identifies the cluster
+   */
+  void peerDisconnected(String peerId);
 
   /**
    * Restart the replication to the specified remote slave cluster.
@@ -77,14 +89,14 @@ public interface ReplicationPeers {
   void disablePeer(String peerId) throws ReplicationException;
 
   /**
-   * Get the table and column-family list string of the peer from ZK.
+   * Get the table and column-family list string of the peer from the 
underlying storage.
    * @param peerId a short that identifies the cluster
    */
   public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
       throws ReplicationException;
 
   /**
-   * Set the table and column-family list string of the peer to ZK.
+   * Set the table and column-family list string of the peer to the underlying 
storage.
    * @param peerId a short that identifies the cluster
    * @param tableCFs the table and column-family list which will be replicated 
for this peer
    */
@@ -93,17 +105,20 @@ public interface ReplicationPeers {
       throws ReplicationException;
 
   /**
-   * Returns the ReplicationPeer
+   * Returns the ReplicationPeer for the specified connected peer. This 
ReplicationPeer will
+   * continue to track changes to the Peer's state and config. This method 
returns null if no
+   * peer has been connected with the given peerId.
    * @param peerId id for the peer
    * @return ReplicationPeer object
    */
-  ReplicationPeer getPeer(String peerId);
+  ReplicationPeer getConnectedPeer(String peerId);
 
   /**
-   * Returns the set of peerIds defined
+   * Returns the set of peerIds of the clusters that have been connected and 
have an underlying
+   * ReplicationPeer.
    * @return a Set of Strings for peerIds
    */
-  public Set<String> getPeerIds();
+  public Set<String> getConnectedPeerIds();
 
   /**
    * Get the replication status for the specified connected remote slave 
cluster.
@@ -152,5 +167,11 @@ public interface ReplicationPeers {
    */
   Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws 
ReplicationException;
 
+  /**
+   * Update the peerConfig for a given peer cluster
+   * @param id a short that identifies the cluster
+   * @param peerConfig new config for the peer cluster
+   * @throws ReplicationException
+   */
   void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws 
ReplicationException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 5af97c2..54c2dac 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 
 /**
  * This class provides an implementation of the ReplicationPeers interface 
using ZooKeeper. The
@@ -105,7 +104,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void addPeer(String id, ReplicationPeerConfig peerConfig)
+  public void registerPeer(String id, ReplicationPeerConfig peerConfig)
       throws ReplicationException {
     try {
       if (peerExists(id)) {
@@ -148,7 +147,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void removePeer(String id) throws ReplicationException {
+  public void unregisterPeer(String id) throws ReplicationException {
     try {
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot remove peer with id=" + id
@@ -219,7 +218,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
   public boolean getStatusOfPeer(String id) {
     ReplicationPeer replicationPeer = this.peerClusters.get(id);
     if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + id + " is not 
connected");
+      throw new IllegalArgumentException("Peer with id= " + id + " is not 
cached");
     }
     return replicationPeer.getPeerState() == PeerState.ENABLED;
   }
@@ -270,12 +269,12 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
   }
 
   @Override
-  public ReplicationPeer getPeer(String peerId) {
+  public ReplicationPeer getConnectedPeer(String peerId) {
     return peerClusters.get(peerId);
   }
 
   @Override
-  public Set<String> getPeerIds() {
+  public Set<String> getConnectedPeerIds() {
     return peerClusters.keySet(); // this is not thread-safe
   }
 
@@ -342,7 +341,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
   @Override
   public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
       throws ReplicationException {
-    ReplicationPeer peer = getPeer(id);
+    ReplicationPeer peer = getConnectedPeer(id);
     if (peer == null){
       throw new ReplicationException("Could not find peer Id " + id);
     }
@@ -411,12 +410,12 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
   }
 
   @Override
-  public boolean peerAdded(String peerId) throws ReplicationException {
+  public boolean peerConnected(String peerId) throws ReplicationException {
     return createAndAddPeer(peerId);
   }
 
   @Override
-  public void peerRemoved(String peerId) {
+  public void peerDisconnected(String peerId) {
     ReplicationPeer rp = this.peerClusters.get(peerId);
     if (rp != null) {
       ((ConcurrentMap<String, ReplicationPeerZKImpl>) 
peerClusters).remove(peerId, rp);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 84e0787..2f3b2a8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -237,7 +237,7 @@ public class ReplicationSource extends Thread
       // A peerId will not have "-" in its name, see HBASE-11394
       peerId = peerClusterZnode.split("-")[0];
     }
-    Map<TableName, List<String>> tableCFMap = 
replicationPeers.getPeer(peerId).getTableCFs();
+    Map<TableName, List<String>> tableCFMap = 
replicationPeers.getConnectedPeer(peerId).getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
       if (tableCFMap.containsKey(tableName)

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 433f9c5..7532c64 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -230,7 +230,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * old region server wal queues
    */
   protected void init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getPeerIds()) {
+    for (String id : this.replicationPeers.getConnectedPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
         // Check if peer exists in hfile-refs queue, if not add it. This can 
happen in the case
@@ -264,7 +264,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
     ReplicationPeerConfig peerConfig = 
replicationPeers.getReplicationPeerConfig(id);
-    ReplicationPeer peer = replicationPeers.getPeer(id);
+    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
@@ -306,7 +306,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
     this.replicationQueues.removeQueue(peerId);
     if (closeConnection) {
-      this.replicationPeers.peerRemoved(peerId);
+      this.replicationPeers.peerDisconnected(peerId);
     }
   }
 
@@ -381,7 +381,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     // update replication queues on ZK
     synchronized (replicationPeers) {// synchronize on replicationPeers to 
avoid adding source for
                                      // the to-be-removed peer
-      for (String id : replicationPeers.getPeerIds()) {
+      for (String id : replicationPeers.getConnectedPeerIds()) {
         try {
           this.replicationQueues.addLog(id, logName);
         } catch (ReplicationException e) {
@@ -586,7 +586,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public void peerListChanged(List<String> peerIds) {
     for (String id : peerIds) {
       try {
-        boolean added = this.replicationPeers.peerAdded(id);
+        boolean added = this.replicationPeers.peerConnected(id);
         if (added) {
           addSource(id);
           if (replicationForBulkLoadDataEnabled) {
@@ -659,7 +659,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
           // there is not an actual peer defined corresponding to peerId for 
the failover.
           ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(peerId);
           String actualPeerId = replicationQueueInfo.getPeerId();
-          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+          ReplicationPeer peer = 
replicationPeers.getConnectedPeer(actualPeerId);
           ReplicationPeerConfig peerConfig = null;
           try {
             peerConfig = 
replicationPeers.getReplicationPeerConfig(actualPeerId);
@@ -688,7 +688,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, 
this.rq, this.rp,
                 server, peerId, this.clusterId, peerConfig, peer);
-          if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
+          if 
(!this.rp.getConnectedPeerIds().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current 
peer");
             break;
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index e5f1e69..fc3e516 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -110,7 +110,7 @@ public class TestReplicationHFileCleaner {
   @Before
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
-    rp.addPeer(peerId, new 
ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+    rp.registerPeer(peerId, new 
ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
     rq.addPeerToHFileRefs(peerId);
   }
 
@@ -121,7 +121,7 @@ public class TestReplicationHFileCleaner {
     } catch (IOException e) {
       LOG.warn("Failed to delete files recursively from path " + root);
     }
-    rp.removePeer(peerId);
+    rp.unregisterPeer(peerId);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index b4451f2..933c621 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -165,7 +165,7 @@ public abstract class TestReplicationStateBasic {
     rp.init();
 
     try {
-      rp.addPeer(ID_ONE,
+      rp.registerPeer(ID_ONE,
         new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
       fail("Should throw an IllegalArgumentException because "
             + "zookeeper.znode.parent is missing leading '/'.");
@@ -174,7 +174,7 @@ public abstract class TestReplicationStateBasic {
     }
 
     try {
-      rp.addPeer(ID_ONE,
+      rp.registerPeer(ID_ONE,
         new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
       fail("Should throw an IllegalArgumentException because 
zookeeper.znode.parent is missing.");
     } catch (IllegalArgumentException e) {
@@ -182,7 +182,7 @@ public abstract class TestReplicationStateBasic {
     }
 
     try {
-      rp.addPeer(ID_ONE,
+      rp.registerPeer(ID_ONE,
         new 
ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
       fail("Should throw an IllegalArgumentException because "
           + "hbase.zookeeper.property.clientPort is missing.");
@@ -203,7 +203,7 @@ public abstract class TestReplicationStateBasic {
     files1.add("file_3");
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
     rq1.addPeerToHFileRefs(ID_ONE);
     rq1.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
@@ -216,7 +216,7 @@ public abstract class TestReplicationStateBasic {
     files2.add(removedString);
     rq1.removeHFileRefs(ID_ONE, files2);
     assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
-    rp.removePeer(ID_ONE);
+    rp.unregisterPeer(ID_ONE);
   }
 
   @Test
@@ -225,9 +225,9 @@ public abstract class TestReplicationStateBasic {
     rqc.init();
 
     rp.init();
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
     rq1.addPeerToHFileRefs(ID_ONE);
-    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+    rp.registerPeer(ID_TWO, new 
ReplicationPeerConfig().setClusterKey(KEY_TWO));
     rq1.addPeerToHFileRefs(ID_TWO);
 
     List<String> files1 = new ArrayList<String>(3);
@@ -240,13 +240,13 @@ public abstract class TestReplicationStateBasic {
     assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
     assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
 
-    rp.removePeer(ID_ONE);
+    rp.unregisterPeer(ID_ONE);
     rq1.removePeerFromHFileRefs(ID_ONE);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
 
-    rp.removePeer(ID_TWO);
+    rp.unregisterPeer(ID_TWO);
     rq1.removePeerFromHFileRefs(ID_TWO);
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
     assertNull(rqc.getReplicableHFiles(ID_TWO));
@@ -258,7 +258,7 @@ public abstract class TestReplicationStateBasic {
 
     // Test methods with non-existent peer ids
     try {
-      rp.removePeer("bogus");
+      rp.unregisterPeer("bogus");
       fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
     } catch (IllegalArgumentException e) {
     }
@@ -277,16 +277,16 @@ public abstract class TestReplicationStateBasic {
       fail("Should have thrown an IllegalArgumentException when passed a bogus 
peerId");
     } catch (IllegalArgumentException e) {
     }
-    assertFalse(rp.peerAdded("bogus"));
-    rp.peerRemoved("bogus");
+    assertFalse(rp.peerConnected("bogus"));
+    rp.peerDisconnected("bogus");
 
     assertNull(rp.getPeerConf("bogus"));
     assertNumberOfPeers(0);
 
     // Add some peers
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
     assertNumberOfPeers(1);
-    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+    rp.registerPeer(ID_TWO, new 
ReplicationPeerConfig().setClusterKey(KEY_TWO));
     assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
@@ -296,13 +296,13 @@ public abstract class TestReplicationStateBasic {
     } catch (IllegalArgumentException e) {
     }
     assertEquals(KEY_ONE, 
ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
-    rp.removePeer(ID_ONE);
-    rp.peerRemoved(ID_ONE);
+    rp.unregisterPeer(ID_ONE);
+    rp.peerDisconnected(ID_ONE);
     assertNumberOfPeers(1);
 
     // Add one peer
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rp.peerAdded(ID_ONE);
+    rp.registerPeer(ID_ONE, new 
ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.peerConnected(ID_ONE);
     assertNumberOfPeers(2);
     assertTrue(rp.getStatusOfPeer(ID_ONE));
     rp.disablePeer(ID_ONE);
@@ -311,7 +311,7 @@ public abstract class TestReplicationStateBasic {
     assertConnectedPeerStatus(true, ID_ONE);
 
     // Disconnect peer
-    rp.peerRemoved(ID_ONE);
+    rp.peerDisconnected(ID_ONE);
     assertNumberOfPeers(2);
     try {
       rp.getStatusOfPeer(ID_ONE);
@@ -361,7 +361,7 @@ public abstract class TestReplicationStateBasic {
         rq3.addLog("qId" + i, "filename" + j);
       }
       //Add peers for the corresponding queues so they are not orphans
-      rp.addPeer("qId" + i, new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
+      rp.registerPeer("qId" + i, new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index 346ff37..3a9a5a5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -204,9 +204,9 @@ public class TestReplicationStateHBaseImpl {
   @Test
   public void TestMultipleReplicationQueuesHBaseImpl () {
     try {
-      rp.addPeer("Queue1", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
-      rp.addPeer("Queue2", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
-      rp.addPeer("Queue3", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
+      rp.registerPeer("Queue1", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
+      rp.registerPeer("Queue2", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
+      rp.registerPeer("Queue3", new 
ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
     } catch (ReplicationException e) {
       fail("Failed to add peers to ReplicationPeers");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61ff6ced/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 671b7fd..3b19660 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -147,9 +147,9 @@ public class TestReplicationTrackerZKImpl {
 
   @Test(timeout = 30000)
   public void testPeerRemovedEvent() throws Exception {
-    rp.addPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.registerPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     rt.registerListener(new DummyReplicationListener());
-    rp.removePeer("5");
+    rp.unregisterPeer("5");
     // wait for event
     while (peerRemovedCount.get() < 1) {
       Thread.sleep(5);
@@ -160,7 +160,7 @@ public class TestReplicationTrackerZKImpl {
   @Test(timeout = 30000)
   public void testPeerListChangedEvent() throws Exception {
     // add a peer
-    rp.addPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.registerPeer("5", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5",
 true);
     rt.registerListener(new DummyReplicationListener());
     rp.disablePeer("5");
@@ -177,23 +177,23 @@ public class TestReplicationTrackerZKImpl {
 
     // clean up
     //ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
-    rp.removePeer("5");
+    rp.unregisterPeer("5");
   }
 
   @Test(timeout = 30000)
   public void testPeerNameControl() throws Exception {
     int exists = 0;
     int hyphen = 0;
-    rp.addPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.registerPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     
     try{
-      rp.addPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+      rp.registerPeer("6", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     }catch(IllegalArgumentException e){
       exists++;
     }
 
     try{
-      rp.addPeer("6-ec2", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+      rp.registerPeer("6-ec2", new 
ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     }catch(IllegalArgumentException e){
       hyphen++;
     }
@@ -201,7 +201,7 @@ public class TestReplicationTrackerZKImpl {
     assertEquals(1, hyphen);
     
     // clean up
-    rp.removePeer("6");
+    rp.unregisterPeer("6");
   }
   
   private class DummyReplicationListener implements ReplicationListener {
@@ -217,7 +217,7 @@ public class TestReplicationTrackerZKImpl {
     public void peerRemoved(String peerId) {
       peerRemovedData = peerId;
       peerRemovedCount.getAndIncrement();
-      LOG.debug("Received peerRemoved event: " + peerId);
+      LOG.debug("Received peerDisconnected event: " + peerId);
     }
 
     @Override

Reply via email to