Author: frm
Date: Wed Sep 20 14:35:59 2017
New Revision: 1809034
URL: http://svn.apache.org/viewvc?rev=1809034&view=rev
Log:
OAK-6193 - Implement a safe, responsive shutdown protocol in FileStore
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1809034&r1=1809033&r2=1809034&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
Wed Sep 20 14:35:59 2017
@@ -56,7 +56,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,6 +84,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.WriterCacheManager;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
+import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
@@ -157,13 +157,10 @@ public class FileStore extends AbstractF
*/
private final AtomicBoolean sufficientMemory = new AtomicBoolean(true);
- /**
- * Flag signalling shutdown of the file store
- */
- private volatile boolean shutdown;
-
private final FileStoreStats stats;
+ private final ShutDown shutDown = new ShutDown();
+
@Nonnull
private final SegmentNotFoundExceptionListener snfeListener;
@@ -211,7 +208,7 @@ public class FileStore extends AbstractF
new Runnable() {
@Override
public void run() {
- if (shutdown) {
+ if (shutDown.shutDownRequested()) {
return;
}
try {
@@ -245,9 +242,11 @@ public class FileStore extends AbstractF
}
FileStore bind(TarRevisions revisions) throws IOException {
- this.revisions = revisions;
- this.revisions.bind(this, tracker, initialNode());
- return this;
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ this.revisions = revisions;
+ this.revisions.bind(this, tracker, initialNode());
+ return this;
+ }
}
@Nonnull
@@ -281,7 +280,7 @@ public class FileStore extends AbstractF
*/
public Runnable getGCRunner() {
return new SafeRunnable(format("TarMK revision gc [%s]", directory),
() -> {
- try {
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
garbageCollector.run();
} catch (IOException e) {
log.error("Error running revision garbage collection", e);
@@ -300,44 +299,55 @@ public class FileStore extends AbstractF
* @return the size of this store.
*/
private long size() {
- return tarFiles.size();
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return tarFiles.size();
+ }
}
- public int readerCount(){
- return tarFiles.readerCount();
+ public int readerCount() {
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return tarFiles.readerCount();
+ }
}
public FileStoreStats getStats() {
return stats;
}
- public void flush() throws IOException {
+ private void doFlush() throws IOException {
if (revisions == null) {
return;
}
- revisions.flush(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- segmentWriter.flush();
- tarFiles.flush();
- stats.flushed();
- return null;
- }
+ revisions.flush(() -> {
+ segmentWriter.flush();
+ tarFiles.flush();
+ stats.flushed();
+ return null;
});
}
+ public void flush() throws IOException {
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ doFlush();
+ }
+ }
+
/**
* Run full garbage collection: estimation, compaction, cleanup.
*/
public void fullGC() throws IOException {
- garbageCollector.runFull();
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ garbageCollector.runFull();
+ }
}
/**
* Run tail garbage collection.
*/
public void tailGC() throws IOException {
- garbageCollector.runTail();
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ garbageCollector.runTail();
+ }
}
/**
@@ -345,7 +355,9 @@ public class FileStore extends AbstractF
* @return
*/
public GCEstimation estimateCompactionGain() {
- return garbageCollector.estimateCompactionGain();
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return garbageCollector.estimateCompactionGain();
+ }
}
/**
@@ -355,11 +367,15 @@ public class FileStore extends AbstractF
* @return {@code true} on success, {@code false} otherwise.
*/
public boolean compactFull() {
- return garbageCollector.compactFull().isSuccess();
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return garbageCollector.compactFull().isSuccess();
+ }
}
public boolean compactTail() {
- return garbageCollector.compactTail().isSuccess();
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return garbageCollector.compactTail().isSuccess();
+ }
}
/**
@@ -370,11 +386,13 @@ public class FileStore extends AbstractF
* skipping the reclaimed segments.
*/
public void cleanup() throws IOException {
- CompactionResult compactionResult = CompactionResult.skipped(
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ fileReaper.add(garbageCollector.cleanup(CompactionResult.skipped(
getGcGeneration(),
garbageCollector.gcOptions,
- revisions.getHead());
- fileReaper.add(garbageCollector.cleanup(compactionResult));
+ revisions.getHead()
+ )));
+ }
}
/**
@@ -390,7 +408,9 @@ public class FileStore extends AbstractF
* @param collector reference collector called back for each blob
reference found
*/
public void collectBlobReferences(Consumer<String> collector) throws
IOException {
- garbageCollector.collectBlobReferences(collector);
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ garbageCollector.collectBlobReferences(collector);
+ }
}
/**
@@ -404,42 +424,45 @@ public class FileStore extends AbstractF
@Override
@Nonnull
public SegmentWriter getWriter() {
- return segmentWriter;
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return segmentWriter;
+ }
}
@Override
@Nonnull
public TarRevisions getRevisions() {
- return revisions;
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return revisions;
+ }
}
@Override
public void close() {
- // Flag the store as shutting / shut down
- shutdown = true;
-
- // avoid deadlocks by closing (and joining) the background
- // thread before acquiring the synchronization lock
- fileStoreScheduler.close();
-
- try {
- flush();
- } catch (IOException e) {
- log.warn("Unable to flush the store", e);
- }
+ try (ShutDownCloser ignored = shutDown.shutDown()) {
+ // avoid deadlocks by closing (and joining) the background
+ // thread before acquiring the synchronization lock
+ fileStoreScheduler.close();
- Closer closer = Closer.create();
- closer.register(revisions);
- if (lock != null) {
try {
- lock.release();
+ doFlush();
} catch (IOException e) {
- log.warn("Unable to release the file lock", e);
+ log.warn("Unable to flush the store", e);
+ }
+
+ Closer closer = Closer.create();
+ closer.register(revisions);
+ if (lock != null) {
+ try {
+ lock.release();
+ } catch (IOException e) {
+ log.warn("Unable to release the file lock", e);
+ }
}
+ closer.register(lockFile);
+ closer.register(tarFiles);
+ closeAndLogOnFail(closer);
}
- closer.register(lockFile);
- closer.register(tarFiles);
- closeAndLogOnFail(closer);
// Try removing pending files in case the scheduler didn't have a
chance to run yet
System.gc(); // for any memory-mappings that are no longer used
@@ -450,19 +473,16 @@ public class FileStore extends AbstractF
@Override
public boolean containsSegment(SegmentId id) {
- return tarFiles.containsSegment(id.getMostSignificantBits(),
id.getLeastSignificantBits());
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return tarFiles.containsSegment(id.getMostSignificantBits(),
id.getLeastSignificantBits());
+ }
}
@Override
@Nonnull
public Segment readSegment(final SegmentId id) {
- try {
- return segmentCache.getSegment(id, new Callable<Segment>() {
- @Override
- public Segment call() throws Exception {
- return readSegmentUncached(tarFiles, id);
- }
- });
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ return segmentCache.getSegment(id, () ->
readSegmentUncached(tarFiles, id));
} catch (ExecutionException e) {
SegmentNotFoundException snfe = asSegmentNotFoundException(e, id);
snfeListener.notify(id, snfe);
@@ -472,34 +492,35 @@ public class FileStore extends AbstractF
@Override
public void writeSegment(SegmentId id, byte[] buffer, int offset, int
length) throws IOException {
- Segment segment = null;
+ try (ShutDownCloser ignored = shutDown.keepAlive()) {
+ Segment segment = null;
- // If the segment is a data segment, create a new instance of Segment
to
- // access some internal information stored in the segment and to store
- // in an in-memory cache for later use.
-
- GCGeneration generation = GCGeneration.NULL;
- Set<UUID> references = null;
- Set<String> binaryReferences = null;
-
- if (id.isDataSegmentId()) {
- ByteBuffer data;
-
- if (offset > 4096) {
- data = ByteBuffer.allocate(length);
- data.put(buffer, offset, length);
- data.rewind();
- } else {
- data = ByteBuffer.wrap(buffer, offset, length);
- }
+ // If the segment is a data segment, create a new instance of
Segment to
+ // access some internal information stored in the segment and to
store
+ // in an in-memory cache for later use.
+
+ GCGeneration generation = GCGeneration.NULL;
+ Set<UUID> references = null;
+ Set<String> binaryReferences = null;
+
+ if (id.isDataSegmentId()) {
+ ByteBuffer data;
+
+ if (offset > 4096) {
+ data = ByteBuffer.allocate(length);
+ data.put(buffer, offset, length);
+ data.rewind();
+ } else {
+ data = ByteBuffer.wrap(buffer, offset, length);
+ }
- segment = new Segment(tracker, segmentReader, id, data);
- generation = segment.getGcGeneration();
- references = readReferences(segment);
- binaryReferences = readBinaryReferences(segment);
- }
+ segment = new Segment(tracker, segmentReader, id, data);
+ generation = segment.getGcGeneration();
+ references = readReferences(segment);
+ binaryReferences = readBinaryReferences(segment);
+ }
- tarFiles.writeSegment(
+ tarFiles.writeSegment(
id.asUUID(),
buffer,
offset,
@@ -507,11 +528,12 @@ public class FileStore extends AbstractF
generation,
references,
binaryReferences
- );
+ );
- // Keep this data segment in memory as it's likely to be accessed soon.
- if (segment != null) {
- segmentCache.putSegment(segment);
+ // Keep this data segment in memory as it's likely to be accessed
soon.
+ if (segment != null) {
+ segmentCache.putSegment(segment);
+ }
}
}
@@ -1116,7 +1138,7 @@ public class FileStore extends AbstractF
reason = "Not enough memory";
return true;
}
- if (store.shutdown) {
+ if (store.shutDown.shutDownRequested()) {
reason = "The FileStore is shutting down";
return true;
}
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java?rev=1809034&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
Wed Sep 20 14:35:59 2017
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class ShutDown {
+
+ interface ShutDownCloser extends AutoCloseable {
+
+ void close();
+
+ }
+
+ private volatile boolean shutDownRequested;
+
+ private boolean shutDown;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ ShutDownCloser keepAlive() {
+ lock.readLock().lock();
+
+ if (shutDown) {
+ lock.readLock().unlock();
+ throw new IllegalStateException("already shut down");
+ }
+
+ return () -> lock.readLock().unlock();
+ }
+
+ ShutDownCloser shutDown() {
+ shutDownRequested = true;
+ lock.writeLock().lock();
+
+ if (shutDown) {
+ lock.writeLock().unlock();
+ throw new IllegalStateException("already shut down");
+ }
+
+ return () -> {
+ shutDown = true;
+ lock.writeLock().unlock();
+ };
+ }
+
+ boolean shutDownRequested() {
+ return shutDownRequested;
+ }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
------------------------------------------------------------------------------
svn:eol-style = native