[36/50] [abbrv] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface

2018-02-21 Thread zhangduo
HBASE-19622 Reimplement ReplicationPeers with the new replication storage 
interface


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05aa1aac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05aa1aac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05aa1aac

Branch: refs/heads/HBASE-19397-branch-2
Commit: 05aa1aac55fb9fc10370c5aeda5c0754eafda900
Parents: bba95c5
Author: huzheng 
Authored: Tue Dec 26 16:46:10 2017 +0800
Committer: zhangduo 
Committed: Thu Feb 22 15:23:34 2018 +0800

--
 .../replication/ReplicationPeerConfigUtil.java  |  10 +-
 .../replication/VerifyReplication.java  |   9 +-
 .../hbase/replication/ReplicationFactory.java   |  10 +-
 .../hbase/replication/ReplicationPeerImpl.java  |  60 +-
 .../replication/ReplicationPeerStorage.java |   3 +-
 .../hbase/replication/ReplicationPeers.java | 238 
 .../replication/ReplicationPeersZKImpl.java | 552 ---
 .../replication/ZKReplicationPeerStorage.java   |  12 +-
 .../replication/ZKReplicationStorageBase.java   |   3 +-
 .../replication/TestReplicationStateBasic.java  | 125 ++---
 .../replication/TestReplicationStateZKImpl.java |   2 +-
 .../TestZKReplicationPeerStorage.java   |  12 +-
 .../cleaner/ReplicationZKNodeCleaner.java   |  57 +-
 .../replication/ReplicationPeerManager.java |   6 +-
 .../regionserver/DumpReplicationQueues.java |   2 +-
 .../regionserver/PeerProcedureHandlerImpl.java  |  49 +-
 .../replication/regionserver/Replication.java   |   2 +-
 .../regionserver/ReplicationSource.java |   7 +-
 .../regionserver/ReplicationSourceManager.java  |  44 +-
 .../cleaner/TestReplicationHFileCleaner.java|   7 +-
 .../replication/TestMultiSlaveReplication.java  |   2 -
 .../TestReplicationTrackerZKImpl.java   |  26 +-
 .../TestReplicationSourceManager.java   |  17 +-
 .../hadoop/hbase/HBaseZKTestingUtility.java |   3 +-
 24 files changed, 307 insertions(+), 951 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/05aa1aac/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
--
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index 022bf64..a234a9b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil {
   public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
   throws DeserializationException {
 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
-  int pblen = ProtobufUtil.lengthOfPBMagic();
+  int pbLen = ProtobufUtil.lengthOfPBMagic();
   ReplicationProtos.ReplicationPeer.Builder builder =
   ReplicationProtos.ReplicationPeer.newBuilder();
   ReplicationProtos.ReplicationPeer peer;
   try {
-ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
 peer = builder.build();
   } catch (IOException e) {
 throw new DeserializationException(e);
   }
   return convert(peer);
 } else {
-  if (bytes.length > 0) {
-return 
ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
+  if (bytes == null || bytes.length <= 0) {
+throw new DeserializationException("Bytes to deserialize should not be 
empty.");
   }
-  return ReplicationPeerConfig.newBuilder().setClusterKey("").build();
+  return 
ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
 }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/05aa1aac/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
--
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 09d4b4b..f0070f0 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -339,15 +339,10 @@ public class VerifyReplication extends Configured 
implements 

[36/50] [abbrv] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface

2018-02-03 Thread zhangduo
http://git-wip-us.apache.org/repos/asf/hbase/blob/4175a603/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d5e25cb..a42fcc0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -174,7 +174,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 this.clusterId = clusterId;
 this.walFileLengthProvider = walFileLengthProvider;
 this.replicationTracker.registerListener(this);
-this.replicationPeers.getAllPeerIds();
 // It's preferable to failover 1 RS at a time, but with good zk servers
 // more could be processed at the same time.
 int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -278,8 +277,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 }
 List otherRegionServers = 
replicationTracker.getListOfRegionServers().stream()
 .map(ServerName::valueOf).collect(Collectors.toList());
-LOG.info(
-  "Current list of replicators: " + currentReplicators + " other RSs: " + 
otherRegionServers);
+LOG.info("Current list of replicators: " + currentReplicators + " other 
RSs: "
++ otherRegionServers);
 
 // Look if there's anything to process after a restart
 for (ServerName rs : currentReplicators) {
@@ -296,7 +295,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
* The returned future is for adoptAbandonedQueues task.
*/
   Future init() throws IOException, ReplicationException {
-for (String id : this.replicationPeers.getConnectedPeerIds()) {
+for (String id : this.replicationPeers.getAllPeerIds()) {
   addSource(id);
   if (replicationForBulkLoadDataEnabled) {
 // Check if peer exists in hfile-refs queue, if not add it. This can 
happen in the case
@@ -315,8 +314,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
*/
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, 
ReplicationException {
-ReplicationPeerConfig peerConfig = 
replicationPeers.getReplicationPeerConfig(id);
-ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
+ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
+ReplicationPeer peer = replicationPeers.getPeer(id);
 ReplicationSourceInterface src = getReplicationSource(id, peerConfig, 
peer);
 synchronized (this.walsById) {
   this.sources.add(src);
@@ -362,7 +361,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
 abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), 
peerId));
 if (closeConnection) {
-  this.replicationPeers.peerDisconnected(peerId);
+  this.replicationPeers.removePeer(peerId);
 }
   }
 
@@ -455,12 +454,12 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 // update replication queues on ZK
 // synchronize on replicationPeers to avoid adding source for the 
to-be-removed peer
 synchronized (replicationPeers) {
-  for (String id : replicationPeers.getConnectedPeerIds()) {
+  for (String id : replicationPeers.getAllPeerIds()) {
 try {
   this.queueStorage.addWAL(server.getServerName(), id, logName);
 } catch (ReplicationException e) {
-  throw new IOException("Cannot add log to replication queue" +
-" when creating a new source, queueId=" + id + ", filename=" + 
logName, e);
+  throw new IOException("Cannot add log to replication queue"
+  + " when creating a new source, queueId=" + id + ", filename=" + 
logName, e);
 }
   }
 }
@@ -642,7 +641,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
   public void addPeer(String id) throws ReplicationException, IOException {
 LOG.info("Trying to add peer, peerId: " + id);
-boolean added = this.replicationPeers.peerConnected(id);
+boolean added = this.replicationPeers.addPeer(id);
 if (added) {
   LOG.info("Peer " + id + " connected success, trying to start the 
replication source thread.");
   addSource(id);
@@ -778,19 +777,25 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   // there is not an actual peer defined corresponding to peerId for 
the failover.
   ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(peerId);