Author: mreutegg
Date: Wed Oct 31 09:26:16 2018
New Revision: 1845320

URL: http://svn.apache.org/viewvc?rev=1845320&view=rev
Log:
OAK-7869: Commit queue stuck when input stream of blob blocks
Add ignored test Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java?rev=1845320&r1=1845319&r2=1845320&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java (original) +++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java Wed Oct 31 09:26:16 2018 @@ -16,25 +16,45 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.io.InputStream; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.Monitor; + import org.apache.jackrabbit.oak.json.JsopDiff; import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper; +import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.stats.Clock; +import org.jetbrains.annotations.NotNull; import org.junit.After; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.TimeUnit.SECONDS; import static 
org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION; +import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge; import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests DocumentNodeStore on various DocumentStore back-ends. */ public class DocumentNodeStoreIT extends AbstractDocumentStoreTest { + @Rule + public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider(); public DocumentNodeStoreIT(DocumentStoreFixture dsf) { super(dsf); @@ -45,7 +65,6 @@ public class DocumentNodeStoreIT extends Revision.resetClockToDefault(); } - @Test public void modifiedResetWithDiff() throws Exception { Clock clock = new Clock.Virtual(); @@ -115,6 +134,111 @@ public class DocumentNodeStoreIT extends ns2.dispose(); } + @Ignore("OAK-7869") + @Test + public void blockingBlob() throws Exception { + ExecutorService updateExecutor = newSingleThreadExecutor(); + ExecutorService commitExecutor = newSingleThreadExecutor(); + DocumentNodeStore store = builderProvider.newBuilder() + .setDocumentStore(ds).build(); + try { + + // A blob whose stream blocks on read + BlockingBlob blockingBlob = new BlockingBlob(); + + // Use a background thread to add the blocking blob to a property + updateExecutor.submit((Callable<?>) () -> { + DocumentNodeState root = store.getRoot(); + NodeBuilder builder = root.builder(); + builder.setProperty("blockingBlob", blockingBlob); + merge(store, builder); + return null; + }); + + // Wait for reading on the blob to block + assertTrue(blockingBlob.waitForRead(1, SECONDS)); + + // Commit something else in another background thread + Future<Void> committed = commitExecutor.submit(() -> { + DocumentNodeState root = store.getRoot(); + NodeBuilder builder = root.builder(); + builder.child("foo"); + merge(store, builder); + return null; + }); + 
+ // Commit should not get blocked by the blob blocked on reading + try { + committed.get(5, SECONDS); + } catch (TimeoutException e) { + fail("Commit must not block"); + } finally { + blockingBlob.unblock(); + } + } finally { + commitExecutor.shutdown(); + updateExecutor.shutdown(); + } + } + + /** + * A blob that blocks on read until unblocked + */ + class BlockingBlob extends AbstractBlob { + private final AtomicBoolean blocking = new AtomicBoolean(true); + private final Monitor readMonitor = new Monitor(); + private boolean reading = false; + + boolean waitForRead(int time, TimeUnit unit) throws InterruptedException { + readMonitor.enter(); + try { + return readMonitor.waitFor(new Monitor.Guard(readMonitor) { + @Override + public boolean isSatisfied() { + return reading; + } + }, time, unit); + } finally { + readMonitor.leave(); + } + } + + void unblock() { + blocking.set(false); + } + + @NotNull + @Override + public InputStream getNewStream() { + return new InputStream() { + + @Override + public int read() { + return readOrEnd(); + } + + @Override + public int read(@NotNull byte[] b, int off, int len) { + return readOrEnd(); + } + + private int readOrEnd() { + if (blocking.get()) { + reading = true; + return 0; + } else { + return -1; + } + } + }; + } + + @Override + public long length() { + return 1; + } + } + private class TestBuilder extends DocumentNodeStoreBuilder<TestBuilder> { @Override