Repository: hbase Updated Branches: refs/heads/branch-1 6860ddca9 -> 650ef5cf5 refs/heads/branch-1.3 6a216c787 -> 6782dfca4 refs/heads/branch-2 eca1ec335 -> 385b79244 refs/heads/master ea64dbef7 -> 384e308e9
HBASE-18137 Replication gets stuck for empty WALs Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/384e308e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/384e308e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/384e308e Branch: refs/heads/master Commit: 384e308e9f2387422e76ceb1432d6b2b85a973cf Parents: ea64dbe Author: Vincent <vincentp...@gmail.com> Authored: Fri Jun 9 18:47:14 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Sat Jun 10 10:30:40 2017 -0700 ---------------------------------------------------------------------- .../ReplicationSourceShipperThread.java | 2 +- .../ReplicationSourceWALReaderThread.java | 30 ++++++++ .../hbase/replication/TestReplicationBase.java | 1 + .../replication/TestReplicationSmallTests.java | 80 ++++++++++++++++++++ 4 files changed, 112 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java index d1a8ac2..6807da2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java @@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread { } public Path getCurrentPath() { - return this.currentPath; + return this.entryReader.getCurrentPath(); } public long getCurrentPosition() { http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java index ad08866..c1af6e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread { sleepMultiplier++; } else { LOG.error("Failed to read stream of replication entries", e); + handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread { } } + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(Exception e) { + if (e.getCause() instanceof EOFException && logQueue.size() > 1 + && conf.getBoolean("replication.source.eof.autorecovery", false)) { + try { + if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + currentPosition = 0; + } + } catch (IOException ioe) { + LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } + } + } + + public Path getCurrentPath() { + // if we've read some WAL entries, get the Path we read from + WALEntryBatch batchQueueHead = entryBatchQueue.peek(); + if (batchQueueHead != null) { + return batchQueueHead.lastWalPath; + } + // otherwise, we must be currently reading from the head of the log queue + return logQueue.peek(); + } + //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 81fe629..9cf80d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -104,6 +104,7 @@ public class TestReplicationBase { conf1.setLong("replication.sleep.before.failover", 2000); conf1.setInt("replication.source.maxretriesmultiplier", 10); conf1.setFloat("replication.source.ratio", 1.0f); + conf1.setBoolean("replication.source.eof.autorecovery", true); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index f1b2015..cc3e43b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; @@ -977,4 +981,80 @@ public class TestReplicationSmallTests extends TestReplicationBase { assertEquals(NB_ROWS_IN_BATCH, job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); } + + @Test + public void testEmptyWALRecovery() throws Exception { + final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size(); + + // for each RS, create an empty wal with same walGroupId + final List<Path> emptyWalPaths = new ArrayList<>(); + long ts = System.currentTimeMillis(); + for (int i = 0; i < numRs; i++) { + HRegionInfo regionInfo = + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts); + utility1.getTestFileSystem().create(emptyWalPath).close(); + emptyWalPaths.add(emptyWalPath); + } + + // inject our empty wal into the replication queue + for (int i = 0; i < numRs; i++) { + Replication replicationService = + (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); + replicationService.preLogRoll(null, emptyWalPaths.get(i)); + replicationService.postLogRoll(null, emptyWalPaths.get(i)); + } + + // wait for ReplicationSource to start reading from our empty wal + waitForLogAdvance(numRs, emptyWalPaths, false); + + // roll the original wal, which enqueues a new wal behind our empty wal + for (int i = 0; i < numRs; i++) { + HRegionInfo regionInfo = + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + wal.rollWriter(true); + } + + // ReplicationSource should advance past the empty wal, or else the test will fail + waitForLogAdvance(numRs, emptyWalPaths, true); + + // we're now writing to the new wal + // if everything works, the source should've stopped reading from the empty wal, and start + // replicating from the new wal + testSimplePutDelete(); + } + + /** + * Waits for the ReplicationSource to start reading from the given paths + * @param numRs number of regionservers + * @param emptyWalPaths path for each regionserver + * @param invert if true, waits until ReplicationSource is NOT reading from the given paths + */ + private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths, + final boolean invert) throws Exception { + Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + for (int i = 0; i < numRs; i++) { + Replication replicationService = (Replication) utility1.getHBaseCluster() + .getRegionServer(i).getReplicationSourceService(); + for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() + .getSources()) { + ReplicationSource source = (ReplicationSource) rsi; + if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) { + return false; + } + if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) { + return false; + } + } + } + return true; + } + }); + } }