HBASE-16554 Rebuild WAL tracker if trailer is corrupted. Change-Id: Iecc3347de3de9fc57f57ab5f498aad404d02ec52
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2eac0da Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2eac0da Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2eac0da Branch: refs/heads/hbase-12439 Commit: b2eac0da33c4161aa8188213171afb03b72048a4 Parents: c5b8aab Author: Apekshit Sharma <a...@apache.org> Authored: Sat Sep 17 17:38:40 2016 -0700 Committer: Apekshit Sharma <a...@apache.org> Committed: Mon Sep 19 12:23:48 2016 -0700 ---------------------------------------------------------------------- .../procedure2/store/ProcedureStoreTracker.java | 15 +++- .../procedure2/store/wal/ProcedureWALFile.java | 2 + .../store/wal/ProcedureWALFormat.java | 14 +++- .../store/wal/ProcedureWALFormatReader.java | 59 +++++++++++--- .../procedure2/store/wal/WALProcedureStore.java | 50 ++++++------ .../store/wal/TestWALProcedureStore.java | 82 ++++++++++++++++++++ 6 files changed, 178 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 78d6a44..a60ba3f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -93,6 +93,7 @@ public class ProcedureStoreTracker { private long[] updated; /** * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. + * This represents global state since it's not reset on WAL rolls. */ private long[] deleted; /** @@ -449,8 +450,7 @@ public class ProcedureStoreTracker { } } - public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) - throws IOException { + public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { reset(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) { final BitSetNode node = new BitSetNode(protoNode); @@ -536,6 +536,7 @@ public class ProcedureStoreTracker { BitSetNode node = getOrCreateNode(procId); assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; node.updateState(procId, isDeleted); + trackProcIds(procId); } public void reset() { @@ -545,6 +546,11 @@ public class ProcedureStoreTracker { resetUpdates(); } + public boolean isUpdated(long procId) { + final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); + return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId); + } + /** * If {@link #partial} is false, returns state from the bitmap. If no state is found for * {@code procId}, returns YES. @@ -583,6 +589,10 @@ public class ProcedureStoreTracker { } } + public boolean isPartial() { + return partial; + } + public void setPartialFlag(boolean isPartial) { if (this.partial && !isPartial) { for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { @@ -720,6 +730,7 @@ public class ProcedureStoreTracker { entry.getValue().dump(); } } + /** * Iterates over * {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other} http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 99e7a7e..b9726a8 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -62,6 +62,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> { this.logFile = logStatus.getPath(); this.logSize = logStatus.getLen(); this.timestamp = logStatus.getModificationTime(); + tracker.setPartialFlag(true); } public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, @@ -72,6 +73,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> { this.startPos = startPos; this.logSize = startPos; this.timestamp = timestamp; + tracker.setPartialFlag(true); } public void open() throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index 775ec11..5f726d0 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -25,6 +25,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Iterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTr @InterfaceAudience.Private @InterfaceStability.Evolving public final class ProcedureWALFormat { + private static final Log LOG = LogFactory.getLog(ProcedureWALFormat.class); + static final byte LOG_TYPE_STREAM = 0; static final byte LOG_TYPE_COMPACTED = 1; static final byte LOG_TYPE_MAX_VALID = 1; @@ -72,19 +76,21 @@ public final class ProcedureWALFormat { public static void load(final Iterator<ProcedureWALFile> logs, final ProcedureStoreTracker tracker, final Loader loader) throws IOException { - ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker); + final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); tracker.setKeepDeletes(true); try { + // Ignore the last log which is current active log. while (logs.hasNext()) { ProcedureWALFile log = logs.next(); log.open(); try { - reader.read(log, loader); + reader.read(log); } finally { log.close(); } } - reader.finalize(loader); + reader.finish(); + // The tracker is now updated with all the procedures read from the logs tracker.setPartialFlag(false); tracker.resetUpdates(); @@ -246,4 +252,4 @@ public final class ProcedureWALFormat { } builder.build().writeDelimitedTo(slot); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 8678c86..118ec19 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -101,19 +101,40 @@ public class ProcedureWALFormatReader { private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); private final WalProcedureMap procedureMap = new WalProcedureMap(1024); - //private long compactionLogId; + // private long compactionLogId; private long maxProcId = 0; - + /** + * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we + * re-build the list of procedures updated in that WAL because we need it for log cleaning + * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. + * (see {@link WALProcedureStore#removeInactiveLogs()}). + * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother + * re-building it. (To understand why, take a look at + * {@link ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}). + */ + private ProcedureStoreTracker localTracker; + private final ProcedureWALFormat.Loader loader; + /** + * Global tracker. If set to partial, it will be updated as procedures are loaded from wals, + * otherwise not. + */ private final ProcedureStoreTracker tracker; - private final boolean hasFastStartSupport; + // private final boolean hasFastStartSupport; - public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) { + public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, + ProcedureWALFormat.Loader loader) { this.tracker = tracker; + this.loader = loader; // we support fast-start only if we have a clean shutdown. - this.hasFastStartSupport = !tracker.isEmpty(); + // this.hasFastStartSupport = !tracker.isEmpty(); } - public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException { + public void read(final ProcedureWALFile log) throws IOException { + localTracker = log.getTracker().isPartial() ? log.getTracker() : null; + if (localTracker != null) { + LOG.info("Rebuilding tracker for log - " + log); + } + FSDataInputStream stream = log.getStream(); try { boolean hasMore = true; @@ -121,7 +142,6 @@ public class ProcedureWALFormatReader { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { LOG.warn("nothing left to decode. exiting with missing EOF"); - hasMore = false; break; } switch (entry.getType()) { @@ -150,9 +170,13 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } + if (localTracker != null) { + localTracker.setPartialFlag(false); + } if (!localProcedureMap.isEmpty()) { log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); procedureMap.mergeTail(localProcedureMap); + //if (hasFastStartSupport) { // TODO: Some procedure may be already runnables (see readInitEntry()) // (we can also check the "update map" in the log trackers) @@ -164,7 +188,7 @@ public class ProcedureWALFormatReader { } } - public void finalize(ProcedureWALFormat.Loader loader) throws IOException { + public void finish() throws IOException { // notify the loader about the max proc ID loader.setMaxProcId(maxProcId); @@ -185,7 +209,12 @@ public class ProcedureWALFormatReader { LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); } localProcedureMap.add(proc); - tracker.setDeleted(proc.getProcId(), false); + if (tracker.isPartial()) { + tracker.insert(proc.getProcId()); + } + } + if (localTracker != null) { + localTracker.insert(proc.getProcId()); } } @@ -236,7 +265,13 @@ public class ProcedureWALFormatReader { maxProcId = Math.max(maxProcId, procId); localProcedureMap.remove(procId); assert !procedureMap.contains(procId); - tracker.setDeleted(procId, true); + if (tracker.isPartial()) { + tracker.setDeleted(procId, true); + } + if (localTracker != null) { + // In case there is only delete entry for this procedure in current log. + localTracker.setDeleted(procId, true); + } } private boolean isDeleted(final long procId) { @@ -264,7 +299,7 @@ public class ProcedureWALFormatReader { // unlinkFromLinkList = None // ========================================================================== private static class Entry { - // hash-table next + // For bucketed linked lists in hash-table. protected Entry hashNext; // child head protected Entry childHead; @@ -511,6 +546,8 @@ public class ProcedureWALFormatReader { childUnlinkedHead = other.childUnlinkedHead; } } + maxProcId = Math.max(maxProcId, other.maxProcId); + minProcId = Math.max(minProcId, other.minProcId); other.clear(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 0cfe4b0..bcd4e5f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -30,7 +30,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.ListIterator; import java.util.Set; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; @@ -881,24 +880,22 @@ public class WALProcedureStore extends ProcedureStoreBase { } private void closeCurrentLogStream() { + if (stream == null) return; try { - if (stream != null) { - try { - ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); - log.updateLocalTracker(storeTracker); - long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); - log.addToSize(trailerSize); - } catch (IOException e) { - LOG.warn("Unable to write the trailer: " + e.getMessage()); - } - stream.close(); - } + ProcedureWALFile log = logs.getLast(); + log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + log.updateLocalTracker(storeTracker); + long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); + log.addToSize(trailerSize); + } catch (IOException e) { + LOG.warn("Unable to write the trailer: " + e.getMessage()); + } + try { + stream.close(); } catch (IOException e) { LOG.error("Unable to close the stream", e); - } finally { - stream = null; } + stream = null; } // ========================================================================== @@ -1058,11 +1055,15 @@ public class WALProcedureStore extends ProcedureStoreBase { return maxLogId; } + /** + * If last log's tracker is not null, use it as {@link #storeTracker}. + * Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild + * it using entries in the log. + */ private void initTrackerFromOldLogs() { - // TODO: Load the most recent tracker available if (logs.isEmpty()) return; ProcedureWALFile log = logs.getLast(); - if (log.getTracker() != null) { + if (!log.getTracker().isPartial()) { storeTracker.resetTo(log.getTracker()); } else { storeTracker.reset(); @@ -1074,7 +1075,7 @@ public class WALProcedureStore extends ProcedureStoreBase { * Loads given log file and it's tracker. */ private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { - ProcedureWALFile log = new ProcedureWALFile(fs, logFile); + final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { LOG.warn("Remove uninitialized log: " + logFile); log.removeFile(); @@ -1095,20 +1096,15 @@ public class WALProcedureStore extends ProcedureStoreBase { throw new IOException(msg, e); } - if (log.isCompacted()) { - try { - log.readTrailer(); - } catch (IOException e) { - LOG.warn("Unfinished compacted log: " + logFile, e); - log.removeFile(); - return null; - } - } try { log.readTracker(); } catch (IOException e) { + log.getTracker().reset(); + log.getTracker().setPartialFlag(true); LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); } + + log.close(); return log; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 2e2a038..5353d62 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -360,6 +360,88 @@ public class TestWALProcedureStore { assertEquals(0, loader.getCorruptedCount()); } + void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs, + int[] updatedProcs, int[] nonUpdatedProcs) { + for (int index : updatedProcs) { + long procId = procs[index].getProcId(); + assertTrue("Procedure id : " + procId, tracker.isUpdated(procId)); + } + for (int index : nonUpdatedProcs) { + long procId = procs[index].getProcId(); + assertFalse("Procedure id : " + procId, tracker.isUpdated(procId)); + } + } + + void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs, + int[] deletedProcs, int[] nonDeletedProcs) { + for (int index : deletedProcs) { + long procId = procs[index].getProcId(); + assertEquals("Procedure id : " + procId, + ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId)); + } + for (int index : nonDeletedProcs) { + long procId = procs[index].getProcId(); + assertEquals("Procedure id : " + procId, + ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId)); + } + } + + @Test + public void testCorruptedTrailersRebuild() throws Exception { + final Procedure[] procs = new Procedure[6]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestSequentialProcedure(); + } + // Log State (I=insert, U=updated, D=delete) + // | log 1 | log 2 | log 3 | + // 0 | I, D | | | + // 1 | I | | | + // 2 | I | D | | + // 3 | I | U | | + // 4 | | I | D | + // 5 | | | I | + procStore.insert(procs[0], null); + procStore.insert(procs[1], null); + procStore.insert(procs[2], null); + procStore.insert(procs[3], null); + procStore.delete(procs[0], null); + procStore.rollWriterForTesting(); + procStore.delete(procs[2], null); + procStore.update(procs[3]); + procStore.insert(procs[4], null); + procStore.rollWriterForTesting(); + procStore.delete(procs[4], null); + procStore.insert(procs[5], null); + + // Stop the store + procStore.stop(false); + + // Remove 4 byte from the trailers + final FileStatus[] logs = fs.listStatus(logDir); + assertEquals(3, logs.length); + for (int i = 0; i < logs.length; ++i) { + corruptLog(logs[i], 4); + } + + // Restart the store + final LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(3, loader.getLoadedCount()); // procs 1, 3 and 5 + assertEquals(0, loader.getCorruptedCount()); + + // Check the Trackers + final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs(); + assertEquals(4, walFiles.size()); + LOG.info("Checking wal " + walFiles.get(0)); + assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, new int[] {4, 5}); + LOG.info("Checking wal " + walFiles.get(1)); + assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new int[] {0, 1, 5}); + LOG.info("Checking wal " + walFiles.get(2)); + assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new int[] {0, 1, 2, 3}); + LOG.info("Checking global tracker "); + assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new int[] {1, 3, 5}); + } + @Test public void testCorruptedEntries() throws Exception { // Insert something