HBASE-16554 Rebuild WAL tracker if trailer is corrupted.

Change-Id: Iecc3347de3de9fc57f57ab5f498aad404d02ec52


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2eac0da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2eac0da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2eac0da

Branch: refs/heads/hbase-14439
Commit: b2eac0da33c4161aa8188213171afb03b72048a4
Parents: c5b8aab
Author: Apekshit Sharma <a...@apache.org>
Authored: Sat Sep 17 17:38:40 2016 -0700
Committer: Apekshit Sharma <a...@apache.org>
Committed: Mon Sep 19 12:23:48 2016 -0700

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java | 15 +++-
 .../procedure2/store/wal/ProcedureWALFile.java  |  2 +
 .../store/wal/ProcedureWALFormat.java           | 14 +++-
 .../store/wal/ProcedureWALFormatReader.java     | 59 +++++++++++---
 .../procedure2/store/wal/WALProcedureStore.java | 50 ++++++------
 .../store/wal/TestWALProcedureStore.java        | 82 ++++++++++++++++++++
 6 files changed, 178 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 78d6a44..a60ba3f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -93,6 +93,7 @@ public class ProcedureStoreTracker {
     private long[] updated;
     /**
      * Keeps track of procedure ids which belong to this bitmap's range and 
have been deleted.
+     * This represents global state since it's not reset on WAL rolls.
      */
     private long[] deleted;
     /**
@@ -449,8 +450,7 @@ public class ProcedureStoreTracker {
     }
   }
 
-  public void resetToProto(ProcedureProtos.ProcedureStoreTracker 
trackerProtoBuf)
-      throws IOException {
+  public void resetToProto(final ProcedureProtos.ProcedureStoreTracker 
trackerProtoBuf) {
     reset();
     for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: 
trackerProtoBuf.getNodeList()) {
       final BitSetNode node = new BitSetNode(protoNode);
@@ -536,6 +536,7 @@ public class ProcedureStoreTracker {
     BitSetNode node = getOrCreateNode(procId);
     assert node.contains(procId) : "expected procId=" + procId + " in the 
node=" + node;
     node.updateState(procId, isDeleted);
+    trackProcIds(procId);
   }
 
   public void reset() {
@@ -545,6 +546,11 @@ public class ProcedureStoreTracker {
     resetUpdates();
   }
 
+  public boolean isUpdated(long procId) {
+    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    return entry != null && entry.getValue().contains(procId) && 
entry.getValue().isUpdated(procId);
+  }
+
   /**
    * If {@link #partial} is false, returns state from the bitmap. If no state 
is found for
    * {@code procId}, returns YES.
@@ -583,6 +589,10 @@ public class ProcedureStoreTracker {
     }
   }
 
+  public boolean isPartial() {
+    return partial;
+  }
+
   public void setPartialFlag(boolean isPartial) {
     if (this.partial && !isPartial) {
       for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
@@ -720,6 +730,7 @@ public class ProcedureStoreTracker {
       entry.getValue().dump();
     }
   }
+
   /**
    * Iterates over
    * {@link BitSetNode}s in this.map and subtracts with corresponding ones 
from {@code other}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 99e7a7e..b9726a8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -62,6 +62,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     this.logFile = logStatus.getPath();
     this.logSize = logStatus.getLen();
     this.timestamp = logStatus.getModificationTime();
+    tracker.setPartialFlag(true);
   }
 
   public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader 
header,
@@ -72,6 +73,7 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
     this.startPos = startPos;
     this.logSize = startPos;
     this.timestamp = timestamp;
+    tracker.setPartialFlag(true);
   }
 
   public void open() throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 775ec11..5f726d0 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -44,6 +46,8 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTr
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public final class ProcedureWALFormat {
+  private static final Log LOG = LogFactory.getLog(ProcedureWALFormat.class);
+
   static final byte LOG_TYPE_STREAM = 0;
   static final byte LOG_TYPE_COMPACTED = 1;
   static final byte LOG_TYPE_MAX_VALID = 1;
@@ -72,19 +76,21 @@ public final class ProcedureWALFormat {
 
   public static void load(final Iterator<ProcedureWALFile> logs,
       final ProcedureStoreTracker tracker, final Loader loader) throws 
IOException {
-    ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
+    final ProcedureWALFormatReader reader = new 
ProcedureWALFormatReader(tracker, loader);
     tracker.setKeepDeletes(true);
     try {
+      // Ignore the last log which is current active log.
       while (logs.hasNext()) {
         ProcedureWALFile log = logs.next();
         log.open();
         try {
-          reader.read(log, loader);
+          reader.read(log);
         } finally {
           log.close();
         }
       }
-      reader.finalize(loader);
+      reader.finish();
+
       // The tracker is now updated with all the procedures read from the logs
       tracker.setPartialFlag(false);
       tracker.resetUpdates();
@@ -246,4 +252,4 @@ public final class ProcedureWALFormat {
     }
     builder.build().writeDelimitedTo(slot);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 8678c86..118ec19 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -101,19 +101,40 @@ public class ProcedureWALFormatReader {
   private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
   private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
 
-  //private long compactionLogId;
+  // private long compactionLogId;
   private long maxProcId = 0;
-
+  /**
+   * If tracker for a log file is partial (see {@link 
ProcedureStoreTracker#partial}), we
+   * re-build the list of procedures updated in that WAL because we need it 
for log cleaning
+   * purpose. If all procedures updated in a WAL are found to be obsolete, it 
can be safely deleted.
+   * (see {@link WALProcedureStore#removeInactiveLogs()}).
+   * However, we don't need deleted part of a WAL's tracker for this purpose, 
so we don't bother
+   * re-building it. (To understand why, take a look at
+   * {@link 
ProcedureStoreTracker.BitSetNode#subtract(ProcedureStoreTracker.BitSetNode)}).
+   */
+  private ProcedureStoreTracker localTracker;
+  private final ProcedureWALFormat.Loader loader;
+  /**
+   * Global tracker. If set to partial, it will be updated as procedures are 
loaded from wals,
+   * otherwise not.
+   */
   private final ProcedureStoreTracker tracker;
-  private final boolean hasFastStartSupport;
+  // private final boolean hasFastStartSupport;
 
-  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
+  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
+      ProcedureWALFormat.Loader loader) {
     this.tracker = tracker;
+    this.loader = loader;
     // we support fast-start only if we have a clean shutdown.
-    this.hasFastStartSupport = !tracker.isEmpty();
+    // this.hasFastStartSupport = !tracker.isEmpty();
   }
 
