[36/50] [abbrv] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05aa1aac Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05aa1aac Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05aa1aac Branch: refs/heads/HBASE-19397-branch-2 Commit: 05aa1aac55fb9fc10370c5aeda5c0754eafda900 Parents: bba95c5 Author: huzhengAuthored: Tue Dec 26 16:46:10 2017 +0800 Committer: zhangduo Committed: Thu Feb 22 15:23:34 2018 +0800 -- .../replication/ReplicationPeerConfigUtil.java | 10 +- .../replication/VerifyReplication.java | 9 +- .../hbase/replication/ReplicationFactory.java | 10 +- .../hbase/replication/ReplicationPeerImpl.java | 60 +- .../replication/ReplicationPeerStorage.java | 3 +- .../hbase/replication/ReplicationPeers.java | 238 .../replication/ReplicationPeersZKImpl.java | 552 --- .../replication/ZKReplicationPeerStorage.java | 12 +- .../replication/ZKReplicationStorageBase.java | 3 +- .../replication/TestReplicationStateBasic.java | 125 ++--- .../replication/TestReplicationStateZKImpl.java | 2 +- .../TestZKReplicationPeerStorage.java | 12 +- .../cleaner/ReplicationZKNodeCleaner.java | 57 +- .../replication/ReplicationPeerManager.java | 6 +- .../regionserver/DumpReplicationQueues.java | 2 +- .../regionserver/PeerProcedureHandlerImpl.java | 49 +- .../replication/regionserver/Replication.java | 2 +- .../regionserver/ReplicationSource.java | 7 +- .../regionserver/ReplicationSourceManager.java | 44 +- .../cleaner/TestReplicationHFileCleaner.java| 7 +- .../replication/TestMultiSlaveReplication.java | 2 - .../TestReplicationTrackerZKImpl.java | 26 +- .../TestReplicationSourceManager.java | 17 +- .../hadoop/hbase/HBaseZKTestingUtility.java | 3 +- 24 files changed, 307 insertions(+), 951 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/05aa1aac/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java -- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 022bf64..a234a9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil { public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) throws DeserializationException { if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); + int pbLen = ProtobufUtil.lengthOfPBMagic(); ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder(); ReplicationProtos.ReplicationPeer peer; try { -ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); +ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); peer = builder.build(); } catch (IOException e) { throw new DeserializationException(e); } return convert(peer); } else { - if (bytes.length > 0) { -return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); + if (bytes == null || bytes.length <= 0) { +throw new DeserializationException("Bytes to deserialize should not be empty."); } - return ReplicationPeerConfig.newBuilder().setClusterKey("").build(); + return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/05aa1aac/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java -- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 09d4b4b..f0070f0 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -339,15 +339,10 @@ public class VerifyReplication extends Configured implements
[36/50] [abbrv] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
http://git-wip-us.apache.org/repos/asf/hbase/blob/4175a603/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index d5e25cb..a42fcc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -174,7 +174,6 @@ public class ReplicationSourceManager implements ReplicationListener { this.clusterId = clusterId; this.walFileLengthProvider = walFileLengthProvider; this.replicationTracker.registerListener(this); -this.replicationPeers.getAllPeerIds(); // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); @@ -278,8 +277,8 @@ public class ReplicationSourceManager implements ReplicationListener { } List otherRegionServers = replicationTracker.getListOfRegionServers().stream() .map(ServerName::valueOf).collect(Collectors.toList()); -LOG.info( - "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); +LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " ++ otherRegionServers); // Look if there's anything to process after a restart for (ServerName rs : currentReplicators) { @@ -296,7 +295,7 @@ public class ReplicationSourceManager implements ReplicationListener { * The returned future is for adoptAbandonedQueues task. */ Future init() throws IOException, ReplicationException { -for (String id : this.replicationPeers.getConnectedPeerIds()) { +for (String id : this.replicationPeers.getAllPeerIds()) { addSource(id); if (replicationForBulkLoadDataEnabled) { // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case @@ -315,8 +314,8 @@ public class ReplicationSourceManager implements ReplicationListener { */ @VisibleForTesting ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { -ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); -ReplicationPeer peer = replicationPeers.getConnectedPeer(id); +ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id); +ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); @@ -362,7 +361,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void deleteSource(String peerId, boolean closeConnection) { abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId)); if (closeConnection) { - this.replicationPeers.peerDisconnected(peerId); + this.replicationPeers.removePeer(peerId); } } @@ -455,12 +454,12 @@ public class ReplicationSourceManager implements ReplicationListener { // update replication queues on ZK // synchronize on replicationPeers to avoid adding source for the to-be-removed peer synchronized (replicationPeers) { - for (String id : replicationPeers.getConnectedPeerIds()) { + for (String id : replicationPeers.getAllPeerIds()) { try { this.queueStorage.addWAL(server.getServerName(), id, logName); } catch (ReplicationException e) { - throw new IOException("Cannot add log to replication queue" + -" when creating a new source, queueId=" + id + ", filename=" + logName, e); + throw new IOException("Cannot add log to replication queue" + + " when creating a new source, queueId=" + id + ", filename=" + logName, e); } } } @@ -642,7 +641,7 @@ public class ReplicationSourceManager implements ReplicationListener { public void addPeer(String id) throws ReplicationException, IOException { LOG.info("Trying to add peer, peerId: " + id); -boolean added = this.replicationPeers.peerConnected(id); +boolean added = this.replicationPeers.addPeer(id); if (added) { LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); addSource(id); @@ -778,19 +777,25 @@ public class ReplicationSourceManager implements ReplicationListener { // there is not an actual peer defined corresponding to peerId for the failover. ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);