[09/20] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader

2018-04-09 Thread zhangduo
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b

Branch: refs/heads/branch-2
Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d
Parents: cea5199
Author: zhangduo 
Authored: Mon Mar 12 12:21:44 2018 +0800
Committer: zhangduo 
Committed: Mon Apr 9 15:18:44 2018 +0800

--
 .../RecoveredReplicationSource.java |  67 +--
 .../RecoveredReplicationSourceShipper.java  |  48 ++--
 .../RecoveredReplicationSourceWALReader.java|  56 --
 .../regionserver/ReplicationSource.java |  36 +++---
 .../regionserver/ReplicationSourceShipper.java  |  27 +++--
 .../ReplicationSourceWALReader.java | 101 +
 .../SerialReplicationSourceWALReader.java   | 112 +++
 7 files changed, 218 insertions(+), 229 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d9506c0..169b469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   }
 
   @Override
-  protected void tryStartNewShipper(String walGroupId, 
PriorityBlockingQueue queue) {
-final RecoveredReplicationSourceShipper worker =
-new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-this.queueStorage);
-ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, 
worker);
-if (extant != null) {
-  LOG.debug("Someone has beat us to start a worker thread for wal group " 
+ walGroupId);
-} else {
-  LOG.debug("Starting up worker for wal group " + walGroupId);
-  worker.startup(this::uncaughtException);
-  worker.setWALReader(
-startNewWALReader(worker.getName(), walGroupId, queue, 
worker.getStartPosition()));
-  workerThreads.put(walGroupId, worker);
-}
+  protected RecoveredReplicationSourceShipper createNewShipper(String 
walGroupId,
+  PriorityBlockingQueue queue) {
+return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, 
this, queueStorage);
+  }
+
+  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+  BlockingQueue entryBatchQueue, Path currentPath) throws 
InterruptedException {
+LOG.trace("Didn't read any new entries from WAL");
+// we're done with queue recovery, shut ourself down
+reader.setReaderRunning(false);
+// shuts down shipper thread immediately
+entryBatchQueue.put(new WALEntryBatch(0, currentPath));
   }
 
   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, 
String walGroupId,
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
   PriorityBlockingQueue queue, long startPosition) {
-ReplicationSourceWALReader walReader =
-  new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
-Threads.setDaemonThreadRunning(walReader,
-  threadName + ".replicationSource.replicationWALReaderThread." + 
walGroupId + "," + queueId,
-  this::uncaughtException);
-return walReader;
+if (replicationPeer.getPeerConfig().isSerial()) {
+  return new SerialReplicationSourceWALReader(fs, conf, queue, 
startPosition, walEntryFilter,
+this) {
+
+@Override
+protected void handleEmptyWALEntryBatch(Path currentPath) throws 
InterruptedException {
+  handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+}
+  };
+} else {
+  return new ReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this) {
+
+ 

[11/22] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader

2018-04-09 Thread zhangduo
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b

Branch: refs/heads/HBASE-20046-branch-2
Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d
Parents: cea5199
Author: zhangduo 
Authored: Mon Mar 12 12:21:44 2018 +0800
Committer: zhangduo 
Committed: Mon Apr 9 15:18:44 2018 +0800

--
 .../RecoveredReplicationSource.java |  67 +--
 .../RecoveredReplicationSourceShipper.java  |  48 ++--
 .../RecoveredReplicationSourceWALReader.java|  56 --
 .../regionserver/ReplicationSource.java |  36 +++---
 .../regionserver/ReplicationSourceShipper.java  |  27 +++--
 .../ReplicationSourceWALReader.java | 101 +
 .../SerialReplicationSourceWALReader.java   | 112 +++
 7 files changed, 218 insertions(+), 229 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d9506c0..169b469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   }
 
   @Override
-  protected void tryStartNewShipper(String walGroupId, 
PriorityBlockingQueue queue) {
-final RecoveredReplicationSourceShipper worker =
-new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-this.queueStorage);
-ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, 
worker);
-if (extant != null) {
-  LOG.debug("Someone has beat us to start a worker thread for wal group " 
+ walGroupId);
-} else {
-  LOG.debug("Starting up worker for wal group " + walGroupId);
-  worker.startup(this::uncaughtException);
-  worker.setWALReader(
-startNewWALReader(worker.getName(), walGroupId, queue, 
worker.getStartPosition()));
-  workerThreads.put(walGroupId, worker);
-}
+  protected RecoveredReplicationSourceShipper createNewShipper(String 
walGroupId,
+  PriorityBlockingQueue queue) {
+return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, 
this, queueStorage);
+  }
+
+  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+  BlockingQueue entryBatchQueue, Path currentPath) throws 
InterruptedException {
+LOG.trace("Didn't read any new entries from WAL");
+// we're done with queue recovery, shut ourself down
+reader.setReaderRunning(false);
+// shuts down shipper thread immediately
+entryBatchQueue.put(new WALEntryBatch(0, currentPath));
   }
 
   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, 
