Author: jukka
Date: Mon Mar 31 15:37:39 2014
New Revision: 1583352

URL: http://svn.apache.org/r1583352
Log:
OAK-1634: After crash, segment persistence is broken with failures in java.nio 
classes (with v0.19)

Add more sanity checks and a mechanism for automatically rewinding to a
previously committed revision if the latest one can't be accessed.

Added:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1583352&r1=1583351&r2=1583352&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 Mon Mar 31 15:37:39 2014
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newCopyOnWriteArrayList;
+import static com.google.common.collect.Lists.newLinkedList;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
@@ -27,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -165,16 +167,26 @@ public class FileStore implements Segmen
         journalFile = new RandomAccessFile(
                 new File(directory, JOURNAL_FILE_NAME), "rw");
 
-        RecordId id = null;
+        LinkedList<RecordId> heads = newLinkedList();
         String line = journalFile.readLine();
         while (line != null) {
             int space = line.indexOf(' ');
             if (space != -1) {
-                id = RecordId.fromString(tracker, line.substring(0, space));
+                heads.add(RecordId.fromString(tracker, line.substring(0, 
space)));
             }
             line = journalFile.readLine();
         }
 
+        // Walk back from the most recent journal entry until we find a head
+        // whose segment is actually present in the data files; the latest
+        // entries may be unreadable, e.g. after a crash.
+        RecordId id = null;
+        while (id == null && !heads.isEmpty()) {
+            RecordId last = heads.removeLast();
+            if (containsSegment(last.getSegmentId(), dataFiles)) {
+                id = last;
+            } else {
+                log.warn("Unable to access revision {}, rewinding...", last);
+            }
+        }
+
         if (id != null) {
             head = new AtomicReference<RecordId>(id);
             persistedHead = new AtomicReference<RecordId>(id);
@@ -333,12 +345,11 @@ public class FileStore implements Segmen
     }
 
     private boolean containsSegment(SegmentId id, List<TarFile> files) {
-        UUID uuid = new UUID(
-                id.getMostSignificantBits(),
-                id.getLeastSignificantBits());
+        long msb = id.getMostSignificantBits();
+        long lsb = id.getLeastSignificantBits();
         for (TarFile file : files) {
             try {
-                ByteBuffer buffer = file.readEntry(uuid);
+                ByteBuffer buffer = file.readEntry(msb, lsb);
                 if (buffer != null) {
                     return true;
                 }
@@ -351,12 +362,11 @@ public class FileStore implements Segmen
     }
 
     private Segment loadSegment(SegmentId id, List<TarFile> files) {
-        UUID uuid = new UUID(
-                id.getMostSignificantBits(),
-                id.getLeastSignificantBits());
+        long msb = id.getMostSignificantBits();
+        long lsb = id.getLeastSignificantBits();
         for (TarFile file : files) {
             try {
-                ByteBuffer buffer = file.readEntry(uuid);
+                ByteBuffer buffer = file.readEntry(msb, lsb);
                 if (buffer != null) {
                     return new Segment(tracker, id, buffer);
                 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java?rev=1583352&r1=1583351&r2=1583352&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
 Mon Mar 31 15:37:39 2014
@@ -30,9 +30,21 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.CRC32;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class TarFile {
 
+    /** Logger instance */
+    private static final Logger log = LoggerFactory.getLogger(TarFile.class);
+
+    /**
+     * Magic byte sequence ({@code "\n0K\n"}) at the end of the index block.
+     * The parentheses are required: in Java {@code +} binds tighter than
+     * {@code <<}, and without them this constant would evaluate to zero,
+     * making an all-zero (empty) index footer look like a written index.
+     */
+    private static final int INDEX_MAGIC =
+            ('\n' << 24) + ('0' << 16) + ('K' << 8) + '\n';
+
     /** The tar file block size. */
     private static final int BLOCK_SIZE = 512;
 
@@ -44,17 +56,17 @@ class TarFile {
 
     private int position = 0;
 
-    private final int maxFileSize;
-
     private final byte[] indexEntryName;
 
-    private final Map<UUID, TarEntry> entries = newConcurrentMap();
+    private final ByteBuffer index;
+
+    private final ConcurrentMap<UUID, TarEntry> entries;
 
     TarFile(File file, int maxFileSize, boolean memoryMapping)
             throws IOException {
         long len = file.length();
         checkState(len <= Integer.MAX_VALUE);
-        this.maxFileSize = Math.max((int) len, maxFileSize);
+        maxFileSize = Math.max((int) len, maxFileSize);
         checkState(maxFileSize % BLOCK_SIZE == 0);
         checkState(maxFileSize > 5 * BLOCK_SIZE);
         this.indexEntryName = (file.getName() + ".idx").getBytes(UTF_8);
@@ -62,7 +74,7 @@ class TarFile {
         this.file = file;
         RandomAccessFile f = new RandomAccessFile(file, "rw");
         if (memoryMapping) {
-            this.access = new MappedAccess(f, this.maxFileSize);
+            this.access = new MappedAccess(f, maxFileSize);
         } else {
             this.access = new RandomAccess(f);
         }
@@ -71,10 +83,10 @@ class TarFile {
         if (len == 0) {
             // allocate the full file by writing one big index entry
             writeEntryHeader(indexEntryName, maxFileSize - 3 * BLOCK_SIZE);
-            // zero-out the last four bytes to indicate an empty index
+            // zero-out the last 16 bytes to indicate an empty index
             access.write(
-                    maxFileSize - (ZERO_BYTES.length * 2 + 4),
-                    ZERO_BYTES, 0, 4);
+                    maxFileSize - (ZERO_BYTES.length * 2 + 16),
+                    ZERO_BYTES, 0, 16);
             // tar format expects the last two blocks to be zero
             access.write(
                     maxFileSize - ZERO_BYTES.length * 2,
@@ -82,8 +94,16 @@ class TarFile {
             access.write(
                     maxFileSize - ZERO_BYTES.length,
                     ZERO_BYTES, 0, ZERO_BYTES.length);
+
+            this.index = null;
+            this.entries = newConcurrentMap();
         } else {
-            readIndex(len);
+            this.index = loadAndValidateIndex();
+            if (index == null) {
+                this.entries = loadEntryMap();
+            } else {
+                this.entries = null;
+            }
         }
     }
 
@@ -91,8 +111,8 @@ class TarFile {
         return entries.keySet();
     }
 
-    ByteBuffer readEntry(UUID uuid) throws IOException {
-        TarEntry entry = entries.get(uuid);
+    ByteBuffer readEntry(long msb, long lsb) throws IOException {
+        TarEntry entry = indexLookup(msb, lsb);
         if (entry != null) {
             return access.read(entry.offset(), entry.size());
         } else {
@@ -106,15 +126,22 @@ class TarFile {
 
     synchronized boolean writeEntry(
             UUID uuid, byte[] b, int offset, int size) throws IOException {
-        int indexSize = entries.size() * 24 + 4;
+        if (entries == null) {
+            return false;
+        }
+
+        int length = access.length();
+        int indexSize = entries.size() * 24 + 16;
         if (position
-                + getEntrySize(size)      // this entry
-                + getEntrySize(indexSize) // index entry
-                + 2 * BLOCK_SIZE          // two zero blocks at the end
-                > maxFileSize) {
-            writeEntryHeader(
-                    indexEntryName, maxFileSize - 3 * BLOCK_SIZE - position);
+                + getEntrySize(size)           // this entry
+                + getEntrySize(indexSize + 24) // index with one extra entry
+                + 2 * BLOCK_SIZE               // two zero blocks at the end
+                > length) {
+            int bytes = length - position - 3 * BLOCK_SIZE;
+            writeEntryHeader(indexEntryName, bytes);
+
             ByteBuffer index = ByteBuffer.allocate(indexSize);
+
             SortedMap<UUID, TarEntry> sorted = newTreeMap();
             sorted.putAll(entries);
             for (Map.Entry<UUID, TarEntry> entry : sorted.entrySet()) {
@@ -123,26 +150,25 @@ class TarFile {
                 index.putInt(entry.getValue().offset());
                 index.putInt(entry.getValue().size());
             }
-            index.putInt(sorted.size());
+
+            CRC32 checksum = new CRC32();
+            checksum.update(index.array(), 0, index.position());
+            index.putInt((int) checksum.getValue());
+            index.putInt(entries.size());
+            index.putInt(bytes);
+            index.putInt(INDEX_MAGIC);
+
             access.write(
-                    maxFileSize - 2 * BLOCK_SIZE - indexSize,
+                    length - 2 * BLOCK_SIZE - indexSize,
                     index.array(), 0, indexSize);
-            position = maxFileSize - 2 * BLOCK_SIZE;
+            position = length - 2 * BLOCK_SIZE;
             return false;
         }
 
         writeEntryHeader(uuid.toString().getBytes(UTF_8), size);
-        position += BLOCK_SIZE;
-
-        access.write(position, b, offset, size);
-        entries.put(uuid, new TarEntry(position, size));
-        position += size;
-
-        int padding = BLOCK_SIZE - position % BLOCK_SIZE;
-        if (padding < BLOCK_SIZE) {
-            access.write(position, ZERO_BYTES, 0, padding);
-            position += padding;
-        }
+        access.write(position + BLOCK_SIZE, b, offset, size);
+        entries.put(uuid, new TarEntry(position + BLOCK_SIZE, size));
+        position += getEntrySize(size);
 
         return true;
     }
@@ -224,20 +250,161 @@ class TarFile {
         access.close();
     }
 
-    private void readIndex(long len) throws IOException {
-        while (position + BLOCK_SIZE <= len) {
+    //-----------------------------------------------------------< private >--
+
+    /**
+     * Looks up the entry of the segment with the given identifier, either
+     * in the in-memory entry map (when present) or in the on-disk index.
+     *
+     * @param msb most significant bits of the segment identifier
+     * @param lsb least significant bits of the segment identifier
+     * @return matching entry, or {@code null} if not found
+     */
+    private TarEntry indexLookup(long msb, long lsb) {
+        if (entries != null) {
+            return entries.get(new UUID(msb, lsb));
+        }
+
+        // The segment identifiers are randomly generated with uniform
+        // distribution, so we can use interpolation search to find the
+        // matching entry in the index. The average runtime is O(log log n).
+        // Each index entry is 24 bytes: msb (8), lsb (8), offset (4) and
+        // size (4); the search assumes entries sorted by signed (msb, lsb).
+
+        int lowIndex = 0;
+        int highIndex = index.remaining() / 24 - 1;
+        // The interpolation bounds are always kept in msb units. Mixing in
+        // lsb values would produce guesses outside the search range.
+        double lowValue = Long.MIN_VALUE;
+        double highValue = Long.MAX_VALUE;
+        double targetValue = msb;
+
+        while (lowIndex <= highIndex) {
+            int guessIndex = lowIndex;
+            double span = highValue - lowValue;
+            if (span > 0) {
+                long step = Math.round(
+                        (highIndex - lowIndex)
+                        * (targetValue - lowValue)
+                        / span);
+                // clamp, as rounding errors may push the estimate out of range
+                guessIndex = (int) Math.max(
+                        lowIndex, Math.min(highIndex, lowIndex + step));
+            }
+            int entryOffset = index.position() + guessIndex * 24;
+            long guessMsb = index.getLong(entryOffset);
+            if (msb < guessMsb) {
+                highIndex = guessIndex - 1;
+                highValue = guessMsb;
+            } else if (msb > guessMsb) {
+                lowIndex = guessIndex + 1;
+                lowValue = guessMsb;
+            } else {
+                // getting close...
+                long guessLsb = index.getLong(entryOffset + 8);
+                if (lsb < guessLsb) {
+                    highIndex = guessIndex - 1;
+                    highValue = guessMsb;
+                } else if (lsb > guessLsb) {
+                    lowIndex = guessIndex + 1;
+                    lowValue = guessMsb;
+                } else {
+                    // found it!
+                    return new TarEntry(
+                            index.getInt(entryOffset + 16),
+                            index.getInt(entryOffset + 20));
+                }
+            }
+        }
+
+        // not found
+        return null;
+    }
+
+    /**
+     * Tries to read an existing index from the tar file. The index is
+     * returned if it is found and looks valid (correct checksum, passes
+     * sanity checks).
+     * <p>
+     * The footer values come straight from disk and may be garbage (for
+     * example after a crash), so size computations are done with
+     * {@code long} arithmetic to reject them instead of letting them
+     * overflow into a seemingly valid range.
+     *
+     * @return tar index, or {@code null} if not found or not valid
+     * @throws IOException if the tar file could not be read
+     */
+    private ByteBuffer loadAndValidateIndex() throws IOException {
+        int length = access.length();
+        if (length % BLOCK_SIZE != 0 || length < 6 * BLOCK_SIZE) {
+            log.warn("Unexpected size {} of tar file {}", length, file);
+            return null; // unexpected file size
+        }
+
+        // read the index metadata just before the two final zero blocks
+        ByteBuffer meta = access.read(length - 2 * BLOCK_SIZE - 16, 16);
+        int crc32 = meta.getInt();
+        int count = meta.getInt();
+        int bytes = meta.getInt();
+        int magic = meta.getInt();
+
+        if (magic != INDEX_MAGIC) {
+            // no warning here, as the index has probably not yet been written
+            return null; // magic byte mismatch
+        }
+
+        // The index entry (header block + bytes) and the two zero blocks
+        // must fit in the file; this also bounds count * 24 to the int range.
+        if (count < 1
+                || bytes < count * 24L + 16
+                || bytes % BLOCK_SIZE != 0
+                || bytes > length - 3 * BLOCK_SIZE) {
+            log.warn("Invalid index footer in tar file {}", file);
+            return null; // impossible entry and/or byte counts
+        }
+
+        int start = length - 2 * BLOCK_SIZE - 16 - count * 24;
+        ByteBuffer index = access.read(start, count * 24);
+        index.mark();
+
+        CRC32 checksum = new CRC32();
+        // entries must end before the header block of the index entry
+        int limit = length - 2 * BLOCK_SIZE - bytes - BLOCK_SIZE;
+        long lastmsb = Long.MIN_VALUE;
+        long lastlsb = Long.MIN_VALUE;
+        byte[] buffer = new byte[24];
+        for (int i = 0; i < count; i++) {
+            index.get(buffer);
+            checksum.update(buffer);
+
+            ByteBuffer entry = ByteBuffer.wrap(buffer);
+            long msb   = entry.getLong();
+            long lsb   = entry.getLong();
+            int offset = entry.getInt();
+            int size   = entry.getInt();
+
+            if (lastmsb > msb || (lastmsb == msb && lastlsb > lsb)) {
+                log.warn("Incorrect index ordering in tar file {}", file);
+                return null;
+            } else if (lastmsb == msb && lastlsb == lsb && i > 0) {
+                log.warn("Duplicate entry in the index of tar file {}", file);
+                return null;
+            } else if (offset < 0 || offset % BLOCK_SIZE != 0) {
+                log.warn("Invalid entry offset in the index of tar file {}",
+                        file);
+                return null;
+            } else if (size < 1 || (long) offset + size > limit) {
+                log.warn("Invalid entry size in the index of tar file {}",
+                        file);
+                return null;
+            }
+
+            lastmsb = msb;
+            lastlsb = lsb;
+        }
+
+        if (crc32 != (int) checksum.getValue()) {
+            log.warn("Invalid index checksum in tar file {}", file);
+            return null; // checksum mismatch
+        }
+
+        index.reset();
+        return index;
+    }
+
+    /**
+     * Scans through the tar file, looking for all segment entries. Used on
+     * tar files that don't already contain an index.
+     *
+     * @return map of all segment entries in this tar file
+     * @throws IOException if the tar file could not be read
+     */
+    private ConcurrentMap<UUID, TarEntry> loadEntryMap() throws IOException {
+        ConcurrentMap<UUID, TarEntry> entries = newConcurrentMap();
+
+        int limit = access.length() - 2 * BLOCK_SIZE;
+        while (position + 2 * BLOCK_SIZE <= limit) {
             // read the tar header block
-            ByteBuffer buffer = this.access.read(position, BLOCK_SIZE);
-            String name = readString(buffer, 100);
-            buffer.position(124);
-            int size = readNumber(buffer, 12);
+            ByteBuffer header = access.read(position, BLOCK_SIZE);
+            String name = readString(header, 100);
+            header.position(124);
+            int size = readNumber(header, 12);
+
             // TODO: verify the checksum, magic, etc.?
 
             if (name.isEmpty() && size == 0) {
                 break; // no more entries in this file
             } else if (Arrays.equals(name.getBytes(UTF_8), indexEntryName)) {
                 break; // index entry encountered, so stop here
-            } else if (position + BLOCK_SIZE + size > len) {
+            } else if (position + BLOCK_SIZE + size > limit) {
                 break; // invalid entry, truncate the file at this point
             }
 
@@ -245,11 +412,13 @@ class TarFile {
                 UUID id = UUID.fromString(name);
                 entries.put(id, new TarEntry(position + BLOCK_SIZE, size));
             } catch (IllegalArgumentException e) {
-                throw new IOException("Unexpected tar entry: " + name);
+                break; // unexpected entry, truncate the file at this point
             }
 
-            position += (1 + (size + BLOCK_SIZE - 1) / BLOCK_SIZE) * 
BLOCK_SIZE;
+            position += getEntrySize(size);
         }
+
+        return entries;
     }
 
     private static String readString(ByteBuffer buffer, int fieldSize) {

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1583352&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
 Mon Mar 31 15:37:39 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStoreTest {
+
+    private File directory;
+
+    @Before
+    public void setUp() throws IOException {
+        directory = File.createTempFile(
+                "FileStoreTest", "dir", new File("target"));
+        directory.delete();
+        directory.mkdir();
+    }
+
+    @org.junit.After
+    public void tearDown() {
+        delete(directory);
+    }
+
+    @Test
+    public void testRecovery() throws IOException {
+        FileStore store = new FileStore(directory, 1);
+        store.flush(); // first 1kB
+
+        SegmentNodeState base = store.getHead();
+        SegmentNodeBuilder builder = base.builder();
+        builder.setProperty("step", "a");
+        store.setHead(base, builder.getNodeState());
+        store.flush(); // second 1kB
+
+        base = store.getHead();
+        builder = base.builder();
+        builder.setProperty("step", "b");
+        store.setHead(base, builder.getNodeState());
+        store.close(); // third 1kB
+
+        // sanity check: the latest revision is visible after reopening
+        store = new FileStore(directory, 1);
+        assertEquals("b", store.getHead().getString("step"));
+        store.close();
+
+        // wipe the third 1kB; the store should rewind to revision "a"
+        zeroOut(2048, 1024);
+
+        store = new FileStore(directory, 1);
+        assertEquals("a", store.getHead().getString("step"));
+        store.close();
+
+        // wipe the second 1kB too; the store should rewind to the start
+        zeroOut(1024, 1024);
+
+        store = new FileStore(directory, 1);
+        assertFalse(store.getHead().hasProperty("step"));
+        store.close();
+    }
+
+    /**
+     * Overwrites {@code length} bytes of the first tar file with zeros,
+     * starting at {@code offset}, to simulate on-disk corruption.
+     */
+    private void zeroOut(long offset, int length) throws IOException {
+        RandomAccessFile file = new RandomAccessFile(
+                new File(directory, "data00000a.tar"), "rw");
+        try {
+            file.seek(offset);
+            file.write(new byte[length], 0, length);
+        } finally {
+            file.close();
+        }
+    }
+
+    /**
+     * Best-effort recursive removal of the temporary test directory.
+     */
+    private static void delete(File file) {
+        File[] children = file.listFiles();
+        if (children != null) {
+            for (File child : children) {
+                delete(child);
+            }
+        }
+        file.delete();
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java?rev=1583352&r1=1583351&r2=1583352&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
 Mon Mar 31 15:37:39 2014
@@ -51,12 +51,14 @@ public class TarFileTest {
     @Test
     public void testWriteAndRead() throws IOException {
         UUID id = UUID.randomUUID();
+        long msb = id.getMostSignificantBits();
+        long lsb = id.getLeastSignificantBits();
         byte[] data = "Hello, World!".getBytes(UTF_8);
 
         TarFile tar = new TarFile(file, 10240, false);
         try {
             tar.writeEntry(id, data, 0, data.length);
-            assertEquals(ByteBuffer.wrap(data), tar.readEntry(id));
+            assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
         } finally {
             tar.close();
         }
@@ -65,7 +67,7 @@ public class TarFileTest {
 
         tar = new TarFile(file, 10240, false);
         try {
-            assertEquals(ByteBuffer.wrap(data), tar.readEntry(id));
+            assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
         } finally {
             tar.close();
         }


Reply via email to