Author: mduerig
Date: Thu Jun  2 14:02:33 2016
New Revision: 1746580

URL: http://svn.apache.org/viewvc?rev=1746580&view=rev
Log:
OAK-4291: FileStore.flush prone to races leading to corruption
Make SegmentBufferWriterPool.flush synchronous again, but avoid flushing segments
while holding locks

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java?rev=1746580&r1=1746579&r2=1746580&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java Thu Jun  2 14:02:33 2016
@@ -34,6 +34,8 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
 
 /**
  * This {@link WriteOperationHandler} uses a pool of {@link 
SegmentBufferWriter}s,
@@ -43,8 +45,27 @@ import com.google.common.base.Supplier;
  * {@link SegmentWriter}.
  */
 public class SegmentBufferWriterPool implements WriteOperationHandler {
+
+    /**
+     * Monitor protecting the state of this pool. Neither of {@link #writers},
+     * {@link #borrowed} and {@link #disposed} must be modified without owning
+     * this monitor.
+     */
+    private final Monitor poolMonitor = new Monitor(true);
+
+    /**
+     * Pool of current writers that are not in use
+     */
     private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+
+    /**
+     * Writers that are currently in use
+     */
     private final Set<SegmentBufferWriter> borrowed = newHashSet();
+
+    /**
+     * Retired writers that have not yet been flushed
+     */
     private final Set<SegmentBufferWriter> disposed = newHashSet();
 
     @Nonnull
@@ -95,38 +116,111 @@ public class SegmentBufferWriterPool imp
     @Override
     public void flush() throws IOException {
         List<SegmentBufferWriter> toFlush = newArrayList();
-        synchronized (this) {
+        List<SegmentBufferWriter> toReturn = newArrayList();
+
+        poolMonitor.enter();
+        try {
+            // Collect all writers that are not currently in use and clear
+            // the list so they won't get re-used anymore.
             toFlush.addAll(writers.values());
-            toFlush.addAll(disposed);
             writers.clear();
-            disposed.clear();
+
+            // Collect all borrowed writers, which we need to wait for.
+            // Clear the list so they will get disposed once returned.
+            toReturn.addAll(borrowed);
             borrowed.clear();
+        } finally {
+            poolMonitor.leave();
+        }
+
+        // Wait for the return of the borrowed writers. This is the
+        // case once all of them appear in the disposed set.
+        if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
+            try {
+                // Collect all disposed writers and clear the list to mark them
+                // as flushed.
+                toFlush.addAll(toReturn);
+                disposed.removeAll(toReturn);
+            } finally {
+                poolMonitor.leave();
+            }
         }
-        // Call flush from outside a synchronized context to avoid
+
+        // Call flush from outside the pool monitor to avoid potential
         // deadlocks of that method calling SegmentStore.writeSegment
         for (SegmentBufferWriter writer : toFlush) {
             writer.flush();
         }
     }
 
-    private synchronized SegmentBufferWriter borrowWriter(Object key) {
-        SegmentBufferWriter writer = writers.remove(key);
-        if (writer == null) {
-            writer = new SegmentBufferWriter(store, tracker, reader, version, 
getWriterId(wid), gcGeneration.get());
-        } else if (writer.getGeneration() != gcGeneration.get()) {
-            disposed.add(writer);
-            writer = new SegmentBufferWriter(store, tracker, reader, version, 
getWriterId(wid), gcGeneration.get());
-        }
-        borrowed.add(writer);
-        return writer;
-    }
-
-    private synchronized void returnWriter(Object key, SegmentBufferWriter 
writer) {
-        if (borrowed.remove(writer)) {
-            checkState(writers.put(key, writer) == null);
-        } else {
-            // Defer flush this writer as it was borrowed while flush() was 
called.
-            disposed.add(writer);
+    /**
+     * Create a {@code Guard} that is satisfied if and only if {@link 
#disposed}
+     * contains all items in {@code toReturn}
+     */
+    @Nonnull
+    private Guard allReturned(final List<SegmentBufferWriter> toReturn) {
+        return new Guard(poolMonitor) {
+
+            @Override
+            public boolean isSatisfied() {
+                return disposed.containsAll(toReturn);
+            }
+
+        };
+    }
+
+    /**
+     * Same as {@code monitor.enterWhen(guard)} but copes with that pesky 
{@code
+     * InterruptedException} by catching it and setting this thread's
+     * interrupted flag.
+     */
+    private static boolean safeEnterWhen(Monitor monitor, Guard guard) {
+        try {
+            monitor.enterWhen(guard);
+            return true;
+        } catch (InterruptedException ignore) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Return a writer from the pool by its {@code key}. This method may return
+     * a fresh writer at any time. Callers need to return a writer before
+     * borrowing it again. Failing to do so leads to undefined behaviour.
+     */
+    private SegmentBufferWriter borrowWriter(Object key) {
+        poolMonitor.enter();
+        try {
+            SegmentBufferWriter writer = writers.remove(key);
+            if (writer == null) {
+                writer = new SegmentBufferWriter(store, tracker, reader, 
version, getWriterId(wid), gcGeneration.get());
+            } else if (writer.getGeneration() != gcGeneration.get()) {
+                disposed.add(writer);
+                writer = new SegmentBufferWriter(store, tracker, reader, 
version, getWriterId(wid), gcGeneration.get());
+            }
+            borrowed.add(writer);
+            return writer;
+        } finally {
+            poolMonitor.leave();
+        }
+    }
+
+    /**
+     * Return a writer to the pool using the {@code key} that was used to 
borrow
+     * it.
+     */
+    private void returnWriter(Object key, SegmentBufferWriter writer) {
+        poolMonitor.enter();
+        try {
+            if (borrowed.remove(writer)) {
+                checkState(writers.put(key, writer) == null);
+            } else {
+                // Defer flush this writer as it was borrowed while flush() 
was called.
+                disposed.add(writer);
+            }
+        } finally {
+            poolMonitor.leave();
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java?rev=1746580&r1=1746579&r2=1746580&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java Thu Jun  2 14:02:33 2016
@@ -58,9 +58,6 @@ interface WriteOperationHandler {
 
     /**
      * Flush any pending changes on any {@link SegmentBufferWriter} managed by 
this instance.
-     * This method <em>does not block</em> to wait for concurrent write 
operations. However, if
-     * a write operation is currently in progress a call to this method 
ensures the respective
-     * changes are properly flushed at the end of that call.
      * @throws IOException
      */
     void flush() throws IOException;

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java?rev=1746580&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java Thu Jun  2 14:02:33 2016
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Sets.intersection;
+import static com.google.common.collect.Sets.newHashSet;
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Suppliers;
+import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Test;
+
+public class SegmentBufferWriterPoolTest {
+    private final MemoryStore store = new MemoryStore();
+
+    private final RecordId rootId = store.getRevisions().getHead();
+
+    private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
+        store, store.getTracker(), store.getReader(), LATEST_VERSION, "", 
Suppliers.ofInstance(0));
+
+    private final ExecutorService[] executors = new ExecutorService[] {
+        newSingleThreadExecutor(), newSingleThreadExecutor(), 
newSingleThreadExecutor()};
+
+    public SegmentBufferWriterPoolTest() throws IOException { }
+
+    @After
+    public void tearDown() {
+        for (ExecutorService executor : executors) {
+            executor.shutdown();
+        }
+    }
+
+    private Future<RecordId> execute(final WriteOperation op, int executor) {
+        return executors[executor].submit(new Callable<RecordId>() {
+            @Override
+            public RecordId call() throws Exception {
+                return pool.execute(op);
+            }
+        });
+    }
+
+    private WriteOperation createOp(final String key, final 
ConcurrentMap<String, SegmentBufferWriter> map) {
+        return new WriteOperation() {
+            @Nonnull @Override
+            public RecordId execute(@Nonnull SegmentBufferWriter writer) {
+                map.put(key, writer);
+                return rootId;
+            }
+        };
+    }
+
+    @Test
+    public void testThreadAffinity() throws IOException, ExecutionException, 
InterruptedException {
+        ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+        Future<RecordId> res1 = execute(createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(createOp("c", map1), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res1.get());
+        assertEquals(rootId, res2.get());
+        assertEquals(rootId, res3.get());
+        assertEquals(3, map1.size());
+
+        ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+        Future<RecordId> res4 = execute(createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(createOp("c", map2), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res4.get());
+        assertEquals(rootId, res5.get());
+        assertEquals(rootId, res6.get());
+        assertEquals(3, map2.size());
+        assertEquals(map1, map2);
+    }
+
+    @Test
+    public void testFlush() throws ExecutionException, InterruptedException, 
IOException {
+        ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+        Future<RecordId> res1 = execute(createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(createOp("c", map1), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res1.get());
+        assertEquals(rootId, res2.get());
+        assertEquals(rootId, res3.get());
+        assertEquals(3, map1.size());
+
+        pool.flush();
+
+        ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+        Future<RecordId> res4 = execute(createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(createOp("c", map2), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res4.get());
+        assertEquals(rootId, res5.get());
+        assertEquals(rootId, res6.get());
+        assertEquals(3, map2.size());
+        assertTrue(intersection(newHashSet(map1.values()), 
newHashSet(map2.values())).isEmpty());
+    }
+
+    @Test
+    public void testFlushBlocks() throws ExecutionException, 
InterruptedException {
+        Future<RecordId> res = execute(new WriteOperation() {
+            @CheckForNull @Override
+            public RecordId execute(@Nonnull SegmentBufferWriter writer) {
+                try {
+                    // This should deadlock as flush waits for this write
+                    // operation to finish, which in this case contains the
+                    // call to flush itself.
+                    executors[1].submit(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            pool.flush();
+                            return null;
+                        }
+                    }).get(100, MILLISECONDS);
+                    return null;    // No deadlock -> null indicates test 
failure
+                } catch (InterruptedException | ExecutionException ignore) {
+                    return null;    // No deadlock -> null indicates test 
failure
+                } catch (TimeoutException ignore) {
+                    return rootId;  // Deadlock -> rootId indicates test pass
+                }
+            }
+        }, 0);
+
+        assertEquals(rootId, res.get());
+    }
+
+}


Reply via email to