hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
Repository: hbase Updated Branches: refs/heads/branch-1.2 96e48c3df -> c1289960d HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1289960 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1289960 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1289960 Branch: refs/heads/branch-1.2 Commit: c1289960dd7ebf94c615d00ef9c77c27737496e1 Parents: 96e48c3 Author: Vincent Authored: Wed Jun 7 14:48:45 2017 -0700 Committer: Andrew Purtell Committed: Sat Jun 10 12:47:18 2017 -0700 -- .../regionserver/ReplicationSource.java | 16 ++-- .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 83 3 files changed, 94 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2285a5e..a59c3c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -512,9 +512,9 @@ public class ReplicationSource extends Thread terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); } } + int sleepMultiplier = 1; // Loop until we close down while (isWorkerActive()) { -int sleepMultiplier = 1; // Sleep until replication is enabled again if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -591,7 +591,7 @@ public 
class ReplicationSource extends Thread if (considerDumping && sleepMultiplier == maxRetriesMultiplier && -processEndOfFile()) { +processEndOfFile(false)) { continue; } } @@ -717,7 +717,7 @@ public class ReplicationSource extends Thread } // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure - return seenEntries == 0 && processEndOfFile(); + return seenEntries == 0 && processEndOfFile(false); } /** @@ -846,11 +846,12 @@ public class ReplicationSource extends Thread // which throws a NPE if we open a file before any data node has the most recent block // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. LOG.warn("Got NPE opening reader, will retry."); -} else if (sleepMultiplier >= maxRetriesMultiplier) { +} else if (sleepMultiplier >= maxRetriesMultiplier +&& conf.getBoolean("replication.source.eof.autorecovery", false)) { // TODO Need a better way to determine if a file is really gone but // TODO without scanning all logs dir LOG.warn("Waited too long for this file, considering dumping"); - return !processEndOfFile(); + return !processEndOfFile(true); } } return true; @@ -990,7 +991,7 @@ public class ReplicationSource extends Thread */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", justification = "Yeah, this is how it works") -protected boolean processEndOfFile() { +protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) { // We presume this means the file we're reading is closed. if (this.queue.size() != 0) { // -1 means the wal wasn't closed cleanly. @@ -1025,6 +1026,9 @@ public class ReplicationSource extends Thread LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats() + ", and the length of the file is " + (stat == null ? 
"N/A" : stat.getLen())); } +if (dumpOnlyIfZeroLength && stat.getLen() != 0) { + return false; +} this.currentPath = null; this.repLogReader.finishCurrentFile(); this.reader = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
[3/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6782dfca Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6782dfca Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6782dfca Branch: refs/heads/branch-1.3 Commit: 6782dfca4f3a2f5e02cc60a7c04d8d5d95ebc36e Parents: 6a216c7 Author: Vincent Authored: Wed Jun 7 14:48:45 2017 -0700 Committer: Andrew Purtell Committed: Sat Jun 10 12:26:12 2017 -0700 -- .../regionserver/ReplicationSource.java | 16 ++-- .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 83 3 files changed, 94 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 65f581a..2285292 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -542,9 +542,9 @@ public class ReplicationSource extends Thread terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); } } + int sleepMultiplier = 1; // Loop until we close down while (isWorkerActive()) { -int sleepMultiplier = 1; // Sleep until replication is enabled again if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -622,7 +622,7 @@ public class ReplicationSource extends Thread if (considerDumping && sleepMultiplier == maxRetriesMultiplier && -processEndOfFile()) { +processEndOfFile(false)) { continue; } } @@ -749,7 +749,7 @@ 
public class ReplicationSource extends Thread } // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure - return seenEntries == 0 && processEndOfFile(); + return seenEntries == 0 && processEndOfFile(false); } /** @@ -930,11 +930,12 @@ public class ReplicationSource extends Thread // which throws a NPE if we open a file before any data node has the most recent block // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. LOG.warn("Got NPE opening reader, will retry."); -} else if (sleepMultiplier >= maxRetriesMultiplier) { +} else if (sleepMultiplier >= maxRetriesMultiplier +&& conf.getBoolean("replication.source.eof.autorecovery", false)) { // TODO Need a better way to determine if a file is really gone but // TODO without scanning all logs dir LOG.warn("Waited too long for this file, considering dumping"); - return !processEndOfFile(); + return !processEndOfFile(true); } } return true; @@ -1100,7 +1101,7 @@ public class ReplicationSource extends Thread */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", justification = "Yeah, this is how it works") -protected boolean processEndOfFile() { +protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) { // We presume this means the file we're reading is closed. if (this.queue.size() != 0) { // -1 means the wal wasn't closed cleanly. @@ -1135,6 +1136,9 @@ public class ReplicationSource extends Thread LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats() + ", and the length of the file is " + (stat == null ? 
"N/A" : stat.getLen())); } +if (dumpOnlyIfZeroLength && stat.getLen() != 0) { + return false; +} this.currentPath = null; this.repLogReader.finishCurrentFile(); this.reader = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java -- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
[4/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/385b7924 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/385b7924 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/385b7924 Branch: refs/heads/branch-2 Commit: 385b792446ea1b0c58b7365904d677ba48eec930 Parents: eca1ec3 Author: Vincent Authored: Fri Jun 9 18:47:14 2017 -0700 Committer: Andrew Purtell Committed: Sat Jun 10 12:45:40 2017 -0700 -- .../ReplicationSourceShipperThread.java | 2 +- .../ReplicationSourceWALReaderThread.java | 30 .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 80 4 files changed, 112 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java index d1a8ac2..6807da2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java @@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread { } public Path getCurrentPath() { -return this.currentPath; +return this.entryReader.getCurrentPath(); } public long getCurrentPosition() { http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java -- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index ad08866..c1af6e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread { sleepMultiplier++; } else { LOG.error("Failed to read stream of replication entries", e); + handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread { } } + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(Exception e) { +if (e.getCause() instanceof EOFException && logQueue.size() > 1 +&& conf.getBoolean("replication.source.eof.autorecovery", false)) { + try { +if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + currentPosition = 0; +} + } catch (IOException ioe) { +LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } +} + } + + public Path getCurrentPath() { +// if we've read some WAL entries, get the Path we read from +WALEntryBatch batchQueueHead = entryBatchQueue.peek(); +if (batchQueueHead != null) { + return batchQueueHead.lastWalPath; +} +// 
otherwise, we must be currently reading from the head of the log queue +return logQueue.peek(); + } + //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java -- diff
[1/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
Repository: hbase Updated Branches: refs/heads/branch-1 6860ddca9 -> 650ef5cf5 refs/heads/branch-1.3 6a216c787 -> 6782dfca4 refs/heads/branch-2 eca1ec335 -> 385b79244 refs/heads/master ea64dbef7 -> 384e308e9 HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/384e308e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/384e308e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/384e308e Branch: refs/heads/master Commit: 384e308e9f2387422e76ceb1432d6b2b85a973cf Parents: ea64dbe Author: Vincent Authored: Fri Jun 9 18:47:14 2017 -0700 Committer: Andrew Purtell Committed: Sat Jun 10 10:30:40 2017 -0700 -- .../ReplicationSourceShipperThread.java | 2 +- .../ReplicationSourceWALReaderThread.java | 30 .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 80 4 files changed, 112 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java index d1a8ac2..6807da2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java @@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread { } public Path getCurrentPath() { -return this.currentPath; +return this.entryReader.getCurrentPath(); } public long getCurrentPosition() { 
http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index ad08866..c1af6e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread { sleepMultiplier++; } else { LOG.error("Failed to read stream of replication entries", e); + handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread { } } + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(Exception e) { +if (e.getCause() instanceof EOFException && logQueue.size() > 1 +&& conf.getBoolean("replication.source.eof.autorecovery", false)) { + try { +if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + currentPosition = 0; +} + } catch (IOException ioe) { +LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } +} + } + + public Path getCurrentPath() { +// if 
we've read some WAL entries, get the Path we read from +WALEntryBatch batchQueueHead = entryBatchQueue.peek(); +if (batchQueueHead != null) { + return batchQueueHead.lastWalPath; +} +// otherwise, we must be currently reading from the head of the log queue +return logQueue.peek(); + } + //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota
[2/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs
HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/650ef5cf Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/650ef5cf Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/650ef5cf Branch: refs/heads/branch-1 Commit: 650ef5cf59ee7f6e4c219a9043b66a814da52f19 Parents: 6860ddc Author: Vincent Authored: Fri Jun 9 18:36:23 2017 -0700 Committer: Andrew Purtell Committed: Sat Jun 10 11:29:51 2017 -0700 -- .../regionserver/ReplicationSource.java | 2 +- .../ReplicationSourceWALReaderThread.java | 30 +++ .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 82 4 files changed, 114 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6954ea2..8378b9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -931,7 +931,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } public Path getCurrentPath() { - return this.currentPath; + return this.entryReader.getCurrentPath(); } public long getCurrentPosition() { http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java -- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index 6f1c641..40828b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -188,6 +189,7 @@ public class ReplicationSourceWALReaderThread extends Thread { sleepMultiplier++; } else { LOG.error("Failed to read stream of replication entries", e); + handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -197,6 +199,34 @@ public class ReplicationSourceWALReaderThread extends Thread { } } + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(Exception e) { +if (e.getCause() instanceof EOFException && logQueue.size() > 1 +&& conf.getBoolean("replication.source.eof.autorecovery", false)) { + try { +if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + currentPosition = 0; +} + } catch (IOException ioe) { +LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } +} + } + + public Path getCurrentPath() { +// if we've read some WAL entries, get the Path we read from +WALEntryBatch batchQueueHead = entryBatchQueue.peek(); +if (batchQueueHead != null) { + return batchQueueHead.lastWalPath; +} +// otherwise, we must be currently reading from the head of the log queue +return logQueue.peek(); + } + //returns false 
if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java -- diff --git