Author: jukka
Date: Mon Jul  7 19:07:44 2014
New Revision: 1608564

URL: http://svn.apache.org/r1608564
Log:
1.0.2: Merged revisions 1607077, 1607185, 1607196, 1607526, 1607664 and 1608560 
(OAK-1927, OAK-1932)

Added:
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
      - copied, changed from r1607185, 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
    
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java
      - copied unchanged from r1607664, 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactorTest.java
Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
    
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
    
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1607077,1607185,1607196,1607526,1607664,1608560

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
 Mon Jul  7 19:07:44 2014
@@ -64,7 +64,7 @@ import java.util.UUID;
  * average, the amortized size of each entry in this mapping is about
  * {@code 20/n + 8} bytes, assuming compressed pointers.
  */
-class CompactionMap {
+public class CompactionMap {
 
     private final int compressInterval;
     private final Map<RecordId, RecordId> recent = newHashMap();
@@ -93,6 +93,19 @@ class CompactionMap {
         return after.equals(get(before));
     }
 
+    /**
+     * Checks whether content in the segment with the given identifier was
+     * compacted to new segments.
+     *
+     * @param id segment identifier
+     * @return whether the identified segment was compacted
+     */
+    boolean wasCompacted(SegmentId id) {
+        long msb = id.getMostSignificantBits();
+        long lsb = id.getLeastSignificantBits();
+        return findEntry(msb, lsb) != -1;
+    }
+
     public RecordId get(RecordId before) {
         RecordId after = recent.get(before);
         if (after != null) {

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
 Mon Jul  7 19:07:44 2014
@@ -20,7 +20,6 @@ import static com.google.common.collect.
 import static com.google.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -52,33 +51,6 @@ public class Compactor {
     /** Logger instance */
     private static final Logger log = LoggerFactory.getLogger(Compactor.class);
 
-    public static CompactionMap compact(SegmentStore store) {
-        SegmentWriter writer = store.getTracker().getWriter();
-        Compactor compactor = new Compactor(writer);
-
-        log.debug("TarMK compaction");
-
-        SegmentNodeBuilder builder = writer.writeNode(EMPTY_NODE).builder();
-        SegmentNodeState before = store.getHead();
-        EmptyNodeState.compareAgainstEmptyState(
-                before, compactor.newCompactDiff(builder));
-
-        SegmentNodeState after = builder.getNodeState();
-        while (!store.setHead(before, after)) {
-            // Some other concurrent changes have been made.
-            // Rebase (and compact) those changes on top of the
-            // compacted state before retrying to set the head.
-            SegmentNodeState head = store.getHead();
-            head.compareAgainstBaseState(
-                    before, compactor.newCompactDiff(builder));
-            before = head;
-            after = builder.getNodeState();
-        }
-
-        compactor.map.compress();
-        return compactor.map;
-    }
-
     /**
      * Locks down the RecordId persistence structure
      */
@@ -98,12 +70,20 @@ public class Compactor {
      */
     private final Map<String, List<RecordId>> binaries = newHashMap();
 
-    private Compactor(SegmentWriter writer) {
+    public Compactor(SegmentWriter writer) {
         this.writer = writer;
     }
 
-    private CompactDiff newCompactDiff(NodeBuilder builder) {
-        return new CompactDiff(builder);
+    public SegmentNodeState compact(NodeState before, NodeState after) {
+        SegmentNodeBuilder builder = new SegmentNodeBuilder(
+                writer.writeNode(before), writer);
+        after.compareAgainstBaseState(before, new CompactDiff(builder));
+        return builder.getNodeState();
+    }
+
+    public CompactionMap getCompactionMap() {
+        map.compress();
+        return map;
     }
 
     private class CompactDiff extends ApplyDiff {

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
 Mon Jul  7 19:07:44 2014
@@ -65,6 +65,15 @@ class Record {
     }
 
     /**
+     * Returns the tracker of the segment that contains this record.
+     *
+     * @return segment tracker
+     */
+    protected SegmentTracker getTracker() {
+        return segmentId.getTracker();
+    }
+
+    /**
      * Returns the segment that contains this record.
      *
      * @return segment that contains this record

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeBuilder.java
 Mon Jul  7 19:07:44 2014
@@ -33,8 +33,12 @@ public class SegmentNodeBuilder extends 
     private long updateCount = 0;
 
     SegmentNodeBuilder(SegmentNodeState base) {
+        this(base, base.getTracker().getWriter());
+    }
+
+    SegmentNodeBuilder(SegmentNodeState base, SegmentWriter writer) {
         super(base);
-        this.writer = base.getRecordId().getSegmentId().getTracker().getWriter();
+        this.writer = writer;
     }
 
     //-------------------------------------------------< MemoryNodeBuilder >--

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
 Mon Jul  7 19:07:44 2014
@@ -923,10 +923,22 @@ public class SegmentWriter {
         return id;
     }
 
+    private SegmentNodeState uncompact(SegmentNodeState state) {
+        RecordId id = tracker.getCompactionMap().get(state.getRecordId());
+        if (id != null) {
+            return new SegmentNodeState(id);
+        } else {
+            return state;
+        }
+    }
+
     public SegmentNodeState writeNode(NodeState state) {
-        if (state instanceof SegmentNodeState
-                && store.containsSegment(((SegmentNodeState) state).getRecordId().getSegmentId())) {
-            return (SegmentNodeState) state;
+        if (state instanceof SegmentNodeState) {
+            SegmentNodeState sns = uncompact((SegmentNodeState) state);
+            if (sns != state || store.containsSegment(
+                    sns.getRecordId().getSegmentId())) {
+                return sns;
+            }
         }
 
         SegmentNodeState before = null;
@@ -935,10 +947,13 @@ public class SegmentWriter {
         if (state instanceof ModifiedNodeState) {
             after = (ModifiedNodeState) state;
             NodeState base = after.getBaseState();
-            if (base instanceof SegmentNodeState
-                    && store.containsSegment(((SegmentNodeState) base).getRecordId().getSegmentId())) {
-                before = (SegmentNodeState) base;
-                beforeTemplate = before.getTemplate();
+            if (base instanceof SegmentNodeState) {
+                SegmentNodeState sns = uncompact((SegmentNodeState) base);
+                if (sns != base || store.containsSegment(
+                        sns.getRecordId().getSegmentId())) {
+                    before = sns;
+                    beforeTemplate = before.getTemplate();
+                }
             }
         }
 

Copied: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
 (from r1607185, 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java)
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java?p2=jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java&r1=1607185&r2=1608564&rev=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
 Mon Jul  7 19:07:44 2014
@@ -16,9 +16,9 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
-import static java.lang.System.nanoTime;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.lang.System.currentTimeMillis;
+
+import java.util.Date;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,9 +33,11 @@ class BackgroundThread extends Thread {
 
     private final long interval;
 
-    private long backlog = 0;
+    private boolean alive = true;
+
+    private long iterations = 0;
 
-    private long lastDuration = 0;
+    private long sumDuration = 0;
 
     private long maxDuration = 0;
 
@@ -54,18 +56,21 @@ class BackgroundThread extends Thread {
     public void run() {
         try {
             while (waitUntilNextIteration()) {
-                long start = nanoTime();
+                setName(name + ", active since " + new Date()
+                        + ", previous max duration " + maxDuration + "ms");
+
+                long start = currentTimeMillis();
                 super.run();
-                long seconds = SECONDS.convert(nanoTime() - start, NANOSECONDS);
+                long duration = currentTimeMillis() - start;
 
-                if (lastDuration != seconds) {
-                    lastDuration = seconds;
-                    if (maxDuration < seconds) {
-                        maxDuration = seconds;
-                    }
-                    // make execution statistics visible in thread dumps
-                    setName(name + " " + lastDuration + "/" + maxDuration);
-                }
+                iterations++;
+                sumDuration += duration;
+                maxDuration = Math.max(maxDuration, duration);
+
+                // make execution statistics visible in thread dumps
+                setName(name
+                        + ", avg " + (sumDuration / iterations) + "ms"
+                        + ", max " + maxDuration + "ms");
             }
         } catch (InterruptedException e) {
             log.error(name + " interrupted", e);
@@ -88,29 +93,21 @@ class BackgroundThread extends Thread {
 
     private synchronized void trigger(boolean close) {
         if (close) {
-            backlog = -1;
-        } else if (backlog >= 0) {
-            backlog++;
+            alive = false;
         }
         notify();
     }
 
     private synchronized boolean waitUntilNextIteration()
             throws InterruptedException {
-        if (backlog == 0) {
-            // no backlog to process (and not closed), so wait...
+        if (alive) {
             if (interval < 0) {
                 wait();
             } else {
                 wait(interval);
             }
         }
-
-        if (backlog > 0) {
-            backlog--;
-        }
-
-        return backlog >= 0;
+        return alive;
     }
 
 }

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 Mon Jul  7 19:07:44 2014
@@ -27,7 +27,6 @@ import static java.lang.String.format;
 import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
 import java.io.File;
@@ -42,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -57,6 +55,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -117,29 +116,24 @@ public class FileStore implements Segmen
      * The background flush thread. Automatically flushes the TarMK state
      * once every five seconds.
      */
-    private final Thread flushThread;
+    private final BackgroundThread flushThread;
 
     /**
-     * Flag to request revision cleanup during the next flush.
+     * The background compaction thread. Compacts the TarMK contents whenever
+     * triggered by the {@link #gc()} method.
      */
-    private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
+    private final BackgroundThread compactionThread;
 
     /**
-     * Flag to request segment compaction during the next flush.
+     * Flag to request revision cleanup during the next flush.
      */
-    private final AtomicBoolean compactNeeded = new AtomicBoolean(false);
+    private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
 
     /**
      * List of old tar file generations that are waiting to be removed.
      */
     private final LinkedList<File> toBeRemoved = newLinkedList();
 
-    /**
-     * Synchronization aid used by the background flush thread to stop itself
-     * as soon as the {@link #close()} method is called.
-     */
-    private final CountDownLatch timeToClose = new CountDownLatch(1);
-
     public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB, 
boolean memoryMapping)
             throws IOException {
         this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping);
@@ -231,33 +225,27 @@ public class FileStore implements Segmen
             persistedHead = new AtomicReference<RecordId>(null);
         }
 
-        this.flushThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    timeToClose.await(1, SECONDS);
-                    while (timeToClose.getCount() > 0) {
-                        long start = System.nanoTime();
-                        compact();
+        this.flushThread = new BackgroundThread(
+                "TarMK flush thread [" + directory + "]", 5000, // 5s interval
+                new Runnable() {
+                    @Override
+                    public void run() {
                         try {
                             flush();
                         } catch (IOException e) {
                             log.warn("Failed to flush the TarMK at" +
                                     directory, e);
                         }
-                        long time = SECONDS.convert(
-                                System.nanoTime() - start, NANOSECONDS);
-                        timeToClose.await(Math.max(5, 2 * time), SECONDS);
                     }
-                } catch (InterruptedException e) {
-                    log.warn("TarMK flush thread interrupted");
-                }
-            }
-        });
-        flushThread.setName("TarMK flush thread: " + directory);
-        flushThread.setDaemon(true);
-        flushThread.setPriority(Thread.MIN_PRIORITY);
-        flushThread.start();
+                });
+        this.compactionThread = new BackgroundThread(
+                "TarMK compaction thread [" + directory + "]", -1,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        compact();
+                    }
+                });
 
         log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
     }
@@ -334,6 +322,14 @@ public class FileStore implements Segmen
         return dataFiles;
     }
 
