Author: jukka
Date: Wed Dec 4 03:55:56 2013
New Revision: 1547700
URL: http://svn.apache.org/r1547700
Log:
OAK-593: Segment-based MK
Use a background thread to flush the TarMK files once every five seconds
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
Wed Dec 4 03:55:56 2013
@@ -20,8 +20,6 @@ import static com.google.common.base.Obj
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static java.util.Collections.emptyList;
import static
org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isDataSegmentId;
import static
org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
@@ -102,13 +100,17 @@ public class Segment {
}
};
+ private static final UUID[] NO_REFS = new UUID[0];
+
private final SegmentStore store;
private final UUID uuid;
+ private final UUID[] refids;
+
private final ByteBuffer data;
- private final int refposition;
+ private final boolean current;
public Segment(SegmentStore store, UUID uuid, ByteBuffer data) {
this.store = checkNotNull(store);
@@ -117,10 +119,28 @@ public class Segment {
int refpos = data.position();
if (isDataSegmentId(uuid)) {
+ int refs = data.get(refpos) & 0xff;
int roots = data.getShort(refpos + 1) & 0xffff;
refpos += align(3 + roots * 3);
+ refids = new UUID[refs];
+ for (int i = 0; i < refs; i++) {
+ refids[i] = new UUID(
+ data.getLong(refpos + i * 16),
+ data.getLong(refpos + i * 16 + 8));
+ }
+ } else {
+ refids = NO_REFS;
}
- this.refposition = refpos;
+
+ this.current = false;
+ }
+
+ Segment(SegmentStore store, UUID uuid, UUID[] refids, ByteBuffer data) {
+ this.store = checkNotNull(store);
+ this.uuid = checkNotNull(uuid);
+ this.refids = checkNotNull(refids);
+ this.data = checkNotNull(data);
+ this.current = true;
}
/**
@@ -154,18 +174,7 @@ public class Segment {
}
public List<UUID> getReferencedIds() {
- if (isDataSegmentId(uuid)) {
- int refcount = data.get(data.position()) & 0xff;
- List<UUID> refs = newArrayListWithCapacity(refcount);
- for (int i = 0; i < refcount; i++) {
- refs.add(new UUID(
- data.getLong(refposition + i * 16),
- data.getLong(refposition + i * 16 + 8)));
- }
- return refs;
- } else {
- return emptyList();
- }
+ return Arrays.asList(refids);
}
public int size() {
@@ -226,8 +235,7 @@ public class Segment {
UUID refid;
int refpos = data.get(pos) & 0xff;
if (refpos != 0xff) {
- refpos = refposition + refpos * 16;
- refid = new UUID(data.getLong(refpos), data.getLong(refpos + 8));
+ refid = refids[refpos];
} else {
refid = uuid;
}
@@ -412,27 +420,26 @@ public class Segment {
StringWriter string = new StringWriter();
PrintWriter writer = new PrintWriter(string);
- int pos = refposition;
- int refcount = data.get(data.position()) & 0xff;
- int rootcount = data.getShort(data.position() + 1) &0xffff;
- int length =
- data.capacity() - (align(3 + rootcount * 3) + refcount * 16);
+ int rootcount = 0;
+ int length = data.remaining();
+ if (!current) {
+ rootcount = data.getShort(data.position() + 1) &0xffff;
+ length -= (align(3 + rootcount * 3) + refids.length * 16);
+ }
writer.format(
"Segment %s (%d bytes, %d ref%s, %d root%s)%n",
uuid, length,
- refcount, (refcount != 1 ? "s" : ""),
+ refids.length, (refids.length != 1 ? "s" : ""),
rootcount, (rootcount != 1 ? "s" : ""));
writer.println("--------------------------------------------------------------------------");
- if (refcount > 0) {
- for (int i = 0; i < refcount; i++) {
- UUID id = new UUID(data.getLong(pos), data.getLong(pos + 8));
- writer.format("reference %02x: %s%n", i, id);
- pos += 16;
+ if (refids.length > 0) {
+ for (int i = 0; i < refids.length; i++) {
+ writer.format("reference %02x: %s%n", i, refids[i]);
}
writer.println("--------------------------------------------------------------------------");
}
- pos = data.limit() - ((length + 15) & ~15);
+ int pos = data.limit() - ((length + 15) & ~15);
while (pos < data.limit()) {
writer.format("%04x: ", (MAX_SEGMENT_SIZE - data.limit() + pos) >>
RECORD_ALIGN_BITS);
for (int i = 0; i < 16; i++) {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Wed Dec 4 03:55:56 2013
@@ -116,7 +116,7 @@ public class SegmentWriter {
* The segment write buffer, filled from the end to the beginning
* (see OAK-629).
*/
- private final byte[] buffer = new byte[MAX_SEGMENT_SIZE];
+ private byte[] buffer = new byte[MAX_SEGMENT_SIZE];
/**
* The number of bytes already written (or allocated). Counted from
@@ -168,12 +168,12 @@ public class SegmentWriter {
public synchronized Segment getCurrentSegment(UUID id) {
if (equal(id, uuid)) {
if (currentSegment == null) {
- int header = align(3 + roots.size() * 3) + 16 * refids.size();
- ByteBuffer b = ByteBuffer.allocate(header + length);
- writeSegmentHeader(b);
- b.put(buffer, buffer.length - length, length);
- b.rewind();
- currentSegment = new Segment(store, uuid, b);
+ ByteBuffer b = ByteBuffer.wrap(buffer);
+ b.position(buffer.length - length);
+ currentSegment = new Segment(
+ store, uuid,
+ refids.keySet().toArray(new UUID[refids.size()]),
+ b);
}
return currentSegment;
} else {
@@ -196,6 +196,7 @@ public class SegmentWriter {
store.writeSegment(uuid, buffer, buffer.length - length, length);
uuid = newDataSegmentId();
+ buffer = new byte[MAX_SEGMENT_SIZE];
refids.clear();
roots.clear();
length = 0;
@@ -221,10 +222,7 @@ public class SegmentWriter {
}
}
int refCount = refids.size() + segmentIds.size();
-
- Set<RecordId> rootIds = newHashSet(roots.keySet());
- rootIds.removeAll(ids);
- int rootCount = rootIds.size() + 1;
+ int rootCount = roots.size() + 1;
int recordSize = Segment.align(size + ids.size() *
Segment.RECORD_ID_BYTES);
int headerSize = Segment.align(3 + rootCount * 3);
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Wed Dec 4 03:55:56 2013
@@ -16,11 +16,11 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.file;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newConcurrentMap;
import static
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static
org.apache.jackrabbit.oak.plugins.segment.SegmentIdFactory.isBulkSegmentId;
@@ -30,7 +30,6 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import javax.annotation.Nonnull;
@@ -63,7 +62,13 @@ public class FileStore extends AbstractS
private final RandomAccessFile journalFile;
- private final Map<String, RecordId> journals = newConcurrentMap();
+ private volatile RecordId head;
+
+ private volatile boolean updated = false;
+
+ private volatile boolean alive = true;
+
+ private final Thread flushThread;
public FileStore(File directory, int maxFileSizeMB, boolean memoryMapping)
throws IOException {
@@ -103,46 +108,64 @@ public class FileStore extends AbstractS
}
}
+ head = null;
journalFile = new RandomAccessFile(
new File(directory, JOURNAL_FILE_NAME), "rw");
String line = journalFile.readLine();
while (line != null) {
int space = line.indexOf(' ');
if (space != -1) {
- String name = line.substring(space + 1);
- RecordId id = RecordId.fromString(line.substring(0, space));
- journals.put(name, id);
+ head = RecordId.fromString(line.substring(0, space));
}
line = journalFile.readLine();
}
- if (!journals.containsKey("root")) {
+ if (head == null) {
NodeBuilder builder = EMPTY_NODE.builder();
builder.setChildNode("root", initial);
SegmentNodeState root =
getWriter().writeNode(builder.getNodeState());
- journals.put("root", root.getRecordId());
- journalFile.writeBytes(root.getRecordId() + " root\n");
+ head = root.getRecordId();
+ updated = true;
}
- }
- RecordId getHead(String name) {
- return journals.get(name);
+ this.flushThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ synchronized (flushThread) {
+ flushThread.wait(1000);
+ while (alive) {
+ flush();
+ flushThread.wait(5000);
+ }
+ }
+ } catch (InterruptedException e) {
+ // stop flushing
+ }
+ }
+ });
+ flushThread.setName("TarMK flush thread " + directory);
+ flushThread.setDaemon(true);
+ flushThread.setPriority(Thread.MIN_PRIORITY);
+ flushThread.start();
}
- synchronized boolean setHead(String name, RecordId base, RecordId head) {
- if (base.equals(journals.get(name))) {
- getWriter().flush();
+ private synchronized void flush() {
+ if (updated) {
try {
- journalFile.writeBytes(head + " " + name + "\n");
- journals.put(name, head);
- return true;
+ getWriter().flush();
+ for (TarFile file : bulkFiles) {
+ file.flush();
+ }
+ for (TarFile file : dataFiles) {
+ file.flush();
+ }
+ journalFile.writeBytes(head + " root\n");
+ journalFile.getChannel().force(false);
} catch (IOException e) {
- throw new IllegalStateException(
- "Failed to update journal " + name, e);
+ e.printStackTrace(); // FIXME
}
- } else {
- return false;
}
}
@@ -162,6 +185,13 @@ public class FileStore extends AbstractS
try {
super.close();
+ alive = false;
+ synchronized (flushThread) {
+ flushThread.notify();
+ }
+ flushThread.join();
+ flush();
+
journalFile.close();
for (TarFile file : bulkFiles) {
@@ -181,12 +211,29 @@ public class FileStore extends AbstractS
@Override
public Journal getJournal(String name) {
- synchronized (journals) {
- if (journals.containsKey(name)) {
- journals.put(name, journals.get("root"));
+ checkArgument("root".equals(name)); // only root supported for now
+ return new Journal() {
+ @Override
+ public RecordId getHead() {
+ return head;
}
- }
- return new FileJournal(this, name);
+ @Override
+ public boolean setHead(RecordId base, RecordId head) {
+ synchronized (FileStore.this) {
+ if (base.equals(FileStore.this.head)) {
+ FileStore.this.head = head;
+ updated = true;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+ @Override
+ public void merge() {
+ throw new UnsupportedOperationException();
+ }
+ };
}
@Override @Nonnull
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
Wed Dec 4 03:55:56 2013
@@ -28,12 +28,15 @@ class MappedAccess implements FileAccess
private final MappedByteBuffer buffer;
+ private boolean updated = false;
+
MappedAccess(File file, int length) throws IOException {
RandomAccessFile f = new RandomAccessFile(file, "rw");
try {
long l = f.length();
if (l == 0) { // it's a new file
l = length;
+ updated = true;
}
buffer = f.getChannel().map(READ_WRITE, 0, l);
} finally {
@@ -55,16 +58,21 @@ class MappedAccess implements FileAccess
}
@Override
- public void write(int position, byte[] b, int offset, int length)
+ public synchronized void write(
+ int position, byte[] b, int offset, int length)
throws IOException {
ByteBuffer entry = buffer.duplicate();
entry.position(position);
entry.put(b, offset, length);
+ updated = true;
}
@Override
- public void flush() {
- buffer.force();
+ public synchronized void flush() {
+ if (updated) {
+ buffer.force();
+ updated = false;
+ }
}
@Override
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
Wed Dec 4 03:55:56 2013
@@ -179,8 +179,13 @@ class TarFile {
return true;
}
- void close() throws IOException {
+ public void flush() throws IOException {
access.flush();
+ }
+
+
+ void close() throws IOException {
+ flush();
access.close();
}
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java?rev=1547700&r1=1547699&r2=1547700&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentSizeTest.java
Wed Dec 4 03:55:56 2013
@@ -154,6 +154,7 @@ public class SegmentSizeTest {
}
SegmentNodeState state = writer.writeNode(builder.getNodeState());
+ writer.flush();
Segment segment =
store.readSegment(state.getRecordId().getSegmentId());
assertEquals(26752, Segment.WEIGHER.weigh(null, segment));
@@ -162,6 +163,7 @@ public class SegmentSizeTest {
builder = state.builder();
builder.child("child1000");
state = writer.writeNode(builder.getNodeState());
+ writer.flush();
segment = store.readSegment(state.getRecordId().getSegmentId());
assertEquals(136, Segment.WEIGHER.weigh(null, segment));
}
@@ -170,6 +172,7 @@ public class SegmentSizeTest {
SegmentStore store = new MemoryStore();
SegmentWriter writer = store.getWriter();
RecordId id = writer.writeNode(builder.getNodeState()).getRecordId();
+ writer.flush();
Segment segment = store.readSegment(id.getSegmentId());
return Segment.WEIGHER.weigh(null, segment);
}
@@ -179,6 +182,7 @@ public class SegmentSizeTest {
SegmentWriter writer = store.getWriter();
NodeState state = builder.getNodeState();
RecordId id = writer.writeNode(state).getRecordId();
+ writer.flush();
Segment segment = store.readSegment(id.getSegmentId());
int base = Segment.WEIGHER.weigh(null, segment);
@@ -186,6 +190,7 @@ public class SegmentSizeTest {
writer = store.getWriter();
writer.writeNode(state);
id = writer.writeNode(state).getRecordId();
+ writer.flush();
segment = store.readSegment(id.getSegmentId());
return Segment.WEIGHER.weigh(null, segment) - base - 4;
}