String walGroupId,
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
   PriorityBlockingQueue queue, long startPosition) {
-ReplicationSourceWALReader walReader =
-  new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
-Threads.setDaemonThreadRunning(walReader,
-  threadName + ".replicationSource.replicationWALReaderThread." + 
walGroupId + "," + queueId,
-  this::uncaughtException);
-return walReader;
+if (replicationPeer.getPeerConfig().isSerial()) {
+  return new SerialReplicationSourceWALReader(fs, conf, queue, 
startPosition, walEntryFilter,
+this) {
+
+@Override
+protected void handleEmptyWALEntryBatch(Path currentPath) throws 
InterruptedException {
+  handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+}
+  };
+} else {
+  return new ReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, 

[07/20] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader

2018-04-07 Thread zhangduo
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/91854f4c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/91854f4c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/91854f4c

Branch: refs/heads/HBASE-20046-branch-2
Commit: 91854f4cd37fe87a3bb85e9d18221708abf7089b
Parents: de685df
Author: zhangduo 
Authored: Mon Mar 12 12:21:44 2018 +0800
Committer: zhangduo 
Committed: Sun Apr 8 11:17:26 2018 +0800

--
 .../RecoveredReplicationSource.java |  67 +--
 .../RecoveredReplicationSourceShipper.java  |  48 ++--
 .../RecoveredReplicationSourceWALReader.java|  56 --
 .../regionserver/ReplicationSource.java |  36 +++---
 .../regionserver/ReplicationSourceShipper.java  |  27 +++--
 .../ReplicationSourceWALReader.java | 101 +
 .../SerialReplicationSourceWALReader.java   | 112 +++
 7 files changed, 218 insertions(+), 229 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/91854f4c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d9506c0..169b469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   }
 
   @Override
-  protected void tryStartNewShipper(String walGroupId, 
PriorityBlockingQueue queue) {
-final RecoveredReplicationSourceShipper worker =
-new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-this.queueStorage);
-ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, 
worker);
-if (extant != null) {
-  LOG.debug("Someone has beat us to start a worker thread for wal group " 
+ walGroupId);
-} else {
-  LOG.debug("Starting up worker for wal group " + walGroupId);
-  worker.startup(this::uncaughtException);
-  worker.setWALReader(
-startNewWALReader(worker.getName(), walGroupId, queue, 
worker.getStartPosition()));
-  workerThreads.put(walGroupId, worker);
-}
+  protected RecoveredReplicationSourceShipper createNewShipper(String 
walGroupId,
+  PriorityBlockingQueue queue) {
+return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, 
this, queueStorage);
+  }
+
+  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+  BlockingQueue entryBatchQueue, Path currentPath) throws 
InterruptedException {
+LOG.trace("Didn't read any new entries from WAL");
+// we're done with queue recovery, shut ourself down
+reader.setReaderRunning(false);
+// shuts down shipper thread immediately
+entryBatchQueue.put(new WALEntryBatch(0, currentPath));
   }
 
   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, 