+    public synchronized long size() throws IOException {
+        long size = writeFile.length();
+        for (TarReader reader : readers) {
+            size += reader.size();
+        }
+        return size;
+    }
+
     public void flush() throws IOException {
         synchronized (persistedHead) {
             RecordId before = persistedHead.get();
@@ -355,39 +351,7 @@ public class FileStore implements Segmen
                     persistedHead.set(after);
 
                     if (cleanup) {
-                        long start = System.nanoTime();
-
-                        // Suggest to the JVM that now would be a good time
-                        // to clear stale weak references in the SegmentTracker
-                        System.gc();
-
-                        Set<UUID> ids = newHashSet();
-        for (SegmentId id : tracker.getReferencedSegmentIds()) {
-                            ids.add(new UUID(
-                                    id.getMostSignificantBits(),
-                                    id.getLeastSignificantBits()));
-                        }
-                        writer.cleanup(ids);
-
-                        List<TarReader> list =
-                                newArrayListWithCapacity(readers.size());
-                        for (TarReader reader : readers) {
-                            TarReader cleaned = reader.cleanup(ids);
-                            if (cleaned == reader) {
-                                list.add(reader);
-                            } else {
-                                if (cleaned != null) {
-                                    list.add(cleaned);
-                                }
-                                File file = reader.close();
-                                log.info("TarMK GC: Cleaned up file {}", file);
-                                toBeRemoved.addLast(file);
-                            }
-                        }
-                        readers = list;
-
-                        log.debug("TarMK GC: Completed in {}ms",
-                                MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+                        cleanup();
                     }
                 }
 
@@ -404,15 +368,73 @@ public class FileStore implements Segmen
         }
     }
 
