[09/20] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b Branch: refs/heads/branch-2 Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d Parents: cea5199 Author: zhangduo Authored: Mon Mar 12 12:21:44 2018 +0800 Committer: zhangduo Committed: Mon Apr 9 15:18:44 2018 +0800 -- .../RecoveredReplicationSource.java | 67 +-- .../RecoveredReplicationSourceShipper.java | 48 ++-- .../RecoveredReplicationSourceWALReader.java| 56 -- .../regionserver/ReplicationSource.java | 36 +++--- .../regionserver/ReplicationSourceShipper.java | 27 +++-- .../ReplicationSourceWALReader.java | 101 + .../SerialReplicationSourceWALReader.java | 112 +++ 7 files changed, 218 insertions(+), 229 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index d9506c0..169b469 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,31 
+59,41 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { -final RecoveredReplicationSourceShipper worker = -new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, -this.queueStorage); -ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); -if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); -} else { - LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(this::uncaughtException); - worker.setWALReader( -startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); -} + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + PriorityBlockingQueue queue) { +return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); + } + + private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, + BlockingQueue entryBatchQueue, Path currentPath) throws InterruptedException { +LOG.trace("Didn't read any new entries from WAL"); +// we're done with queue recovery, shut ourself down +reader.setReaderRunning(false); +// shuts down shipper thread immediately +entryBatchQueue.put(new WALEntryBatch(0, currentPath)); } @Override - protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { -ReplicationSourceWALReader walReader = - new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); -Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + queueId, - this::uncaughtException); -return walReader; +if (replicationPeer.getPeerConfig().isSerial()) { + return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, +this) { + +@Override +protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); +} + }; +} else { + return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) { + +
[11/22] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b Branch: refs/heads/HBASE-20046-branch-2 Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d Parents: cea5199 Author: zhangduo Authored: Mon Mar 12 12:21:44 2018 +0800 Committer: zhangduo Committed: Mon Apr 9 15:18:44 2018 +0800 -- .../RecoveredReplicationSource.java | 67 +-- .../RecoveredReplicationSourceShipper.java | 48 ++-- .../RecoveredReplicationSourceWALReader.java| 56 -- .../regionserver/ReplicationSource.java | 36 +++--- .../regionserver/ReplicationSourceShipper.java | 27 +++-- .../ReplicationSourceWALReader.java | 101 + .../SerialReplicationSourceWALReader.java | 112 +++ 7 files changed, 218 insertions(+), 229 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index d9506c0..169b469 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; 
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { -final RecoveredReplicationSourceShipper worker = -new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, -this.queueStorage); -ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); -if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); -} else { - LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(this::uncaughtException); - worker.setWALReader( -startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); -} + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + PriorityBlockingQueue queue) { +return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); + } + + private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, + BlockingQueue entryBatchQueue, Path currentPath) throws InterruptedException { +LOG.trace("Didn't read any new entries from WAL"); +// we're done with queue recovery, shut ourself down +reader.setReaderRunning(false); +// shuts down shipper thread immediately +entryBatchQueue.put(new WALEntryBatch(0, currentPath)); } @Override - protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { -ReplicationSourceWALReader walReader = - new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); -Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + queueId, - this::uncaughtException); -return walReader; +if (replicationPeer.getPeerConfig().isSerial()) { + return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, +this) { + +@Override +protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); +} + }; +} else { + return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
[07/20] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader
HBASE-20167 Optimize the implementation of ReplicationSourceWALReader Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/91854f4c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/91854f4c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/91854f4c Branch: refs/heads/HBASE-20046-branch-2 Commit: 91854f4cd37fe87a3bb85e9d18221708abf7089b Parents: de685df Author: zhangduo Authored: Mon Mar 12 12:21:44 2018 +0800 Committer: zhangduo Committed: Sun Apr 8 11:17:26 2018 +0800 -- .../RecoveredReplicationSource.java | 67 +-- .../RecoveredReplicationSourceShipper.java | 48 ++-- .../RecoveredReplicationSourceWALReader.java| 56 -- .../regionserver/ReplicationSource.java | 36 +++--- .../regionserver/ReplicationSourceShipper.java | 27 +++-- .../ReplicationSourceWALReader.java | 101 + .../SerialReplicationSourceWALReader.java | 112 +++ 7 files changed, 218 insertions(+), 229 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/91854f4c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index d9506c0..169b469 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; 
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { -final RecoveredReplicationSourceShipper worker = -new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, -this.queueStorage); -ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); -if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); -} else { - LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(this::uncaughtException); - worker.setWALReader( -startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); -} + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + PriorityBlockingQueue queue) { +return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); + } + + private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, + BlockingQueue entryBatchQueue, Path currentPath) throws InterruptedException { +LOG.trace("Didn't read any new entries from WAL"); +// we're done with queue recovery, shut ourself down +reader.setReaderRunning(false); +// shuts down shipper thread immediately +entryBatchQueue.put(new WALEntryBatch(0, currentPath)); } @Override - protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { -ReplicationSourceWALReader walReader = - new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); -Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + queueId, - this::uncaughtException); -return walReader; +if (replicationPeer.getPeerConfig().isSerial()) { + return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, +this) { + +@Override +protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); +} + }; +} else { + return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader
Repository: hbase Updated Branches: refs/heads/master d5aaeee88 -> 6060d3ba5 HBASE-20167 Optimize the implementation of ReplicationSourceWALReader Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6060d3ba Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6060d3ba Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6060d3ba Branch: refs/heads/master Commit: 6060d3ba56706d87221f0934d0331a2a73e65167 Parents: d5aaeee Author: zhangduo Authored: Mon Mar 12 12:21:44 2018 +0800 Committer: zhangduo Committed: Mon Mar 12 15:14:16 2018 +0800 -- .../RecoveredReplicationSource.java | 67 +-- .../RecoveredReplicationSourceShipper.java | 48 ++-- .../RecoveredReplicationSourceWALReader.java| 56 -- .../regionserver/ReplicationSource.java | 36 +++--- .../regionserver/ReplicationSourceShipper.java | 27 +++-- .../ReplicationSourceWALReader.java | 101 + .../SerialReplicationSourceWALReader.java | 112 +++ 7 files changed, 218 insertions(+), 229 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/6060d3ba/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index d9506c0..169b469 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource { } @Override - protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { -final RecoveredReplicationSourceShipper worker = -new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, -this.queueStorage); -ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); -if (extant != null) { - LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); -} else { - LOG.debug("Starting up worker for wal group " + walGroupId); - worker.startup(this::uncaughtException); - worker.setWALReader( -startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); - workerThreads.put(walGroupId, worker); -} + protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, + PriorityBlockingQueue queue) { +return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); + } + + private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader, + BlockingQueue entryBatchQueue, Path currentPath) throws InterruptedException { +LOG.trace("Didn't read any new entries from WAL"); +// we're done with queue recovery, shut ourself down +reader.setReaderRunning(false); +// shuts down shipper thread immediately +entryBatchQueue.put(new WALEntryBatch(0, currentPath)); } @Override - protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, + protected ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { -ReplicationSourceWALReader walReader = - new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); -Threads.setDaemonThreadRunning(walReader, - threadName + ".replicationSource.replicationWALReaderThread." 
+ walGroupId + "," + queueId, - this::uncaughtException); -return walReader; +if (replicationPeer.getPeerConfig().isSerial()) { + return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, +this) { + +@Override +protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath); +} + }; +} else { + return new