-  public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) 
throws IOException {
+  public void read(final ProcedureWALFile log) throws IOException {
+    localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
+    if (localTracker != null) {
+      LOG.info("Rebuilding tracker for log - " + log);
+    }
+
     FSDataInputStream stream = log.getStream();
     try {
       boolean hasMore = true;
@@ -121,7 +142,6 @@ public class ProcedureWALFormatReader {
         ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
         if (entry == null) {
           LOG.warn("nothing left to decode. exiting with missing EOF");
-          hasMore = false;
           break;
         }
         switch (entry.getType()) {
@@ -150,9 +170,13 @@ public class ProcedureWALFormatReader {
       loader.markCorruptedWAL(log, e);
     }
 
+    if (localTracker != null) {
+      localTracker.setPartialFlag(false);
+    }
     if (!localProcedureMap.isEmpty()) {
       log.setProcIds(localProcedureMap.getMinProcId(), 
localProcedureMap.getMaxProcId());
       procedureMap.mergeTail(localProcedureMap);
+
       //if (hasFastStartSupport) {
         // TODO: Some procedure may be already runnables (see readInitEntry())
         //       (we can also check the "update map" in the log trackers)
@@ -164,7 +188,7 @@ public class ProcedureWALFormatReader {
     }
   }
 
-  public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
+  public void finish() throws IOException {
     // notify the loader about the max proc ID
     loader.setMaxProcId(maxProcId);
 
@@ -185,7 +209,12 @@ public class ProcedureWALFormatReader {
         LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
       }
       localProcedureMap.add(proc);
-      tracker.setDeleted(proc.getProcId(), false);
+      if (tracker.isPartial()) {
+        tracker.insert(proc.getProcId());
+      }
+    }
+    if (localTracker != null) {
+      localTracker.insert(proc.getProcId());
     }
   }
 
@@ -236,7 +265,13 @@ public class ProcedureWALFormatReader {
     maxProcId = Math.max(maxProcId, procId);
     localProcedureMap.remove(procId);
     assert !procedureMap.contains(procId);
-    tracker.setDeleted(procId, true);
+    if (tracker.isPartial()) {
+      tracker.setDeleted(procId, true);
+    }
+    if (localTracker != null) {
+      // In case there is only delete entry for this procedure in current log.
+      localTracker.setDeleted(procId, true);
+    }
   }
 
   private boolean isDeleted(final long procId) {
@@ -264,7 +299,7 @@ public class ProcedureWALFormatReader {
   //      unlinkFromLinkList = None
   // ==========================================================================
   private static class Entry {
-    // hash-table next
+    // For bucketed linked lists in hash-table.
     protected Entry hashNext;
     // child head
     protected Entry childHead;
@@ -511,6 +546,8 @@ public class ProcedureWALFormatReader {
           childUnlinkedHead = other.childUnlinkedHead;
         }
       }
+      maxProcId = Math.max(maxProcId, other.maxProcId);
+      minProcId = Math.max(minProcId, other.minProcId);
 
       other.clear();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 0cfe4b0..bcd4e5f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Set;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
@@ -881,24 +880,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
   }
 
   private void closeCurrentLogStream() {
+    if (stream == null) return;
     try {
-      if (stream != null) {
-        try {
-          ProcedureWALFile log = logs.getLast();
-          log.setProcIds(storeTracker.getUpdatedMinProcId(), 
storeTracker.getUpdatedMaxProcId());
-          log.updateLocalTracker(storeTracker);
-          long trailerSize = ProcedureWALFormat.writeTrailer(stream, 
storeTracker);
-          log.addToSize(trailerSize);
-        } catch (IOException e) {
-          LOG.warn("Unable to write the trailer: " + e.getMessage());
-        }
-        stream.close();
-      }
+      ProcedureWALFile log = logs.getLast();
+      log.setProcIds(storeTracker.getUpdatedMinProcId(), 
storeTracker.getUpdatedMaxProcId());
+      log.updateLocalTracker(storeTracker);
+      long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
+      log.addToSize(trailerSize);
+    } catch (IOException e) {
+      LOG.warn("Unable to write the trailer: " + e.getMessage());
+    }
+    try {
+      stream.close();
     } catch (IOException e) {
       LOG.error("Unable to close the stream", e);
-    } finally {
-      stream = null;
     }
+    stream = null;
   }
 
   // ==========================================================================
@@ -1058,11 +1055,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
     return maxLogId;
   }
 
+  /**
+   * If the last log's tracker is not partial, use it as {@link #storeTracker}.
+   * Otherwise, set storeTracker as partial, and let {@link 
ProcedureWALFormatReader} rebuild
+   * it using entries in the log.
+   */
   private void initTrackerFromOldLogs() {
-    // TODO: Load the most recent tracker available
     if (logs.isEmpty()) return;
     ProcedureWALFile log = logs.getLast();
-    if (log.getTracker() != null) {
+    if (!log.getTracker().isPartial()) {
       storeTracker.resetTo(log.getTracker());
     } else {
       storeTracker.reset();
@@ -1074,7 +1075,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
    * Loads given log file and it's tracker.
    */
   private ProcedureWALFile initOldLog(final FileStatus logFile) throws 
IOException {
-    ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
+    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
     if (logFile.getLen() == 0) {
       LOG.warn("Remove uninitialized log: " + logFile);
       log.removeFile();
@@ -1095,20 +1096,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
       throw new IOException(msg, e);
     }
 
-    if (log.isCompacted()) {
-      try {
-        log.readTrailer();
-      } catch (IOException e) {
-        LOG.warn("Unfinished compacted log: " + logFile, e);
-        log.removeFile();
-        return null;
-      }
-    }
     try {
       log.readTracker();
     } catch (IOException e) {
+      log.getTracker().reset();
+      log.getTracker().setPartialFlag(true);
       LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
     }
+
+    log.close();
     return log;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2eac0da/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 2e2a038..5353d62 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -360,6 +360,88 @@ public class TestWALProcedureStore {
     assertEquals(0, loader.getCorruptedCount());
   }
 
+  void assertUpdated(final ProcedureStoreTracker tracker, Procedure[] procs,
+      int[] updatedProcs, int[] nonUpdatedProcs) {
+    for (int index : updatedProcs) {
+      long procId = procs[index].getProcId();
+      assertTrue("Procedure id : " + procId, tracker.isUpdated(procId));
+    }
+    for (int index : nonUpdatedProcs) {
+      long procId = procs[index].getProcId();
+      assertFalse("Procedure id : " + procId, tracker.isUpdated(procId));
+    }
+  }
+
+  void assertDeleted(final ProcedureStoreTracker tracker, Procedure[] procs,
+      int[] deletedProcs, int[] nonDeletedProcs) {
+    for (int index : deletedProcs) {
+      long procId = procs[index].getProcId();
+      assertEquals("Procedure id : " + procId,
+          ProcedureStoreTracker.DeleteState.YES, tracker.isDeleted(procId));
+    }
+    for (int index : nonDeletedProcs) {
+      long procId = procs[index].getProcId();
+      assertEquals("Procedure id : " + procId,
+          ProcedureStoreTracker.DeleteState.NO, tracker.isDeleted(procId));
+    }
+  }
+
+  @Test
+  public void testCorruptedTrailersRebuild() throws Exception {
+    final Procedure[] procs = new Procedure[6];
+    for (int i = 0; i < procs.length; ++i) {
+      procs[i] = new TestSequentialProcedure();
+    }
+    // Log State (I=insert, U=updated, D=delete)
+    //   | log 1 | log 2 | log 3 |
+    // 0 | I, D  |       |       |
+    // 1 | I     |       |       |
+    // 2 | I     | D     |       |
+    // 3 | I     | U     |       |
+    // 4 |       | I     | D     |
+    // 5 |       |       | I     |
+    procStore.insert(procs[0], null);
+    procStore.insert(procs[1], null);
+    procStore.insert(procs[2], null);
+    procStore.insert(procs[3], null);
+    procStore.delete(procs[0], null);
+    procStore.rollWriterForTesting();
+    procStore.delete(procs[2], null);
+    procStore.update(procs[3]);
+    procStore.insert(procs[4], null);
+    procStore.rollWriterForTesting();
+    procStore.delete(procs[4], null);
+    procStore.insert(procs[5], null);
+
+    // Stop the store
+    procStore.stop(false);
+
+    // Remove 4 bytes from the trailers
+    final FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(3, logs.length);
+    for (int i = 0; i < logs.length; ++i) {
+      corruptLog(logs[i], 4);
+    }
+
+    // Restart the store
+    final LoadCounter loader = new LoadCounter();
+    storeRestart(loader);
+    assertEquals(3, loader.getLoadedCount());  // procs 1, 3 and 5
+    assertEquals(0, loader.getCorruptedCount());
+
+    // Check the Trackers
+    final ArrayList<ProcedureWALFile> walFiles = procStore.getActiveLogs();
+    assertEquals(4, walFiles.size());
+    LOG.info("Checking wal " + walFiles.get(0));
+    assertUpdated(walFiles.get(0).getTracker(), procs, new int[]{0, 1, 2, 3}, 
new int[] {4, 5});
+    LOG.info("Checking wal " + walFiles.get(1));
+    assertUpdated(walFiles.get(1).getTracker(), procs, new int[]{2, 3, 4}, new 
int[] {0, 1, 5});
+    LOG.info("Checking wal " + walFiles.get(2));
+    assertUpdated(walFiles.get(2).getTracker(), procs, new int[]{4, 5}, new 
int[] {0, 1, 2, 3});
+    LOG.info("Checking global tracker ");
+    assertDeleted(procStore.getStoreTracker(), procs, new int[]{0, 2, 4}, new 
int[] {1, 3, 5});
+  }
+
   @Test
   public void testCorruptedEntries() throws Exception {
     // Insert something

Reply via email to