-    public void compact() {
-        if (compactNeeded.getAndSet(false)) {
-            long start = System.nanoTime();
-            tracker.getWriter().dropCache();
-            tracker.setCompactionMap(Compactor.compact(this));
-            log.info("TarMK Compaction: Completed in {}ms", MILLISECONDS
-                    .convert(System.nanoTime() - start, NANOSECONDS));
-            cleanupNeeded.set(true);
+    public synchronized void cleanup() throws IOException {
+        long start = System.nanoTime();
+        log.info("TarMK revision cleanup started");
+
+        // Suggest to the JVM that now would be a good time
+        // to clear stale weak references in the SegmentTracker
+        System.gc();
+
+        Set<UUID> ids = newHashSet();
+        for (SegmentId id : tracker.getReferencedSegmentIds()) {
+            ids.add(new UUID(
+                    id.getMostSignificantBits(),
+                    id.getLeastSignificantBits()));
         }
+        writer.cleanup(ids);
+
+        List<TarReader> list =
+                newArrayListWithCapacity(readers.size());
+        for (TarReader reader : readers) {
+            TarReader cleaned = reader.cleanup(ids);
+            if (cleaned == reader) {
+                list.add(reader);
+            } else {
+                if (cleaned != null) {
+                    list.add(cleaned);
+                }
+                File file = reader.close();
+                log.info("TarMK revision cleanup reclaiming {}", file.getName());
+                toBeRemoved.addLast(file);
+            }
+        }
+        readers = list;
+
+        log.info("TarMK revision cleanup completed in {}ms",
+                MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+    }
+
+    public void compact() {
+        long start = System.nanoTime();
+        log.info("TarMK compaction started");
+
+        SegmentWriter writer = new SegmentWriter(this, tracker);
+        Compactor compactor = new Compactor(writer);
+
+        SegmentNodeState before = getHead();
+        SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
+        writer.flush();
+        while (!setHead(before, after)) {
+            // Some other concurrent changes have been made.
+            // Rebase (and compact) those changes on top of the
+            // compacted state before retrying to set the head.
+            SegmentNodeState head = getHead();
+            after = compactor.compact(before, head);
+            before = head;
+            writer.flush();
+        }
+        tracker.setCompactionMap(compactor.getCompactionMap());
+
+        // Drop the SegmentWriter caches and flush any existing state
+        // in an attempt to prevent new references to old pre-compacted
+        // content. TODO: There should be a cleaner way to do this.
+        tracker.getWriter().dropCache();
+        tracker.getWriter().flush();
+
+        log.info("TarMK compaction completed in {}ms", MILLISECONDS
+                .convert(System.nanoTime() - start, NANOSECONDS));
+        cleanupNeeded.set(true);
     }
 
     public synchronized Iterable<SegmentId> getSegmentIds() {
@@ -451,17 +473,13 @@ public class FileStore implements Segmen
 
     @Override
     public void close() {
-        try {
-            // avoid deadlocks while joining the flush thread
-            timeToClose.countDown();
-            try {
-                flushThread.join();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.warn("Interrupted while joining the TarMK flush thread", e);
-            }
+        // avoid deadlocks by closing (and joining) the background
+        // threads before acquiring the synchronization lock
+        compactionThread.close();
+        flushThread.close();
 
-            synchronized (this) {
+        synchronized (this) {
+            try {
                 flush();
 
                 writer.close();
@@ -474,14 +492,14 @@ public class FileStore implements Segmen
 
                 journalLock.release();
                 journalFile.close();
-
-                System.gc(); // for any memory-mappings that are no longer used
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Failed to close the TarMK at " + directory, e);
             }
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to close the TarMK at " + directory, e);
         }
 
+        System.gc(); // for any memory-mappings that are no longer used
+
         log.info("TarMK closed: {}", directory);
     }
 
@@ -610,7 +628,7 @@ public class FileStore implements Segmen
 
     @Override
     public void gc() {
-        compactNeeded.set(true);
+        compactionThread.trigger();
     }
 
 }

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
 Mon Jul  7 19:07:44 2014
@@ -481,6 +481,10 @@ class TarReader {
         this.graph = loadGraph(file, access, index);
     }
 
+    long size() {
+        return file.length();
+    }
+
     Set<UUID> getUUIDs() {
         Set<UUID> uuids = newHashSetWithExpectedSize(index.remaining() / 24);
         int position = index.position();

Modified: 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
 Mon Jul  7 19:07:44 2014
@@ -31,8 +31,12 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.Compactor;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,6 +88,70 @@ public class FileStoreTest {
     }
 
     @Test
+    public void testCompaction() throws IOException {
+        int largeBinarySize = 10 * 1024 * 1024;
+
+        FileStore store = new FileStore(directory, 1, false);
+        SegmentWriter writer = store.getTracker().getWriter();
+
+        SegmentNodeState base = store.getHead();
+        SegmentNodeBuilder builder = base.builder();
+        byte[] data = new byte[largeBinarySize];
+        new Random().nextBytes(data);
+        SegmentBlob blob = writer.writeStream(new ByteArrayInputStream(data));
+        builder.setProperty("foo", blob);
+        builder.getNodeState(); // write the blob reference to the segment
+        builder.setProperty("foo", "bar");
+        SegmentNodeState head = builder.getNodeState();
+        assertTrue(store.setHead(base, head));
+        assertEquals("bar", store.getHead().getString("foo"));
+        store.close();
+
+        // First simulate the case where during compaction a reference to the
+        // older segments is added to a segment that the compactor is writing
+        store = new FileStore(directory, 1, false);
+        head = store.getHead();
+        assertTrue(store.size() > largeBinarySize);
+        Compactor compactor = new Compactor(writer);
+        SegmentNodeState compacted =
+                compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+        builder = head.builder();
+        builder.setChildNode("old", head); // reference to pre-compacted state
+        builder.getNodeState();
+        assertTrue(store.setHead(head, compacted));
+        store.close();
+
+        // In this case the revision cleanup is unable to reclaim the old data
+        store = new FileStore(directory, 1, false);
+        assertTrue(store.size() > largeBinarySize);
+        store.cleanup();
+        assertTrue(store.size() > largeBinarySize);
+        store.close();
+
+        // Now we do the same thing, but let the compactor use a different
+        // SegmentWriter
+        store = new FileStore(directory, 1, false);
+        head = store.getHead();
+        assertTrue(store.size() > largeBinarySize);
+        writer = new SegmentWriter(store, store.getTracker());
+        compactor = new Compactor(writer);
+        compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+        builder = head.builder();
+        builder.setChildNode("old", head); // reference to pre-compacted state
+        builder.getNodeState();
+        writer.flush();
+        assertTrue(store.setHead(head, compacted));
+        store.close();
+
+        // Revision cleanup is now able to reclaim the extra space (OAK-1932)
+        store = new FileStore(directory, 1, false);
+        assertTrue(store.size() > largeBinarySize);
+        store.cleanup();
+        assertTrue(store.size() < largeBinarySize);
+        store.close();
+    }
+
+    @Test
     public void testRecovery() throws IOException {
         FileStore store = new FileStore(directory, 1, false);
         store.flush(); // first 1kB

Modified: 
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1608564&r1=1608563&r2=1608564&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
 Mon Jul  7 19:07:44 2014
@@ -54,7 +54,6 @@ import org.apache.jackrabbit.oak.jcr.Jcr
 import org.apache.jackrabbit.oak.plugins.backup.FileStoreBackup;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -172,7 +171,7 @@ public class Main {
             System.out.println("    -> compacting");
             FileStore store = new FileStore(directory, 256, false);
             try {
-                Compactor.compact(store);
+                store.compact();
             } finally {
                 store.close();
             }
@@ -180,8 +179,7 @@ public class Main {
             System.out.println("    -> cleaning up");
             store = new FileStore(directory, 256, false);
             try {
-                store.gc();
-                store.flush();
+                store.cleanup();
             } finally {
                 store.close();
             }


Reply via email to