Author: jukka
Date: Mon Mar 31 15:37:39 2014
New Revision: 1583352
URL: http://svn.apache.org/r1583352
Log:
OAK-1634: After crash, segment persistence is broken with failures in java.nio
classes (with v0.19)
Add more sanity checks and a mechanism for automatically rewinding to a
previously committed revision if the latest one can't be accessed.
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1583352&r1=1583351&r2=1583352&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Mon Mar 31 15:37:39 2014
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.newCopyOnWriteArrayList;
+import static com.google.common.collect.Lists.newLinkedList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -27,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -165,16 +167,26 @@ public class FileStore implements Segmen
journalFile = new RandomAccessFile(
new File(directory, JOURNAL_FILE_NAME), "rw");
- RecordId id = null;
+ LinkedList<RecordId> heads = newLinkedList();
String line = journalFile.readLine();
while (line != null) {
int space = line.indexOf(' ');
if (space != -1) {
- id = RecordId.fromString(tracker, line.substring(0, space));
+ heads.add(RecordId.fromString(tracker, line.substring(0,
space)));
}
line = journalFile.readLine();
}
+ RecordId id = null;
+ while (id == null && !heads.isEmpty()) {
+ RecordId last = heads.removeLast();
+ if (containsSegment(last.getSegmentId(), dataFiles)) {
+ id = last;
+ } else {
+ log.warn("Unable to committed revision {}, rewinding...",
last);
+ }
+ }
+
if (id != null) {
head = new AtomicReference<RecordId>(id);
persistedHead = new AtomicReference<RecordId>(id);
@@ -333,12 +345,11 @@ public class FileStore implements Segmen
}
private boolean containsSegment(SegmentId id, List<TarFile> files) {
- UUID uuid = new UUID(
- id.getMostSignificantBits(),
- id.getLeastSignificantBits());
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
for (TarFile file : files) {
try {
- ByteBuffer buffer = file.readEntry(uuid);
+ ByteBuffer buffer = file.readEntry(msb, lsb);
if (buffer != null) {
return true;
}
@@ -351,12 +362,11 @@ public class FileStore implements Segmen
}
private Segment loadSegment(SegmentId id, List<TarFile> files) {
- UUID uuid = new UUID(
- id.getMostSignificantBits(),
- id.getLeastSignificantBits());
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
for (TarFile file : files) {
try {
- ByteBuffer buffer = file.readEntry(uuid);
+ ByteBuffer buffer = file.readEntry(msb, lsb);
if (buffer != null) {
return new Segment(tracker, id, buffer);
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java?rev=1583352&r1=1583351&r2=1583352&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
Mon Mar 31 15:37:39 2014
@@ -30,9 +30,21 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.CRC32;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class TarFile {
+ /** Logger instance */
+ private static final Logger log = LoggerFactory.getLogger(TarFile.class);
+
+ /** Magic byte sequence at the end of the index block. */
+ private static final int INDEX_MAGIC =
+ '\n' << 24 + '0' << 16 + 'K' << 8 + '\n';
+
/** The tar file block size. */
private static final int BLOCK_SIZE = 512;
@@ -44,17 +56,17 @@ class TarFile {
private int position = 0;
- private final int maxFileSize;
-
private final byte[] indexEntryName;
- private final Map<UUID, TarEntry> entries = newConcurrentMap();
+ private final ByteBuffer index;
+
+ private final ConcurrentMap<UUID, TarEntry> entries;
TarFile(File file, int maxFileSize, boolean memoryMapping)
throws IOException {
long len = file.length();
checkState(len <= Integer.MAX_VALUE);
- this.maxFileSize = Math.max((int) len, maxFileSize);
+ maxFileSize = Math.max((int) len, maxFileSize);
checkState(maxFileSize % BLOCK_SIZE == 0);
checkState(maxFileSize > 5 * BLOCK_SIZE);
this.indexEntryName = (file.getName() + ".idx").getBytes(UTF_8);
@@ -62,7 +74,7 @@ class TarFile {
this.file = file;
RandomAccessFile f = new RandomAccessFile(file, "rw");
if (memoryMapping) {
- this.access = new MappedAccess(f, this.maxFileSize);
+ this.access = new MappedAccess(f, maxFileSize);
} else {
this.access = new RandomAccess(f);
}
@@ -71,10 +83,10 @@ class TarFile {
if (len == 0) {
// allocate the full file by writing one big index entry
writeEntryHeader(indexEntryName, maxFileSize - 3 * BLOCK_SIZE);
- // zero-out the last four bytes to indicate an empty index
+ // zero-out the last 16 bytes to indicate an empty index
access.write(
- maxFileSize - (ZERO_BYTES.length * 2 + 4),
- ZERO_BYTES, 0, 4);
+ maxFileSize - (ZERO_BYTES.length * 2 + 16),
+ ZERO_BYTES, 0, 16);
// tar format expects the last two blocks to be zero
access.write(
maxFileSize - ZERO_BYTES.length * 2,
@@ -82,8 +94,16 @@ class TarFile {
access.write(
maxFileSize - ZERO_BYTES.length,
ZERO_BYTES, 0, ZERO_BYTES.length);
+
+ this.index = null;
+ this.entries = newConcurrentMap();
} else {
- readIndex(len);
+ this.index = loadAndValidateIndex();
+ if (index == null) {
+ this.entries = loadEntryMap();
+ } else {
+ this.entries = null;
+ }
}
}
@@ -91,8 +111,8 @@ class TarFile {
return entries.keySet();
}
- ByteBuffer readEntry(UUID uuid) throws IOException {
- TarEntry entry = entries.get(uuid);
+ ByteBuffer readEntry(long msb, long lsb) throws IOException {
+ TarEntry entry = indexLookup(msb, lsb);
if (entry != null) {
return access.read(entry.offset(), entry.size());
} else {
@@ -106,15 +126,22 @@ class TarFile {
synchronized boolean writeEntry(
UUID uuid, byte[] b, int offset, int size) throws IOException {
- int indexSize = entries.size() * 24 + 4;
+ if (entries == null) {
+ return false;
+ }
+
+ int length = access.length();
+ int indexSize = entries.size() * 24 + 16;
if (position
- + getEntrySize(size) // this entry
- + getEntrySize(indexSize) // index entry
- + 2 * BLOCK_SIZE // two zero blocks at the end
- > maxFileSize) {
- writeEntryHeader(
- indexEntryName, maxFileSize - 3 * BLOCK_SIZE - position);
+ + getEntrySize(size) // this entry
+ + getEntrySize(indexSize + 24) // index with one extra entry
+ + 2 * BLOCK_SIZE // two zero blocks at the end
+ > length) {
+ int bytes = length - position - 3 * BLOCK_SIZE;
+ writeEntryHeader(indexEntryName, bytes);
+
ByteBuffer index = ByteBuffer.allocate(indexSize);
+
SortedMap<UUID, TarEntry> sorted = newTreeMap();
sorted.putAll(entries);
for (Map.Entry<UUID, TarEntry> entry : sorted.entrySet()) {
@@ -123,26 +150,25 @@ class TarFile {
index.putInt(entry.getValue().offset());
index.putInt(entry.getValue().size());
}
- index.putInt(sorted.size());
+
+ CRC32 checksum = new CRC32();
+ checksum.update(index.array(), 0, index.position());
+ index.putInt((int) checksum.getValue());
+ index.putInt(entries.size());
+ index.putInt(bytes);
+ index.putInt(INDEX_MAGIC);
+
access.write(
- maxFileSize - 2 * BLOCK_SIZE - indexSize,
+ length - 2 * BLOCK_SIZE - indexSize,
index.array(), 0, indexSize);
- position = maxFileSize - 2 * BLOCK_SIZE;
+ position = length - 2 * BLOCK_SIZE;
return false;
}
writeEntryHeader(uuid.toString().getBytes(UTF_8), size);
- position += BLOCK_SIZE;
-
- access.write(position, b, offset, size);
- entries.put(uuid, new TarEntry(position, size));
- position += size;
-
- int padding = BLOCK_SIZE - position % BLOCK_SIZE;
- if (padding < BLOCK_SIZE) {
- access.write(position, ZERO_BYTES, 0, padding);
- position += padding;
- }
+ access.write(position + BLOCK_SIZE, b, offset, size);
+ entries.put(uuid, new TarEntry(position + BLOCK_SIZE, size));
+ position += getEntrySize(size);
return true;
}
@@ -224,20 +250,161 @@ class TarFile {
access.close();
}
- private void readIndex(long len) throws IOException {
- while (position + BLOCK_SIZE <= len) {
+ //-----------------------------------------------------------< private >--
+
+ private TarEntry indexLookup(long msb, long lsb) {
+ if (entries != null) {
+ return entries.get(new UUID(msb, lsb));
+ }
+
+ // The segment identifiers are randomly generated with uniform
+ // distribution, so we can use interpolation search to find the
+ // matching entry in the index. The average runtime is O(log log n).
+
+ int lowIndex = 0;
+ int highIndex = index.remaining() / 24 - 1;
+ float lowValue = Long.MIN_VALUE;
+ float highValue = Long.MAX_VALUE;
+ float targetValue = msb;
+
+ while (lowIndex <= highIndex) {
+ int guessIndex = lowIndex + Math.round(
+ (highIndex - lowIndex)
+ * (targetValue - lowValue)
+ / (highValue - lowValue));
+ int position = index.position() + guessIndex * 24;
+ long guess = index.getLong(position);
+ if (msb < guess) {
+ highIndex = guessIndex - 1;
+ highValue = guess;
+ } else if (msb > guess) {
+ lowIndex = guessIndex + 1;
+ lowValue = guess;
+ } else {
+ // getting close...
+ guess = index.getLong(position + 8);
+ if (lsb < guess) {
+ highIndex = guessIndex - 1;
+ highValue = guess;
+ } else if (lsb > guess) {
+ lowIndex = guessIndex + 1;
+ lowValue = guess;
+ } else {
+ // found it!
+ return new TarEntry(
+ index.getInt(position + 16),
+ index.getInt(position + 20));
+ }
+ }
+ }
+
+ // not found
+ return null;
+ }
+
+ /**
+ * Tries to read an existing index from the tar file. The index is
+ * returned if it is found and looks valid (correct checksum, passes
+ * sanity checks).
+ *
+ * @return tar index, or {@code null} if not found or not valid
+ * @throws IOException if the tar file could not be read
+ */
+ private ByteBuffer loadAndValidateIndex() throws IOException {
+ int length = access.length();
+ if (length % BLOCK_SIZE != 0 || length < 6 * BLOCK_SIZE) {
+ log.warn("Unexpected size {} of tar file {}", length, file);
+ return null; // unexpected file size
+ }
+
+ // read the index metadata just before the two final zero blocks
+ ByteBuffer meta = access.read(length - 2 * BLOCK_SIZE - 16, 16);
+ int crc32 = meta.getInt();
+ int count = meta.getInt();
+ int bytes = meta.getInt();
+ int magic = meta.getInt();
+
+ if (magic != INDEX_MAGIC) {
+ // no warning here, as the index has probably not yet been written
+ return null; // magic byte mismatch
+ }
+
+ if (count < 1 || bytes < count * 24 + 16 || bytes % BLOCK_SIZE != 0) {
+ log.warn("Invalid index footer in tar file {}", file);
+ return null; // impossible entry and/or byte counts
+ }
+
+ int position = length - 2 * BLOCK_SIZE - 16 - count * 24;
+ ByteBuffer index = access.read(position, count * 24);
+ index.mark();
+
+ CRC32 checksum = new CRC32();
+ int limit = length - 2 * BLOCK_SIZE - bytes - BLOCK_SIZE;
+ long lastmsb = Long.MIN_VALUE;
+ long lastlsb = Long.MIN_VALUE;
+ byte[] buffer = new byte[24];
+ for (int i = 0; i < count; i++) {
+ index.get(buffer);
+ checksum.update(buffer);
+
+ ByteBuffer entry = ByteBuffer.wrap(buffer);
+ long msb = entry.getLong();
+ long lsb = entry.getLong();
+ int offset = entry.getInt();
+ int size = entry.getInt();
+
+ if (lastmsb > msb || (lastmsb == msb && lastlsb > lsb)) {
+ log.warn("Incorrect index ordering in tar file {}", file);
+ return null;
+ } else if (lastmsb == msb && lastlsb == lsb && i > 0) {
+ log.warn("Duplicate entry in the index of tar file {}", file);
+ return null;
+ } else if (offset < 0 || offset % BLOCK_SIZE != 0) {
+ log.warn("Invalid entry offset in the index of tar file {}",
file);
+ return null;
+ } else if (size < 1 || offset + size > limit) {
+ log.warn("Invalid entry size in the index of tar file {}",
file);
+ return null;
+ }
+
+ lastmsb = msb;
+ lastlsb = lsb;
+ }
+
+ if (crc32 != (int) checksum.getValue()) {
+ log.warn("Invalid index checksum in tar file {}", file);
+ return null; // checksum mismatch
+ }
+
+ index.reset();
+ return index;
+ }
+
+ /**
+ * Scans through the tar file, looking for all segment entries. Used on
+ * tar files that don't already contain an index.
+ *
+ * @return map of all segment entries in this tar file
+ * @throws IOException if the tar file could not be read
+ */
+ private ConcurrentMap<UUID, TarEntry> loadEntryMap() throws IOException {
+ ConcurrentMap<UUID, TarEntry> entries = newConcurrentMap();
+
+ int limit = access.length() - 2 * BLOCK_SIZE;
+ while (position + 2 * BLOCK_SIZE <= limit) {
// read the tar header block
- ByteBuffer buffer = this.access.read(position, BLOCK_SIZE);
- String name = readString(buffer, 100);
- buffer.position(124);
- int size = readNumber(buffer, 12);
+ ByteBuffer header = access.read(position, BLOCK_SIZE);
+ String name = readString(header, 100);
+ header.position(124);
+ int size = readNumber(header, 12);
+
// TODO: verify the checksum, magic, etc.?
if (name.isEmpty() && size == 0) {
break; // no more entries in this file
} else if (Arrays.equals(name.getBytes(UTF_8), indexEntryName)) {
break; // index entry encountered, so stop here
- } else if (position + BLOCK_SIZE + size > len) {
+ } else if (position + BLOCK_SIZE + size > limit) {
break; // invalid entry, truncate the file at this point
}
@@ -245,11 +412,13 @@ class TarFile {
UUID id = UUID.fromString(name);
entries.put(id, new TarEntry(position + BLOCK_SIZE, size));
} catch (IllegalArgumentException e) {
- throw new IOException("Unexpected tar entry: " + name);
+ break; // unexpected entry, truncate the file at this point
}
- position += (1 + (size + BLOCK_SIZE - 1) / BLOCK_SIZE) *
BLOCK_SIZE;
+ position += getEntrySize(size);
}
+
+ return entries;
}
private static String readString(ByteBuffer buffer, int fieldSize) {
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1583352&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
Mon Mar 31 15:37:39 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStoreTest {
+
+ private File directory;
+
+ @Before
+ public void setUp() throws IOException {
+ directory = File.createTempFile(
+ "FileStoreTest", "dir", new File("target"));
+ directory.delete();
+ directory.mkdir();
+ }
+
+ @Test
+ public void testRecovery() throws IOException {
+ FileStore store = new FileStore(directory, 1);
+ store.flush(); // first 1kB
+
+ SegmentNodeState base = store.getHead();
+ SegmentNodeBuilder builder = base.builder();
+ builder.setProperty("step", "a");
+ store.setHead(base, builder.getNodeState());
+ store.flush(); // second 1kB
+
+ base = store.getHead();
+ builder = base.builder();
+ builder.setProperty("step", "b");
+ store.setHead(base, builder.getNodeState());
+ store.close(); // third 1kB
+
+ store = new FileStore(directory, 1);
+ assertEquals("b", store.getHead().getString("step"));
+ store.close();
+
+ RandomAccessFile file = new RandomAccessFile(
+ new File(directory, "data00000a.tar"), "rw");
+ file.seek(2048);
+ file.write(new byte[1024], 0, 1024);
+ file.close();
+
+ store = new FileStore(directory, 1);
+ assertEquals("a", store.getHead().getString("step"));
+ store.close();
+
+ file = new RandomAccessFile(
+ new File(directory, "data00000a.tar"), "rw");
+ file.seek(1024);
+ file.write(new byte[1024], 0, 1024);
+ file.close();
+
+ store = new FileStore(directory, 1);
+ assertFalse(store.getHead().hasProperty("step"));
+ store.close();
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java?rev=1583352&r1=1583351&r2=1583352&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
Mon Mar 31 15:37:39 2014
@@ -51,12 +51,14 @@ public class TarFileTest {
@Test
public void testWriteAndRead() throws IOException {
UUID id = UUID.randomUUID();
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
byte[] data = "Hello, World!".getBytes(UTF_8);
TarFile tar = new TarFile(file, 10240, false);
try {
tar.writeEntry(id, data, 0, data.length);
- assertEquals(ByteBuffer.wrap(data), tar.readEntry(id));
+ assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
} finally {
tar.close();
}
@@ -65,7 +67,7 @@ public class TarFileTest {
tar = new TarFile(file, 10240, false);
try {
- assertEquals(ByteBuffer.wrap(data), tar.readEntry(id));
+ assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
} finally {
tar.close();
}