http://git-wip-us.apache.org/repos/asf/hbase/blob/99bd2a9c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b8d80d2..5b7bab8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -33,17 +33,20 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -56,7 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -91,7 +94,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final List sources;
// List of all the sources we got from died RSs
private final List oldsources;
- private final ReplicationQueues replicationQueues;
+ private final ReplicationQueueStorage queueStorage;
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
@@ -124,7 +127,7 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
 * Creates a replication manager and sets the watch on all the other registered region servers
- * @param replicationQueues the interface for manipulating replication queues
+ * @param queueStorage the interface for manipulating replication queues
* @param replicationPeers
* @param replicationTracker
* @param conf the configuration to use
@@ -134,14 +137,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
- public ReplicationSourceManager(ReplicationQueues replicationQueues,
+ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<>();
-this.replicationQueues = replicationQueues;
+this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
@@ -174,6 +177,19 @@ public class ReplicationSourceManager implements ReplicationListener {
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+void exec() throws ReplicationException;
+ }
+
+ private void abortWhenFail(ReplicationQueueOperation op) {
+try {
+ op.exec();
+} catch (ReplicationException e) {
+ server.abort("Failed to operate on replication queue", e);
+}
+ }
+
/**
* Provide the id of the peer and a log key and this method will figure which
* wal it belongs to and will log, for this region server, the current
@@ -185,12 +201,13 @@ public class ReplicationSourceManager implements ReplicationListener {
 * @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in