[42/50] [abbrv] hbase git commit: HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly

2018-03-07 Thread zhangduo
http://git-wip-us.apache.org/repos/asf/hbase/blob/99bd2a9c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b8d80d2..5b7bab8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -33,17 +33,20 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -56,7 +59,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -91,7 +94,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   private final List sources;
   // List of all the sources we got from died RSs
   private final List oldsources;
-  private final ReplicationQueues replicationQueues;
+  private final ReplicationQueueStorage queueStorage;
   private final ReplicationTracker replicationTracker;
   private final ReplicationPeers replicationPeers;
   // UUID for this cluster
@@ -124,7 +127,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
   /**
* Creates a replication manager and sets the watch on all the other 
registered region servers
-   * @param replicationQueues the interface for manipulating replication queues
+   * @param queueStorage the interface for manipulating replication queues
* @param replicationPeers
* @param replicationTracker
* @param conf the configuration to use
@@ -134,14 +137,14 @@ public class ReplicationSourceManager implements 
ReplicationListener {
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
-  public ReplicationSourceManager(ReplicationQueues replicationQueues,
+  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
   ReplicationPeers replicationPeers, ReplicationTracker 
replicationTracker, Configuration conf,
   Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID 
clusterId,
   WALFileLengthProvider walFileLengthProvider) throws IOException {
 //CopyOnWriteArrayList is thread-safe.
 //Generally, reading is more than modifying.
 this.sources = new CopyOnWriteArrayList<>();
-this.replicationQueues = replicationQueues;
+this.queueStorage = queueStorage;
 this.replicationPeers = replicationPeers;
 this.replicationTracker = replicationTracker;
 this.server = server;
@@ -174,6 +177,19 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
 
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+void exec() throws ReplicationException;
+  }
+
+  private void abortWhenFail(ReplicationQueueOperation op) {
+try {
+  op.exec();
+} catch (ReplicationException e) {
+  server.abort("Failed to operate on replication queue", e);
+}
+  }
+
   /**
* Provide the id of the peer and a log key and this method will figure which
* wal it belongs to and will log, for this region server, the current
@@ -185,12 +201,13 @@ public class ReplicationSourceManager implements 
ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region 
server
* @param holdLogInZK if true then the log is retained in 

[42/50] [abbrv] hbase git commit: HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly

2018-02-10 Thread zhangduo
HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c4977446
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c4977446
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c4977446

Branch: refs/heads/HBASE-19397-branch-2
Commit: c497744647801a053cfcd4ae301310b8ee66a5e7
Parents: fde2b4e
Author: zhangduo 
Authored: Wed Dec 27 22:03:51 2017 +0800
Committer: zhangduo 
Committed: Sat Feb 10 20:20:32 2018 +0800

--
 .../hbase/replication/ReplicationFactory.java   |   9 +-
 .../hbase/replication/ReplicationQueues.java| 160 ---
 .../replication/ReplicationQueuesArguments.java |  70 ---
 .../replication/ReplicationQueuesZKImpl.java| 407 -
 .../hbase/replication/ReplicationTableBase.java | 442 ---
 .../replication/ReplicationTrackerZKImpl.java   |  21 +-
 .../replication/ZKReplicationQueueStorage.java  |  22 +
 .../replication/TestReplicationStateBasic.java  | 131 +++---
 .../replication/TestReplicationStateZKImpl.java |  41 +-
 .../regionserver/DumpReplicationQueues.java |  15 +-
 .../RecoveredReplicationSource.java |  17 +-
 .../RecoveredReplicationSourceShipper.java  |  22 +-
 .../replication/regionserver/Replication.java   |  20 +-
 .../regionserver/ReplicationSource.java |  16 +-
 .../ReplicationSourceInterface.java |  11 +-
 .../regionserver/ReplicationSourceManager.java  | 259 ++-
 .../regionserver/ReplicationSyncUp.java |  29 +-
 .../hbase/master/cleaner/TestLogsCleaner.java   |  12 +-
 .../cleaner/TestReplicationHFileCleaner.java|  23 +-
 .../cleaner/TestReplicationZKNodeCleaner.java   |  22 +-
 .../replication/ReplicationSourceDummy.java |   6 +-
 .../replication/TestReplicationSyncUpTool.java  |   4 +-
 .../TestReplicationSourceManager.java   |  97 ++--
 .../TestReplicationSourceManagerZkImpl.java |  57 +--
 24 files changed, 362 insertions(+), 1551 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/c4977446/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
--
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6c1c213..5e70e57 100644
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -17,12 +17,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A factory class for instantiating replication objects that deal with 
replication state.
@@ -30,12 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 @InterfaceAudience.Private
 public class ReplicationFactory {
 
-  public static ReplicationQueues 
getReplicationQueues(ReplicationQueuesArguments args)
-  throws Exception {
-return (ReplicationQueues) 
ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class,
-  args);
-  }
-
   public static ReplicationPeers getReplicationPeers(ZKWatcher zk, 
Configuration conf,
   Abortable abortable) {
 return getReplicationPeers(zk, conf, null, abortable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c4977446/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
--
diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
deleted file mode 100644
index 7f440b1..000
--- 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy