Author: frm
Date: Wed Sep 20 14:35:59 2017
New Revision: 1809034

URL: http://svn.apache.org/viewvc?rev=1809034&view=rev
Log:
OAK-6193 - Implement a safe, responsive shutdown protocol in FileStore

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1809034&r1=1809033&r2=1809034&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed Sep 20 14:35:59 2017
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,6 +84,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
+import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
 import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
@@ -157,13 +157,10 @@ public class FileStore extends AbstractF
      */
     private final AtomicBoolean sufficientMemory = new AtomicBoolean(true);
 
-    /**
-     * Flag signalling shutdown of the file store
-     */
-    private volatile boolean shutdown;
-
     private final FileStoreStats stats;
 
+    private final ShutDown shutDown = new ShutDown();
+
     @Nonnull
     private final SegmentNotFoundExceptionListener snfeListener;
 
@@ -211,7 +208,7 @@ public class FileStore extends AbstractF
                 new Runnable() {
                     @Override
                     public void run() {
-                        if (shutdown) {
+                        if (shutDown.shutDownRequested()) {
                             return;
                         }
                         try {
@@ -245,9 +242,11 @@ public class FileStore extends AbstractF
     }
 
     FileStore bind(TarRevisions revisions) throws IOException {
-        this.revisions = revisions;
-        this.revisions.bind(this, tracker, initialNode());
-        return this;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            this.revisions = revisions;
+            this.revisions.bind(this, tracker, initialNode());
+            return this;
+        }
     }
 
     @Nonnull
@@ -281,7 +280,7 @@ public class FileStore extends AbstractF
      */
     public Runnable getGCRunner() {
         return new SafeRunnable(format("TarMK revision gc [%s]", directory), () -> {
-            try {
+            try (ShutDownCloser ignored = shutDown.keepAlive()) {
                 garbageCollector.run();
             } catch (IOException e) {
                 log.error("Error running revision garbage collection", e);
@@ -300,44 +299,55 @@ public class FileStore extends AbstractF
      * @return the size of this store.
      */
     private long size() {
-        return tarFiles.size();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return tarFiles.size();
+        }
     }
 
-    public int readerCount(){
-        return tarFiles.readerCount();
+    public int readerCount() {
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return tarFiles.readerCount();
+        }
     }
 
     public FileStoreStats getStats() {
         return stats;
     }
 
-    public void flush() throws IOException {
+    private void doFlush() throws IOException {
         if (revisions == null) {
             return;
         }
-        revisions.flush(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                segmentWriter.flush();
-                tarFiles.flush();
-                stats.flushed();
-                return null;
-            }
+        revisions.flush(() -> {
+            segmentWriter.flush();
+            tarFiles.flush();
+            stats.flushed();
+            return null;
         });
     }
 
+    public void flush() throws IOException {
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            doFlush();
+        }
+    }
+
     /**
      * Run full garbage collection: estimation, compaction, cleanup.
      */
     public void fullGC() throws IOException {
-        garbageCollector.runFull();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            garbageCollector.runFull();
+        }
     }
 
     /**
      * Run tail garbage collection.
      */
     public void tailGC() throws IOException {
-        garbageCollector.runTail();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            garbageCollector.runTail();
+        }
     }
 
     /**
@@ -345,7 +355,9 @@ public class FileStore extends AbstractF
      * @return
      */
     public GCEstimation estimateCompactionGain() {
-        return garbageCollector.estimateCompactionGain();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return garbageCollector.estimateCompactionGain();
+        }
     }
 
     /**
@@ -355,11 +367,15 @@ public class FileStore extends AbstractF
      * @return {@code true} on success, {@code false} otherwise.
      */
     public boolean compactFull() {
-        return garbageCollector.compactFull().isSuccess();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return garbageCollector.compactFull().isSuccess();
+        }
     }
 
     public boolean compactTail() {
-        return garbageCollector.compactTail().isSuccess();
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return garbageCollector.compactTail().isSuccess();
+        }
     }
 
     /**
@@ -370,11 +386,13 @@ public class FileStore extends AbstractF
      * skipping the reclaimed segments.
      */
     public void cleanup() throws IOException {
-        CompactionResult compactionResult = CompactionResult.skipped(
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            fileReaper.add(garbageCollector.cleanup(CompactionResult.skipped(
                 getGcGeneration(),
                 garbageCollector.gcOptions,
-                revisions.getHead());
-        fileReaper.add(garbageCollector.cleanup(compactionResult));
+                revisions.getHead()
+            )));
+        }
     }
 
     /**
@@ -390,7 +408,9 @@ public class FileStore extends AbstractF
      * @param collector  reference collector called back for each blob reference found
      */
     public void collectBlobReferences(Consumer<String> collector) throws IOException {
-        garbageCollector.collectBlobReferences(collector);
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            garbageCollector.collectBlobReferences(collector);
+        }
     }
 
     /**
@@ -404,42 +424,45 @@ public class FileStore extends AbstractF
     @Override
     @Nonnull
     public SegmentWriter getWriter() {
-        return segmentWriter;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return segmentWriter;
+        }
     }
 
     @Override
     @Nonnull
     public TarRevisions getRevisions() {
-        return revisions;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return revisions;
+        }
     }
 
     @Override
     public void close() {
-        // Flag the store as shutting / shut down
-        shutdown = true;
-
-        // avoid deadlocks by closing (and joining) the background
-        // thread before acquiring the synchronization lock
-        fileStoreScheduler.close();
-
-        try {
-            flush();
-        } catch (IOException e) {
-            log.warn("Unable to flush the store", e);
-        }
+        try (ShutDownCloser ignored = shutDown.shutDown()) {
+            // avoid deadlocks by closing (and joining) the background
+            // thread before acquiring the synchronization lock
+            fileStoreScheduler.close();
 
-        Closer closer = Closer.create();
-        closer.register(revisions);
-        if (lock != null) {
             try {
-                lock.release();
+                doFlush();
             } catch (IOException e) {
-                log.warn("Unable to release the file lock", e);
+                log.warn("Unable to flush the store", e);
+            }
+
+            Closer closer = Closer.create();
+            closer.register(revisions);
+            if (lock != null) {
+                try {
+                    lock.release();
+                } catch (IOException e) {
+                    log.warn("Unable to release the file lock", e);
+                }
             }
+            closer.register(lockFile);
+            closer.register(tarFiles);
+            closeAndLogOnFail(closer);
         }
-        closer.register(lockFile);
-        closer.register(tarFiles);
-        closeAndLogOnFail(closer);
 
         // Try removing pending files in case the scheduler didn't have a chance to run yet
         System.gc(); // for any memory-mappings that are no longer used
@@ -450,19 +473,16 @@ public class FileStore extends AbstractF
 
     @Override
     public boolean containsSegment(SegmentId id) {
-        return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return tarFiles.containsSegment(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        }
     }
 
     @Override
     @Nonnull
     public Segment readSegment(final SegmentId id) {
-        try {
-            return segmentCache.getSegment(id, new Callable<Segment>() {
-                @Override
-                public Segment call() throws Exception {
-                    return readSegmentUncached(tarFiles, id);
-                }
-            });
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id));
         } catch (ExecutionException e) {
             SegmentNotFoundException snfe = asSegmentNotFoundException(e, id);
             snfeListener.notify(id, snfe);
@@ -472,34 +492,35 @@ public class FileStore extends AbstractF
 
     @Override
     public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) throws IOException {
-        Segment segment = null;
+        try (ShutDownCloser ignored = shutDown.keepAlive()) {
+            Segment segment = null;
 
-        // If the segment is a data segment, create a new instance of Segment to
-        // access some internal information stored in the segment and to store
-        // in an in-memory cache for later use.
-
-        GCGeneration generation = GCGeneration.NULL;
-        Set<UUID> references = null;
-        Set<String> binaryReferences = null;
-
-        if (id.isDataSegmentId()) {
-            ByteBuffer data;
-
-            if (offset > 4096) {
-                data = ByteBuffer.allocate(length);
-                data.put(buffer, offset, length);
-                data.rewind();
-            } else {
-                data = ByteBuffer.wrap(buffer, offset, length);
-            }
+            // If the segment is a data segment, create a new instance of Segment to
+            // access some internal information stored in the segment and to store
+            // in an in-memory cache for later use.
+
+            GCGeneration generation = GCGeneration.NULL;
+            Set<UUID> references = null;
+            Set<String> binaryReferences = null;
+
+            if (id.isDataSegmentId()) {
+                ByteBuffer data;
+
+                if (offset > 4096) {
+                    data = ByteBuffer.allocate(length);
+                    data.put(buffer, offset, length);
+                    data.rewind();
+                } else {
+                    data = ByteBuffer.wrap(buffer, offset, length);
+                }
 
-            segment = new Segment(tracker, segmentReader, id, data);
-            generation = segment.getGcGeneration();
-            references = readReferences(segment);
-            binaryReferences = readBinaryReferences(segment);
-        }
+                segment = new Segment(tracker, segmentReader, id, data);
+                generation = segment.getGcGeneration();
+                references = readReferences(segment);
+                binaryReferences = readBinaryReferences(segment);
+            }
 
-        tarFiles.writeSegment(
+            tarFiles.writeSegment(
                 id.asUUID(),
                 buffer,
                 offset,
@@ -507,11 +528,12 @@ public class FileStore extends AbstractF
                 generation,
                 references,
                 binaryReferences
-        );
+            );
 
-        // Keep this data segment in memory as it's likely to be accessed soon.
-        if (segment != null) {
-            segmentCache.putSegment(segment);
+            // Keep this data segment in memory as it's likely to be accessed soon.
+            if (segment != null) {
+                segmentCache.putSegment(segment);
+            }
         }
     }
 
@@ -1116,7 +1138,7 @@ public class FileStore extends AbstractF
                     reason = "Not enough memory";
                     return true;
                 }
-                if (store.shutdown) {
+                if (store.shutDown.shutDownRequested()) {
                     reason = "The FileStore is shutting down";
                     return true;
                 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java?rev=1809034&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java Wed Sep 20 14:35:59 2017
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class ShutDown {
+
+    interface ShutDownCloser extends AutoCloseable {
+
+        void close();
+
+    }
+
+    private volatile boolean shutDownRequested;
+
+    private boolean shutDown;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    ShutDownCloser keepAlive() {
+        lock.readLock().lock();
+
+        if (shutDown) {
+            lock.readLock().unlock();
+            throw new IllegalStateException("already shut down");
+        }
+
+        return () -> lock.readLock().unlock();
+    }
+
+    ShutDownCloser shutDown() {
+        shutDownRequested = true;
+        lock.writeLock().lock();
+
+        if (shutDown) {
+            lock.writeLock().unlock();
+            throw new IllegalStateException("already shut down");
+        }
+
+        return () -> {
+            shutDown = true;
+            lock.writeLock().unlock();
+        };
+    }
+
+    boolean shutDownRequested() {
+        return shutDownRequested;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ShutDown.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to