Author: jukka
Date: Tue Jul 1 21:14:45 2014
New Revision: 1607185
URL: http://svn.apache.org/r1607185
Log:
OAK-1927: TarMK compaction delays journal updates
Refactor the existing flush thread functionality to a new BackgroundThread class
Add a new TarMK compaction background thread
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java?rev=1607185&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
Tue Jul 1 21:14:45 2014
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BackgroundThread extends Thread {
+
+ /** Logger instance */
+ private static final Logger log =
+ LoggerFactory.getLogger(BackgroundThread.class);
+
+ private final String name;
+
+ private final long interval;
+
+ private long backlog = 0;
+
+ private long lastDuration = 0;
+
+ private long maxDuration = 0;
+
+ BackgroundThread(String name, long interval, Runnable target) {
+ super(target, name);
+
+ this.name = name;
+ this.interval = interval;
+
+ setDaemon(true);
+ setPriority(MIN_PRIORITY);
+ start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (waitUntilNextIteration()) {
+ long start = nanoTime();
+ super.run();
+ long seconds = SECONDS.convert(nanoTime() - start,
NANOSECONDS);
+
+ if (lastDuration != seconds) {
+ lastDuration = seconds;
+ if (maxDuration < seconds) {
+ maxDuration = seconds;
+ }
+ // make execution statistics visible in thread dumps
+ setName(name + " " + lastDuration + "/" + maxDuration);
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error(name + " interrupted", e);
+ }
+ }
+
+ void trigger() {
+ trigger(false);
+ }
+
+ void close() {
+ try {
+ trigger(true);
+ join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error(name + " join interrupted", e);
+ }
+ }
+
+ private synchronized void trigger(boolean close) {
+ if (close) {
+ backlog = -1;
+ } else if (backlog >= 0) {
+ backlog++;
+ }
+ notify();
+ }
+
+ private synchronized boolean waitUntilNextIteration()
+ throws InterruptedException {
+ if (backlog == 0) {
+ // no backlog to process (and not closed), so wait...
+ if (interval < 0) {
+ wait();
+ } else {
+ wait(interval);
+ }
+ }
+
+ if (backlog > 0) {
+ backlog--;
+ }
+
+ return backlog >= 0;
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/BackgroundThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1607185&r1=1607184&r2=1607185&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Tue Jul 1 21:14:45 2014
@@ -27,7 +27,6 @@ import static java.lang.String.format;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import java.io.File;
@@ -43,7 +42,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -119,29 +117,24 @@ public class FileStore implements Segmen
* The background flush thread. Automatically flushes the TarMK state
* once every five seconds.
*/
- private final Thread flushThread;
+ private final BackgroundThread flushThread;
/**
- * Flag to request revision cleanup during the next flush.
+ * The background compaction thread. Compacts the TarMK contents whenever
+ * triggered by the {@link #gc()} method.
*/
- private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
+ private final BackgroundThread compactionThread;
/**
- * Flag to request segment compaction during the next flush.
+ * Flag to request revision cleanup during the next flush.
*/
- private final AtomicBoolean compactNeeded = new AtomicBoolean(false);
+ private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
/**
* List of old tar file generations that are waiting to be removed.
*/
private final LinkedList<File> toBeRemoved = newLinkedList();
- /**
- * Synchronization aid used by the background flush thread to stop itself
- * as soon as the {@link #close()} method is called.
- */
- private final CountDownLatch timeToClose = new CountDownLatch(1);
-
public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB,
boolean memoryMapping)
throws IOException {
this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0,
memoryMapping);
@@ -233,32 +226,27 @@ public class FileStore implements Segmen
persistedHead = new AtomicReference<RecordId>(null);
}
- this.flushThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- timeToClose.await(1, SECONDS);
- while (timeToClose.getCount() > 0) {
- long start = System.nanoTime();
+ this.flushThread = new BackgroundThread(
+ "TarMK flush thread [" + directory + "]", 5000, // 5s interval
+ new Runnable() {
+ @Override
+ public void run() {
try {
flush();
} catch (IOException e) {
log.warn("Failed to flush the TarMK at" +
directory, e);
}
- long time = SECONDS.convert(
- System.nanoTime() - start, NANOSECONDS);
- timeToClose.await(Math.max(5, 2 * time), SECONDS);
}
- } catch (InterruptedException e) {
- log.warn("TarMK flush thread interrupted");
- }
- }
- });
- flushThread.setName("TarMK flush thread: " + directory);
- flushThread.setDaemon(true);
- flushThread.setPriority(Thread.MIN_PRIORITY);
- flushThread.start();
+ });
+ this.compactionThread = new BackgroundThread(
+ "TarMK compaction thread [" + directory + "]", -1,
+ new Runnable() {
+ @Override
+ public void run() {
+ compact();
+ }
+ });
log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
}
@@ -344,10 +332,6 @@ public class FileStore implements Segmen
}
public void flush() throws IOException {
- if (compactNeeded.getAndSet(false)) {
- compact();
- }
-
synchronized (persistedHead) {
RecordId before = persistedHead.get();
RecordId after = head.get();
@@ -482,17 +466,13 @@ public class FileStore implements Segmen
@Override
public void close() {
- try {
- // avoid deadlocks while joining the flush thread
- timeToClose.countDown();
- try {
- flushThread.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Interrupted while joining the TarMK flush thread",
e);
- }
+ // avoid deadlocks by closing (and joining) the background
+ // threads before acquiring the synchronization lock
+ compactionThread.close();
+ flushThread.close();
- synchronized (this) {
+ synchronized (this) {
+ try {
flush();
writer.close();
@@ -505,14 +485,14 @@ public class FileStore implements Segmen
journalLock.release();
journalFile.close();
-
- System.gc(); // for any memory-mappings that are no longer used
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to close the TarMK at " + directory, e);
}
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to close the TarMK at " + directory, e);
}
+ System.gc(); // for any memory-mappings that are no longer used
+
log.info("TarMK closed: {}", directory);
}
@@ -641,7 +621,7 @@ public class FileStore implements Segmen
@Override
public void gc() {
- compactNeeded.set(true);
+ compactionThread.trigger();
}
public Map<String, Set<UUID>> getTarReaderIndex() {