Author: mduerig
Date: Thu Jun 2 14:02:33 2016
New Revision: 1746580
URL: http://svn.apache.org/viewvc?rev=1746580&view=rev
Log:
OAK-4291: FileStore.flush prone to races leading to corruption
Make SegmentBufferWriterPool.flush synchronous again but avoid flushing segments
while holding locks
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java?rev=1746580&r1=1746579&r2=1746580&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
Thu Jun 2 14:02:33 2016
@@ -34,6 +34,8 @@ import java.util.Set;
import javax.annotation.Nonnull;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
/**
* This {@link WriteOperationHandler} uses a pool of {@link SegmentBufferWriter}s,
@@ -43,8 +45,27 @@ import com.google.common.base.Supplier;
* {@link SegmentWriter}.
*/
public class SegmentBufferWriterPool implements WriteOperationHandler {
+
+ /**
+ * Monitor protecting the state of this pool. None of {@link #writers},
+ * {@link #borrowed} and {@link #disposed} may be modified without owning
+ * this monitor.
+ */
+ private final Monitor poolMonitor = new Monitor(true);
+
+ /**
+ * Pool of current writers that are not in use
+ */
private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+
+ /**
+ * Writers that are currently in use
+ */
private final Set<SegmentBufferWriter> borrowed = newHashSet();
+
+ /**
+ * Retired writers that have not yet been flushed
+ */
private final Set<SegmentBufferWriter> disposed = newHashSet();
@Nonnull
@@ -95,38 +116,111 @@ public class SegmentBufferWriterPool imp
@Override
public void flush() throws IOException {
List<SegmentBufferWriter> toFlush = newArrayList();
- synchronized (this) {
+ List<SegmentBufferWriter> toReturn = newArrayList();
+
+ poolMonitor.enter();
+ try {
+ // Collect all writers that are not currently in use and clear
+ // the list so they won't get re-used anymore.
toFlush.addAll(writers.values());
- toFlush.addAll(disposed);
writers.clear();
- disposed.clear();
+
+ // Collect all borrowed writers, which we need to wait for.
+ // Clear the list so they will get disposed once returned.
+ toReturn.addAll(borrowed);
borrowed.clear();
+ } finally {
+ poolMonitor.leave();
+ }
+
+ // Wait for the return of the borrowed writers. This is the
+ // case once all of them appear in the disposed set.
+ if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
+ try {
+ // Collect all disposed writers and clear the list to mark them
+ // as flushed.
+ toFlush.addAll(toReturn);
+ disposed.removeAll(toReturn);
+ } finally {
+ poolMonitor.leave();
+ }
}
- // Call flush from outside a synchronized context to avoid
+
+ // Call flush from outside the pool monitor to avoid potential
// deadlocks of that method calling SegmentStore.writeSegment
for (SegmentBufferWriter writer : toFlush) {
writer.flush();
}
}
- private synchronized SegmentBufferWriter borrowWriter(Object key) {
- SegmentBufferWriter writer = writers.remove(key);
- if (writer == null) {
- writer = new SegmentBufferWriter(store, tracker, reader, version,
getWriterId(wid), gcGeneration.get());
- } else if (writer.getGeneration() != gcGeneration.get()) {
- disposed.add(writer);
- writer = new SegmentBufferWriter(store, tracker, reader, version,
getWriterId(wid), gcGeneration.get());
- }
- borrowed.add(writer);
- return writer;
- }
-
- private synchronized void returnWriter(Object key, SegmentBufferWriter
writer) {
- if (borrowed.remove(writer)) {
- checkState(writers.put(key, writer) == null);
- } else {
- // Defer flush this writer as it was borrowed while flush() was called.
- disposed.add(writer);
+ /**
+ * Create a {@code Guard} that is satisfied if and only if {@link #disposed}
+ * contains all items in {@code toReturn}
+ */
+ @Nonnull
+ private Guard allReturned(final List<SegmentBufferWriter> toReturn) {
+ return new Guard(poolMonitor) {
+
+ @Override
+ public boolean isSatisfied() {
+ return disposed.containsAll(toReturn);
+ }
+
+ };
+ }
+
+ /**
+ * Same as {@code monitor.enterWhen(guard)} but copes with that pesky {@code
+ * InterruptedException} by catching it and setting this thread's
+ * interrupted flag.
+ */
+ private static boolean safeEnterWhen(Monitor monitor, Guard guard) {
+ try {
+ monitor.enterWhen(guard);
+ return true;
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ /**
+ * Return a writer from the pool by its {@code key}. This method may return
+ * a fresh writer at any time. Callers need to return a writer before
+ * borrowing it again. Failing to do so leads to undefined behaviour.
+ */
+ private SegmentBufferWriter borrowWriter(Object key) {
+ poolMonitor.enter();
+ try {
+ SegmentBufferWriter writer = writers.remove(key);
+ if (writer == null) {
+ writer = new SegmentBufferWriter(store, tracker, reader,
version, getWriterId(wid), gcGeneration.get());
+ } else if (writer.getGeneration() != gcGeneration.get()) {
+ disposed.add(writer);
+ writer = new SegmentBufferWriter(store, tracker, reader,
version, getWriterId(wid), gcGeneration.get());
+ }
+ borrowed.add(writer);
+ return writer;
+ } finally {
+ poolMonitor.leave();
+ }
+ }
+
+ /**
+ * Return a writer to the pool using the {@code key} that was used to borrow
+ * it.
+ */
+ private void returnWriter(Object key, SegmentBufferWriter writer) {
+ poolMonitor.enter();
+ try {
+ if (borrowed.remove(writer)) {
+ checkState(writers.put(key, writer) == null);
+ } else {
+ // Defer flush this writer as it was borrowed while flush() was called.
+ disposed.add(writer);
+ }
+ } finally {
+ poolMonitor.leave();
}
}
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java?rev=1746580&r1=1746579&r2=1746580&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
Thu Jun 2 14:02:33 2016
@@ -58,9 +58,6 @@ interface WriteOperationHandler {
/**
* Flush any pending changes on any {@link SegmentBufferWriter} managed by this instance.
- * This method <em>does not block</em> to wait for concurrent write operations. However, if
- * a write operation is currently in progress a call to this method ensures the respective
- * changes are properly flushed at the end of that call.
* @throws IOException
*/
void flush() throws IOException;
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java?rev=1746580&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
Thu Jun 2 14:02:33 2016
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Sets.intersection;
+import static com.google.common.collect.Sets.newHashSet;
+import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Suppliers;
+import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Test;
+
+public class SegmentBufferWriterPoolTest {
+ private final MemoryStore store = new MemoryStore();
+
+ private final RecordId rootId = store.getRevisions().getHead();
+
+ private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
+ store, store.getTracker(), store.getReader(), LATEST_VERSION, "",
Suppliers.ofInstance(0));
+
+ private final ExecutorService[] executors = new ExecutorService[] {
+ newSingleThreadExecutor(), newSingleThreadExecutor(),
newSingleThreadExecutor()};
+
+ public SegmentBufferWriterPoolTest() throws IOException { }
+
+ @After
+ public void tearDown() {
+ for (ExecutorService executor : executors) {
+ executor.shutdown();
+ }
+ }
+
+ private Future<RecordId> execute(final WriteOperation op, int executor) {
+ return executors[executor].submit(new Callable<RecordId>() {
+ @Override
+ public RecordId call() throws Exception {
+ return pool.execute(op);
+ }
+ });
+ }
+
+ private WriteOperation createOp(final String key, final
ConcurrentMap<String, SegmentBufferWriter> map) {
+ return new WriteOperation() {
+ @Nonnull @Override
+ public RecordId execute(@Nonnull SegmentBufferWriter writer) {
+ map.put(key, writer);
+ return rootId;
+ }
+ };
+ }
+
+ @Test
+ public void testThreadAffinity() throws IOException, ExecutionException,
InterruptedException {
+ ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+ Future<RecordId> res1 = execute(createOp("a", map1), 0);
+ Future<RecordId> res2 = execute(createOp("b", map1), 1);
+ Future<RecordId> res3 = execute(createOp("c", map1), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res1.get());
+ assertEquals(rootId, res2.get());
+ assertEquals(rootId, res3.get());
+ assertEquals(3, map1.size());
+
+ ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+ Future<RecordId> res4 = execute(createOp("a", map2), 0);
+ Future<RecordId> res5 = execute(createOp("b", map2), 1);
+ Future<RecordId> res6 = execute(createOp("c", map2), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res4.get());
+ assertEquals(rootId, res5.get());
+ assertEquals(rootId, res6.get());
+ assertEquals(3, map2.size());
+ assertEquals(map1, map2);
+ }
+
+ @Test
+ public void testFlush() throws ExecutionException, InterruptedException,
IOException {
+ ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+ Future<RecordId> res1 = execute(createOp("a", map1), 0);
+ Future<RecordId> res2 = execute(createOp("b", map1), 1);
+ Future<RecordId> res3 = execute(createOp("c", map1), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res1.get());
+ assertEquals(rootId, res2.get());
+ assertEquals(rootId, res3.get());
+ assertEquals(3, map1.size());
+
+ pool.flush();
+
+ ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+ Future<RecordId> res4 = execute(createOp("a", map2), 0);
+ Future<RecordId> res5 = execute(createOp("b", map2), 1);
+ Future<RecordId> res6 = execute(createOp("c", map2), 2);
+
+ // Give the tasks some time to complete
+ sleepUninterruptibly(10, MILLISECONDS);
+
+ assertEquals(rootId, res4.get());
+ assertEquals(rootId, res5.get());
+ assertEquals(rootId, res6.get());
+ assertEquals(3, map2.size());
+ assertTrue(intersection(newHashSet(map1.values()),
newHashSet(map2.values())).isEmpty());
+ }
+
+ @Test
+ public void testFlushBlocks() throws ExecutionException,
InterruptedException {
+ Future<RecordId> res = execute(new WriteOperation() {
+ @CheckForNull @Override
+ public RecordId execute(@Nonnull SegmentBufferWriter writer) {
+ try {
+ // This should deadlock as flush waits for this write
+ // operation to finish, which in this case contains the
+ // call to flush itself.
+ executors[1].submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ pool.flush();
+ return null;
+ }
+ }).get(100, MILLISECONDS);
+ return null; // No deadlock -> null indicates test
failure
+ } catch (InterruptedException | ExecutionException ignore) {
+ return null; // No deadlock -> null indicates test
failure
+ } catch (TimeoutException ignore) {
+ return rootId; // Deadlock -> rootId indicates test pass
+ }
+ }
+ }, 0);
+
+ assertEquals(rootId, res.get());
+ }
+
+}