HBASE-16824 Writer.flush() can be called on already closed streams in WAL roll
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/57181442 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/57181442 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/57181442 Branch: refs/heads/branch-1.2 Commit: 57181442577c36689114334b011a6e72de4ae785 Parents: bcc74e5 Author: Enis Soztutar <e...@apache.org> Authored: Tue Oct 18 18:46:02 2016 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Oct 18 19:19:12 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/wal/FSHLog.java | 31 ++++++++++++-- .../wal/TestLogRollingNoCluster.java | 43 ++++++++++++++------ 2 files changed, 57 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/57181442/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 79ff1bc..7e3d82b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1127,6 +1127,7 @@ public class FSHLog implements WAL { private volatile long sequence; // Keep around last exception thrown. Clear on successful sync. private final BlockingQueue<SyncFuture> syncFutures; + private volatile SyncFuture takeSyncFuture = null; /** * UPDATE! @@ -1216,13 +1217,21 @@ public class FSHLog implements WAL { return sequence; } + boolean areSyncFuturesReleased() { + // check whether there is no sync futures offered, and no in-flight sync futures that is being + // processed. + return syncFutures.size() <= 0 + && takeSyncFuture == null; + } + public void run() { long currentSequence; while (!isInterrupted()) { int syncCount = 0; - SyncFuture takeSyncFuture; + try { while (true) { + takeSyncFuture = null; // We have to process what we 'take' from the queue takeSyncFuture = this.syncFutures.take(); currentSequence = this.sequence; @@ -1733,9 +1742,21 @@ public class FSHLog implements WAL { * @return True if outstanding sync futures still */ private boolean isOutstandingSyncs() { + // Look at SyncFutures in the EventHandler for (int i = 0; i < this.syncFuturesCount; i++) { if (!this.syncFutures[i].isDone()) return true; } + + return false; + } + + private boolean isOutstandingSyncsFromRunners() { + // Look at SyncFutures in the SyncRunners + for (SyncRunner syncRunner: syncRunners) { + if(syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) { + return true; + } + } return false; } @@ -1846,11 +1867,13 @@ public class FSHLog implements WAL { // Wait on outstanding syncers; wait for them to finish syncing (unless we've been // shutdown or unless our latch has been thrown because we have been aborted or unless // this WAL is broken and we can't get a sync/append to complete). - while (!this.shutdown && this.zigzagLatch.isCocked() && - highestSyncedSequence.get() < currentSequence && + while ((!this.shutdown && this.zigzagLatch.isCocked() + && highestSyncedSequence.get() < currentSequence && // We could be in here and all syncs are failing or failed. Check for this. Otherwise // we'll just be stuck here for ever. In other words, ensure there syncs running. - isOutstandingSyncs()) { + isOutstandingSyncs()) + // Wait for all SyncRunners to finish their work so that we can replace the writer + || isOutstandingSyncsFromRunners()) { synchronized (this.safePointWaiter) { this.safePointWaiter.wait(0, 1); } http://git-wip-us.apache.org/repos/asf/hbase/blob/57181442/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 1c36552..034ddcd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertFalse; - import java.io.IOException; - +import java.util.TreeMap; +import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -49,7 +50,18 @@ import org.junit.experimental.categories.Category; public class TestLogRollingNoCluster { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; - private static final int THREAD_COUNT = 100; // Spin up this many threads + private static final int NUM_THREADS = 100; // Spin up this many threads + private static final int NUM_ENTRIES = 100; // How many entries to write + + /** ProtobufLogWriter that simulates higher latencies in sync() call */ + public static class HighLatencySyncWriter extends ProtobufLogWriter { + @Override + public void sync() throws IOException { + Threads.sleep(ThreadLocalRandom.current().nextInt(10)); + super.sync(); + Threads.sleep(ThreadLocalRandom.current().nextInt(10)); + } + } /** * Spin up a bunch of threads and have them all append to a WAL. Roll the @@ -58,37 +70,41 @@ public class TestLogRollingNoCluster { * @throws InterruptedException */ @Test - public void testContendedLogRolling() throws IOException, InterruptedException { - Path dir = TEST_UTIL.getDataTestDir(); + public void testContendedLogRolling() throws Exception { + TEST_UTIL.startMiniDFSCluster(3); + Path dir = TEST_UTIL.getDataTestDirOnTestFS(); + // The implementation needs to know the 'handler' count. - TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREAD_COUNT); + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS); final Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); FSUtils.setRootDir(conf, dir); + conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName()); final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName()); final WAL wal = wals.getWAL(new byte[]{}); Appender [] appenders = null; - final int count = THREAD_COUNT; - appenders = new Appender[count]; + final int numThreads = NUM_THREADS; + appenders = new Appender[numThreads]; try { - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { // Have each appending thread write 'count' entries - appenders[i] = new Appender(wal, i, count); + appenders[i] = new Appender(wal, i, NUM_ENTRIES); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { appenders[i].start(); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { //ensure that all threads are joined before closing the wal appenders[i].join(); } } finally { wals.close(); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < numThreads; i++) { assertFalse(appenders[i].isException()); } + TEST_UTIL.shutdownMiniDFSCluster(); } /** @@ -137,6 +153,7 @@ public class TestLogRollingNoCluster { final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME); final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), TableName.META_TABLE_NAME, now, mvcc), edit, true); + Threads.sleep(ThreadLocalRandom.current().nextInt(5)); wal.sync(txid); } String msg = getName() + " finished";