String walGroupId,
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
   PriorityBlockingQueue queue, long startPosition) {
-ReplicationSourceWALReader walReader =
-  new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
-Threads.setDaemonThreadRunning(walReader,
-  threadName + ".replicationSource.replicationWALReaderThread." + 
walGroupId + "," + queueId,
-  this::uncaughtException);
-return walReader;
+if (replicationPeer.getPeerConfig().isSerial()) {
+  return new SerialReplicationSourceWALReader(fs, conf, queue, 
startPosition, walEntryFilter,
+this) {
+
+@Override
+protected void handleEmptyWALEntryBatch(Path currentPath) throws 
InterruptedException {
+  handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+}
+  };
+} else {
+  return new ReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, 

hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader

2018-03-12 Thread zhangduo
Repository: hbase
Updated Branches:
  refs/heads/master d5aaeee88 -> 6060d3ba5


HBASE-20167 Optimize the implementation of ReplicationSourceWALReader


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6060d3ba
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6060d3ba
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6060d3ba

Branch: refs/heads/master
Commit: 6060d3ba56706d87221f0934d0331a2a73e65167
Parents: d5aaeee
Author: zhangduo 
Authored: Mon Mar 12 12:21:44 2018 +0800
Committer: zhangduo 
Committed: Mon Mar 12 15:14:16 2018 +0800

--
 .../RecoveredReplicationSource.java |  67 +--
 .../RecoveredReplicationSourceShipper.java  |  48 ++--
 .../RecoveredReplicationSourceWALReader.java|  56 --
 .../regionserver/ReplicationSource.java |  36 +++---
 .../regionserver/ReplicationSourceShipper.java  |  27 +++--
 .../ReplicationSourceWALReader.java | 101 +
 .../SerialReplicationSourceWALReader.java   | 112 +++
 7 files changed, 218 insertions(+), 229 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/6060d3ba/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
--
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d9506c0..169b469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   }
 
   @Override
-  protected void tryStartNewShipper(String walGroupId, 
PriorityBlockingQueue queue) {
-final RecoveredReplicationSourceShipper worker =
-new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-this.queueStorage);
-ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, 
worker);
-if (extant != null) {
-  LOG.debug("Someone has beat us to start a worker thread for wal group " 
+ walGroupId);
-} else {
-  LOG.debug("Starting up worker for wal group " + walGroupId);
-  worker.startup(this::uncaughtException);
-  worker.setWALReader(
-startNewWALReader(worker.getName(), walGroupId, queue, 
worker.getStartPosition()));
-  workerThreads.put(walGroupId, worker);
-}
+  protected RecoveredReplicationSourceShipper createNewShipper(String 
walGroupId,
+  PriorityBlockingQueue queue) {
+return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, 
this, queueStorage);
+  }
+
+  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+  BlockingQueue entryBatchQueue, Path currentPath) throws 
InterruptedException {
+LOG.trace("Didn't read any new entries from WAL");
+// we're done with queue recovery, shut ourself down
+reader.setReaderRunning(false);
+// shuts down shipper thread immediately
+entryBatchQueue.put(new WALEntryBatch(0, currentPath));
   }
 
   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, 
String walGroupId,
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
   PriorityBlockingQueue queue, long startPosition) {
-ReplicationSourceWALReader walReader =
-  new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
-Threads.setDaemonThreadRunning(walReader,
-  threadName + ".replicationSource.replicationWALReaderThread." + 
walGroupId + "," + queueId,
-  this::uncaughtException);
-return walReader;
+if (replicationPeer.getPeerConfig().isSerial()) {
+  return new SerialReplicationSourceWALReader(fs, conf, queue, 
startPosition, walEntryFilter,
+this) {
+
+@Override
+protected void handleEmptyWALEntryBatch(Path currentPath) throws 
InterruptedException {
+  handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+}
+  };
+} else {
+  return new