Author: jukka
Date: Mon Jul 7 19:07:44 2014
New Revision: 1608564
URL: http://svn.apache.org/r1608564
Log:
1.0.2: Merged revisions 1607077, 1607185, 1607196, 1607526, 1607664 and 1608560
(OAK-1927, OAK-1932)
Added:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
- copied, changed from r1607185,
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java
- copied unchanged from r1607664,
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java
Modified:
jackrabbit/oak/branches/1.0/ (props changed)
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
Merged /jackrabbit/oak/trunk:r1607077,1607185,1607196,1607526,1607664,1608560
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
Mon Jul 7 19:07:44 2014
@@ -64,7 +64,7 @@ import java.util.UUID;
* average, the amortized size of each entry in this mapping is about
* {@code 20/n + 8} bytes, assuming compressed pointers.
*/
-class CompactionMap {
+public class CompactionMap {
private final int compressInterval;
private final Map<RecordId, RecordId> recent = newHashMap();
@@ -93,6 +93,19 @@ class CompactionMap {
return after.equals(get(before));
}
+ /**
+ * Checks whether content in the segment with the given identifier was
+ * compacted to new segments.
+ *
+ * @param id segment identifier
+ * @return whether the identified segment was compacted
+ */
+ boolean wasCompacted(SegmentId id) {
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
+ return findEntry(msb, lsb) != -1;
+ }
+
public RecordId get(RecordId before) {
RecordId after = recent.get(before);
if (after != null) {
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
Mon Jul 7 19:07:44 2014
@@ -20,7 +20,6 @@ import static com.google.common.collect.
import static com.google.common.collect.Maps.newHashMap;
import static org.apache.jackrabbit.oak.api.Type.BINARIES;
import static org.apache.jackrabbit.oak.api.Type.BINARY;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import java.io.IOException;
import java.io.InputStream;
@@ -52,33 +51,6 @@ public class Compactor {
/** Logger instance */
private static final Logger log = LoggerFactory.getLogger(Compactor.class);
- public static CompactionMap compact(SegmentStore store) {
- SegmentWriter writer = store.getTracker().getWriter();
- Compactor compactor = new Compactor(writer);
-
- log.debug("TarMK compaction");
-
- SegmentNodeBuilder builder = writer.writeNode(EMPTY_NODE).builder();
- SegmentNodeState before = store.getHead();
- EmptyNodeState.compareAgainstEmptyState(
- before, compactor.newCompactDiff(builder));
-
- SegmentNodeState after = builder.getNodeState();
- while (!store.setHead(before, after)) {
- // Some other concurrent changes have been made.
- // Rebase (and compact) those changes on top of the
- // compacted state before retrying to set the head.
- SegmentNodeState head = store.getHead();
- head.compareAgainstBaseState(
- before, compactor.newCompactDiff(builder));
- before = head;
- after = builder.getNodeState();
- }
-
- compactor.map.compress();
- return compactor.map;
- }
-
/**
* Locks down the RecordId persistence structure
*/
@@ -98,12 +70,20 @@ public class Compactor {
*/
private final Map<String, List<RecordId>> binaries = newHashMap();
- private Compactor(SegmentWriter writer) {
+ public Compactor(SegmentWriter writer) {
this.writer = writer;
}
- private CompactDiff newCompactDiff(NodeBuilder builder) {
- return new CompactDiff(builder);
+ public SegmentNodeState compact(NodeState before, NodeState after) {
+ SegmentNodeBuilder builder = new SegmentNodeBuilder(
+ writer.writeNode(before), writer);
+ after.compareAgainstBaseState(before, new CompactDiff(builder));
+ return builder.getNodeState();
+ }
+
+ public CompactionMap getCompactionMap() {
+ map.compress();
+ return map;
}
private class CompactDiff extends ApplyDiff {
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
Mon Jul 7 19:07:44 2014
@@ -65,6 +65,15 @@ class Record {
}
/**
+ * Returns the tracker of the segment that contains this record.
+ *
+ * @return segment tracker
+ */
+ protected SegmentTracker getTracker() {
+ return segmentId.getTracker();
+ }
+
+ /**
* Returns the segment that contains this record.
*
* @return segment that contains this record
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
Mon Jul 7 19:07:44 2014
@@ -33,8 +33,12 @@ public class SegmentNodeBuilder extends
private long updateCount = 0;
SegmentNodeBuilder(SegmentNodeState base) {
+ this(base, base.getTracker().getWriter());
+ }
+
+ SegmentNodeBuilder(SegmentNodeState base, SegmentWriter writer) {
super(base);
-        this.writer = base.getRecordId().getSegmentId().getTracker().getWriter();
+ this.writer = writer;
}
//-------------------------------------------------< MemoryNodeBuilder >--
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Mon Jul 7 19:07:44 2014
@@ -923,10 +923,22 @@ public class SegmentWriter {
return id;
}
+ private SegmentNodeState uncompact(SegmentNodeState state) {
+ RecordId id = tracker.getCompactionMap().get(state.getRecordId());
+ if (id != null) {
+ return new SegmentNodeState(id);
+ } else {
+ return state;
+ }
+ }
+
public SegmentNodeState writeNode(NodeState state) {
- if (state instanceof SegmentNodeState
-                && store.containsSegment(((SegmentNodeState) state).getRecordId().getSegmentId())) {
- return (SegmentNodeState) state;
+ if (state instanceof SegmentNodeState) {
+ SegmentNodeState sns = uncompact((SegmentNodeState) state);
+ if (sns != state || store.containsSegment(
+ sns.getRecordId().getSegmentId())) {
+ return sns;
+ }
}
SegmentNodeState before = null;
@@ -935,10 +947,13 @@ public class SegmentWriter {
if (state instanceof ModifiedNodeState) {
after = (ModifiedNodeState) state;
NodeState base = after.getBaseState();
- if (base instanceof SegmentNodeState
-                    && store.containsSegment(((SegmentNodeState) base).getRecordId().getSegmentId())) {
- before = (SegmentNodeState) base;
- beforeTemplate = before.getTemplate();
+ if (base instanceof SegmentNodeState) {
+ SegmentNodeState sns = uncompact((SegmentNodeState) base);
+ if (sns != base || store.containsSegment(
+ sns.getRecordId().getSegmentId())) {
+ before = sns;
+ beforeTemplate = before.getTemplate();
+ }
}
}
Copied:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
(from r1607185,
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java)
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java?p2=jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java&r1=1607185&r2=1608564&rev=1608564&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
Mon Jul 7 19:07:44 2014
@@ -16,9 +16,9 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.file;
-import static java.lang.System.nanoTime;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.lang.System.currentTimeMillis;
+
+import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,9 +33,11 @@ class BackgroundThread extends Thread {
private final long interval;
- private long backlog = 0;
+ private boolean alive = true;
+
+ private long iterations = 0;
- private long lastDuration = 0;
+ private long sumDuration = 0;
private long maxDuration = 0;
@@ -54,18 +56,21 @@ class BackgroundThread extends Thread {
public void run() {
try {
while (waitUntilNextIteration()) {
- long start = nanoTime();
+ setName(name + ", active since " + new Date()
+ + ", previous max duration " + maxDuration + "ms");
+
+ long start = currentTimeMillis();
super.run();
-                long seconds = SECONDS.convert(nanoTime() - start, NANOSECONDS);
+ long duration = currentTimeMillis() - start;
- if (lastDuration != seconds) {
- lastDuration = seconds;
- if (maxDuration < seconds) {
- maxDuration = seconds;
- }
- // make execution statistics visible in thread dumps
- setName(name + " " + lastDuration + "/" + maxDuration);
- }
+ iterations++;
+ sumDuration += duration;
+ maxDuration = Math.max(maxDuration, duration);
+
+ // make execution statistics visible in thread dumps
+ setName(name
+ + ", avg " + (sumDuration / iterations) + "ms"
+ + ", max " + maxDuration + "ms");
}
} catch (InterruptedException e) {
log.error(name + " interrupted", e);
@@ -88,29 +93,21 @@ class BackgroundThread extends Thread {
private synchronized void trigger(boolean close) {
if (close) {
- backlog = -1;
- } else if (backlog >= 0) {
- backlog++;
+ alive = false;
}
notify();
}
private synchronized boolean waitUntilNextIteration()
throws InterruptedException {
- if (backlog == 0) {
- // no backlog to process (and not closed), so wait...
+ if (alive) {
if (interval < 0) {
wait();
} else {
wait(interval);
}
}
-
- if (backlog > 0) {
- backlog--;
- }
-
- return backlog >= 0;
+ return alive;
}
}
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Mon Jul 7 19:07:44 2014
@@ -27,7 +27,6 @@ import static java.lang.String.format;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import java.io.File;
@@ -42,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -57,6 +55,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -117,29 +116,24 @@ public class FileStore implements Segmen
* The background flush thread. Automatically flushes the TarMK state
* once every five seconds.
*/
- private final Thread flushThread;
+ private final BackgroundThread flushThread;
/**
- * Flag to request revision cleanup during the next flush.
+ * The background compaction thread. Compacts the TarMK contents whenever
+ * triggered by the {@link #gc()} method.
*/
- private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
+ private final BackgroundThread compactionThread;
/**
- * Flag to request segment compaction during the next flush.
+ * Flag to request revision cleanup during the next flush.
*/
- private final AtomicBoolean compactNeeded = new AtomicBoolean(false);
+ private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
/**
* List of old tar file generations that are waiting to be removed.
*/
private final LinkedList<File> toBeRemoved = newLinkedList();
- /**
- * Synchronization aid used by the background flush thread to stop itself
- * as soon as the {@link #close()} method is called.
- */
- private final CountDownLatch timeToClose = new CountDownLatch(1);
-
public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB,
boolean memoryMapping)
throws IOException {
this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0,
memoryMapping);
@@ -231,33 +225,27 @@ public class FileStore implements Segmen
persistedHead = new AtomicReference<RecordId>(null);
}
- this.flushThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- timeToClose.await(1, SECONDS);
- while (timeToClose.getCount() > 0) {
- long start = System.nanoTime();
- compact();
+ this.flushThread = new BackgroundThread(
+ "TarMK flush thread [" + directory + "]", 5000, // 5s interval
+ new Runnable() {
+ @Override
+ public void run() {
try {
flush();
} catch (IOException e) {
                         log.warn("Failed to flush the TarMK at" + directory, e);
}
- long time = SECONDS.convert(
- System.nanoTime() - start, NANOSECONDS);
- timeToClose.await(Math.max(5, 2 * time), SECONDS);
}
- } catch (InterruptedException e) {
- log.warn("TarMK flush thread interrupted");
- }
- }
- });
- flushThread.setName("TarMK flush thread: " + directory);
- flushThread.setDaemon(true);
- flushThread.setPriority(Thread.MIN_PRIORITY);
- flushThread.start();
+ });
+ this.compactionThread = new BackgroundThread(
+ "TarMK compaction thread [" + directory + "]", -1,
+ new Runnable() {
+ @Override
+ public void run() {
+ compact();
+ }
+ });
log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
}
@@ -334,6 +322,14 @@ public class FileStore implements Segmen
return dataFiles;
}
+ public synchronized long size() throws IOException {
+ long size = writeFile.length();
+ for (TarReader reader : readers) {
+ size += reader.size();
+ }
+ return size;
+ }
+
public void flush() throws IOException {
synchronized (persistedHead) {
RecordId before = persistedHead.get();
@@ -355,39 +351,7 @@ public class FileStore implements Segmen
persistedHead.set(after);
if (cleanup) {
- long start = System.nanoTime();
-
- // Suggest to the JVM that now would be a good time
- // to clear stale weak references in the SegmentTracker
- System.gc();
-
- Set<UUID> ids = newHashSet();
-                    for (SegmentId id : tracker.getReferencedSegmentIds()) {
- ids.add(new UUID(
- id.getMostSignificantBits(),
- id.getLeastSignificantBits()));
- }
- writer.cleanup(ids);
-
- List<TarReader> list =
- newArrayListWithCapacity(readers.size());
- for (TarReader reader : readers) {
- TarReader cleaned = reader.cleanup(ids);
- if (cleaned == reader) {
- list.add(reader);
- } else {
- if (cleaned != null) {
- list.add(cleaned);
- }
- File file = reader.close();
- log.info("TarMK GC: Cleaned up file {}", file);
- toBeRemoved.addLast(file);
- }
- }
- readers = list;
-
- log.debug("TarMK GC: Completed in {}ms",
-                            MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+ cleanup();
}
}
@@ -404,15 +368,73 @@ public class FileStore implements Segmen
}
}
- public void compact() {
- if (compactNeeded.getAndSet(false)) {
- long start = System.nanoTime();
- tracker.getWriter().dropCache();
- tracker.setCompactionMap(Compactor.compact(this));
- log.info("TarMK Compaction: Completed in {}ms", MILLISECONDS
- .convert(System.nanoTime() - start, NANOSECONDS));
- cleanupNeeded.set(true);
+ public synchronized void cleanup() throws IOException {
+ long start = System.nanoTime();
+ log.info("TarMK revision cleanup started");
+
+ // Suggest to the JVM that now would be a good time
+ // to clear stale weak references in the SegmentTracker
+ System.gc();
+
+ Set<UUID> ids = newHashSet();
+ for (SegmentId id : tracker.getReferencedSegmentIds()) {
+ ids.add(new UUID(
+ id.getMostSignificantBits(),
+ id.getLeastSignificantBits()));
}
+ writer.cleanup(ids);
+
+ List<TarReader> list =
+ newArrayListWithCapacity(readers.size());
+ for (TarReader reader : readers) {
+ TarReader cleaned = reader.cleanup(ids);
+ if (cleaned == reader) {
+ list.add(reader);
+ } else {
+ if (cleaned != null) {
+ list.add(cleaned);
+ }
+ File file = reader.close();
+                log.info("TarMK revision cleanup reclaiming {}", file.getName());
+ toBeRemoved.addLast(file);
+ }
+ }
+ readers = list;
+
+ log.info("TarMK revision cleanup completed in {}ms",
+ MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+ }
+
+ public void compact() {
+ long start = System.nanoTime();
+ log.info("TarMK compaction started");
+
+ SegmentWriter writer = new SegmentWriter(this, tracker);
+ Compactor compactor = new Compactor(writer);
+
+ SegmentNodeState before = getHead();
+ SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
+ writer.flush();
+ while (!setHead(before, after)) {
+ // Some other concurrent changes have been made.
+ // Rebase (and compact) those changes on top of the
+ // compacted state before retrying to set the head.
+ SegmentNodeState head = getHead();
+ after = compactor.compact(before, head);
+ before = head;
+ writer.flush();
+ }
+ tracker.setCompactionMap(compactor.getCompactionMap());
+
+ // Drop the SegmentWriter caches and flush any existing state
+ // in an attempt to prevent new references to old pre-compacted
+ // content. TODO: There should be a cleaner way to do this.
+ tracker.getWriter().dropCache();
+ tracker.getWriter().flush();
+
+ log.info("TarMK compaction completed in {}ms", MILLISECONDS
+ .convert(System.nanoTime() - start, NANOSECONDS));
+ cleanupNeeded.set(true);
}
public synchronized Iterable<SegmentId> getSegmentIds() {
@@ -451,17 +473,13 @@ public class FileStore implements Segmen
@Override
public void close() {
- try {
- // avoid deadlocks while joining the flush thread
- timeToClose.countDown();
- try {
- flushThread.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-                log.warn("Interrupted while joining the TarMK flush thread", e);
- }
+ // avoid deadlocks by closing (and joining) the background
+ // threads before acquiring the synchronization lock
+ compactionThread.close();
+ flushThread.close();
- synchronized (this) {
+ synchronized (this) {
+ try {
flush();
writer.close();
@@ -474,14 +492,14 @@ public class FileStore implements Segmen
journalLock.release();
journalFile.close();
-
- System.gc(); // for any memory-mappings that are no longer used
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to close the TarMK at " + directory, e);
}
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to close the TarMK at " + directory, e);
}
+ System.gc(); // for any memory-mappings that are no longer used
+
log.info("TarMK closed: {}", directory);
}
@@ -610,7 +628,7 @@ public class FileStore implements Segmen
@Override
public void gc() {
- compactNeeded.set(true);
+ compactionThread.trigger();
}
}
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
Mon Jul 7 19:07:44 2014
@@ -481,6 +481,10 @@ class TarReader {
this.graph = loadGraph(file, access, index);
}
+ long size() {
+ return file.length();
+ }
+
Set<UUID> getUUIDs() {
Set<UUID> uuids = newHashSetWithExpectedSize(index.remaining() / 24);
int position = index.position();
Modified:
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
Mon Jul 7 19:07:44 2014
@@ -31,8 +31,12 @@ import java.util.Map;
import java.util.Random;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.Compactor;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.junit.Before;
import org.junit.Test;
@@ -84,6 +88,70 @@ public class FileStoreTest {
}
@Test
+ public void testCompaction() throws IOException {
+ int largeBinarySize = 10 * 1024 * 1024;
+
+ FileStore store = new FileStore(directory, 1, false);
+ SegmentWriter writer = store.getTracker().getWriter();
+
+ SegmentNodeState base = store.getHead();
+ SegmentNodeBuilder builder = base.builder();
+ byte[] data = new byte[largeBinarySize];
+ new Random().nextBytes(data);
+ SegmentBlob blob = writer.writeStream(new ByteArrayInputStream(data));
+ builder.setProperty("foo", blob);
+ builder.getNodeState(); // write the blob reference to the segment
+ builder.setProperty("foo", "bar");
+ SegmentNodeState head = builder.getNodeState();
+ assertTrue(store.setHead(base, head));
+ assertEquals("bar", store.getHead().getString("foo"));
+ store.close();
+
+ // First simulate the case where during compaction a reference to the
+ // older segments is added to a segment that the compactor is writing
+ store = new FileStore(directory, 1, false);
+ head = store.getHead();
+ assertTrue(store.size() > largeBinarySize);
+ Compactor compactor = new Compactor(writer);
+ SegmentNodeState compacted =
+ compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+ builder = head.builder();
+ builder.setChildNode("old", head); // reference to pre-compacted state
+ builder.getNodeState();
+ assertTrue(store.setHead(head, compacted));
+ store.close();
+
+ // In this case the revision cleanup is unable to reclaim the old data
+ store = new FileStore(directory, 1, false);
+ assertTrue(store.size() > largeBinarySize);
+ store.cleanup();
+ assertTrue(store.size() > largeBinarySize);
+ store.close();
+
+ // Now we do the same thing, but let the compactor use a different
+ // SegmentWriter
+ store = new FileStore(directory, 1, false);
+ head = store.getHead();
+ assertTrue(store.size() > largeBinarySize);
+ writer = new SegmentWriter(store, store.getTracker());
+ compactor = new Compactor(writer);
+ compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+ builder = head.builder();
+ builder.setChildNode("old", head); // reference to pre-compacted state
+ builder.getNodeState();
+ writer.flush();
+ assertTrue(store.setHead(head, compacted));
+ store.close();
+
+ // Revision cleanup is now able to reclaim the extra space (OAK-1932)
+ store = new FileStore(directory, 1, false);
+ assertTrue(store.size() > largeBinarySize);
+ store.cleanup();
+ assertTrue(store.size() < largeBinarySize);
+ store.close();
+ }
+
+ @Test
public void testRecovery() throws IOException {
FileStore store = new FileStore(directory, 1, false);
store.flush(); // first 1kB
Modified:
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
---
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
(original)
+++
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
Mon Jul 7 19:07:44 2014
@@ -54,7 +54,6 @@ import org.apache.jackrabbit.oak.jcr.Jcr
import org.apache.jackrabbit.oak.plugins.backup.FileStoreBackup;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -172,7 +171,7 @@ public class Main {
System.out.println(" -> compacting");
FileStore store = new FileStore(directory, 256, false);
try {
- Compactor.compact(store);
+ store.compact();
} finally {
store.close();
}
@@ -180,8 +179,7 @@ public class Main {
System.out.println(" -> cleaning up");
store = new FileStore(directory, 256, false);
try {
- store.gc();
- store.flush();
+ store.cleanup();
} finally {
store.close();
}