Author: jukka
Date: Tue Jul  1 21:14:45 2014
New Revision: 1607185

URL: http://svn.apache.org/r1607185
Log:
OAK-1927: TarMK compaction delays journal updates

Refactor the existing flush thread functionality to a new BackgroundThread class
Add a new TarMK compaction background thread

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java?rev=1607185&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
 Tue Jul  1 21:14:45 2014
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BackgroundThread extends Thread {
+
+    /** Logger instance */
+    private static final Logger log =
+            LoggerFactory.getLogger(BackgroundThread.class);
+
+    private final String name;
+
+    private final long interval;
+
+    private long backlog = 0;
+
+    private long lastDuration = 0;
+
+    private long maxDuration = 0;
+
+    BackgroundThread(String name, long interval, Runnable target) {
+        super(target, name);
+
+        this.name = name;
+        this.interval = interval;
+
+        setDaemon(true);
+        setPriority(MIN_PRIORITY);
+        start();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (waitUntilNextIteration()) {
+                long start = nanoTime();
+                super.run();
+                long seconds = SECONDS.convert(nanoTime() - start, 
NANOSECONDS);
+
+                if (lastDuration != seconds) {
+                    lastDuration = seconds;
+                    if (maxDuration < seconds) {
+                        maxDuration = seconds;
+                    }
+                    // make execution statistics visible in thread dumps
+                    setName(name + " " + lastDuration + "/" + maxDuration);
+                }
+            }
+        } catch (InterruptedException e) {
+            log.error(name + " interrupted", e);
+        }
+    }
+
+    void trigger() {
+        trigger(false);
+    }
+
+    void close() {
+        try {
+            trigger(true);
+            join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error(name + " join interrupted", e);
+        }
+    }
+
+    private synchronized void trigger(boolean close) {
+        if (close) {
+            backlog = -1;
+        } else if (backlog >= 0) {
+            backlog++;
+        }
+        notify();
+    }
+
+    private synchronized boolean waitUntilNextIteration()
+            throws InterruptedException {
+        if (backlog == 0) {
+            // no backlog to process (and not closed), so wait...
+            if (interval < 0) {
+                wait();
+            } else {
+                wait(interval);
+            }
+        }
+
+        if (backlog > 0) {
+            backlog--;
+        }
+
+        return backlog >= 0;
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1607185&r1=1607184&r2=1607185&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
 Tue Jul  1 21:14:45 2014
@@ -27,7 +27,6 @@ import static java.lang.String.format;
 import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
 import java.io.File;
@@ -43,7 +42,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -119,29 +117,24 @@ public class FileStore implements Segmen
      * The background flush thread. Automatically flushes the TarMK state
      * once every five seconds.
      */
-    private final Thread flushThread;
+    private final BackgroundThread flushThread;
 
     /**
-     * Flag to request revision cleanup during the next flush.
+     * The background compaction thread. Compacts the TarMK contents whenever
+     * triggered by the {@link #gc()} method.
      */
-    private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
+    private final BackgroundThread compactionThread;
 
     /**
-     * Flag to request segment compaction during the next flush.
+     * Flag to request revision cleanup during the next flush.
      */
-    private final AtomicBoolean compactNeeded = new AtomicBoolean(false);
+    private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
 
     /**
      * List of old tar file generations that are waiting to be removed.
      */
     private final LinkedList<File> toBeRemoved = newLinkedList();
 
-    /**
-     * Synchronization aid used by the background flush thread to stop itself
-     * as soon as the {@link #close()} method is called.
-     */
-    private final CountDownLatch timeToClose = new CountDownLatch(1);
-
     public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB, 
boolean memoryMapping)
             throws IOException {
         this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, 
memoryMapping);
@@ -233,32 +226,27 @@ public class FileStore implements Segmen
             persistedHead = new AtomicReference<RecordId>(null);
         }
 
-        this.flushThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    timeToClose.await(1, SECONDS);
-                    while (timeToClose.getCount() > 0) {
-                        long start = System.nanoTime();
+        this.flushThread = new BackgroundThread(
+                "TarMK flush thread [" + directory + "]", 5000, // 5s interval
+                new Runnable() {
+                    @Override
+                    public void run() {
                         try {
                             flush();
                         } catch (IOException e) {
                             log.warn("Failed to flush the TarMK at" +
                                     directory, e);
                         }
-                        long time = SECONDS.convert(
-                                System.nanoTime() - start, NANOSECONDS);
-                        timeToClose.await(Math.max(5, 2 * time), SECONDS);
                     }
-                } catch (InterruptedException e) {
-                    log.warn("TarMK flush thread interrupted");
-                }
-            }
-        });
-        flushThread.setName("TarMK flush thread: " + directory);
-        flushThread.setDaemon(true);
-        flushThread.setPriority(Thread.MIN_PRIORITY);
-        flushThread.start();
+                });
+        this.compactionThread = new BackgroundThread(
+                "TarMK compaction thread [" + directory + "]", -1,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        compact();
+                    }
+                });
 
         log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
     }
@@ -344,10 +332,6 @@ public class FileStore implements Segmen
     }
 
     public void flush() throws IOException {
-        if (compactNeeded.getAndSet(false)) {
-            compact();
-        }
-
         synchronized (persistedHead) {
             RecordId before = persistedHead.get();
             RecordId after = head.get();
@@ -482,17 +466,13 @@ public class FileStore implements Segmen
 
     @Override
     public void close() {
-        try {
-            // avoid deadlocks while joining the flush thread
-            timeToClose.countDown();
-            try {
-                flushThread.join();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.warn("Interrupted while joining the TarMK flush thread", 
e);
-            }
+        // avoid deadlocks by closing (and joining) the background
+        // threads before acquiring the synchronization lock
+        compactionThread.close();
+        flushThread.close();
 
-            synchronized (this) {
+        synchronized (this) {
+            try {
                 flush();
 
                 writer.close();
@@ -505,14 +485,14 @@ public class FileStore implements Segmen
 
                 journalLock.release();
                 journalFile.close();
-
-                System.gc(); // for any memory-mappings that are no longer used
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Failed to close the TarMK at " + directory, e);
             }
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to close the TarMK at " + directory, e);
         }
 
+        System.gc(); // for any memory-mappings that are no longer used
+
         log.info("TarMK closed: {}", directory);
     }
 
@@ -641,7 +621,7 @@ public class FileStore implements Segmen
 
     @Override
     public void gc() {
-        compactNeeded.set(true);
+        compactionThread.trigger();
     }
 
     public Map<String, Set<UUID>> getTarReaderIndex() {


Reply via email to