Author: mreutegg
Date: Wed Oct 31 09:26:16 2018
New Revision: 1845320
URL: http://svn.apache.org/viewvc?rev=1845320&view=rev
Log:
OAK-7869: Commit queue stuck when input stream of blob blocks
Add ignored test
Modified:
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java
Modified:
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java?rev=1845320&r1=1845319&r2=1845320&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java
(original)
+++
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java
Wed Oct 31 09:26:16 2018
@@ -16,25 +16,45 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Monitor;
+
import org.apache.jackrabbit.oak.json.JsopDiff;
import
org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
+import org.jetbrains.annotations.NotNull;
import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
import static
org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests DocumentNodeStore on various DocumentStore back-ends.
*/
public class DocumentNodeStoreIT extends AbstractDocumentStoreTest {
+ @Rule
+ public DocumentMKBuilderProvider builderProvider = new
DocumentMKBuilderProvider();
public DocumentNodeStoreIT(DocumentStoreFixture dsf) {
super(dsf);
@@ -45,7 +65,6 @@ public class DocumentNodeStoreIT extends
Revision.resetClockToDefault();
}
-
@Test
public void modifiedResetWithDiff() throws Exception {
Clock clock = new Clock.Virtual();
@@ -115,6 +134,111 @@ public class DocumentNodeStoreIT extends
ns2.dispose();
}
+    /**
+     * Test for OAK-7869 (commit queue stuck when the input stream of a
+     * blob blocks): while one thread is busy reading a blob that never
+     * finishes, an unrelated commit issued from a second thread must
+     * still complete within five seconds.
+     *
+     * @throws Exception if the concurrent commit fails or the test is
+     *         interrupted while waiting
+     */
+    @Ignore("OAK-7869")
+    @Test
+    public void blockingBlob() throws Exception {
+        ExecutorService updateExecutor = newSingleThreadExecutor();
+        ExecutorService commitExecutor = newSingleThreadExecutor();
+        // A blob whose stream blocks on read
+        BlockingBlob blockingBlob = new BlockingBlob();
+        try {
+            // built inside the try so that the executors are shut down
+            // even when building the store fails
+            DocumentNodeStore store = builderProvider.newBuilder()
+                    .setDocumentStore(ds).build();
+
+            // Use a background thread to add the blocking blob to a property
+            updateExecutor.submit((Callable<?>) () -> {
+                DocumentNodeState root = store.getRoot();
+                NodeBuilder builder = root.builder();
+                builder.setProperty("blockingBlob", blockingBlob);
+                merge(store, builder);
+                return null;
+            });
+
+            // Wait for reading on the blob to block
+            assertTrue(blockingBlob.waitForRead(1, SECONDS));
+
+            // Commit something else in another background thread
+            Future<Void> committed = commitExecutor.submit(() -> {
+                DocumentNodeState root = store.getRoot();
+                NodeBuilder builder = root.builder();
+                builder.child("foo");
+                merge(store, builder);
+                return null;
+            });
+
+            // Commit should not get blocked by the blob blocked on reading
+            try {
+                committed.get(5, SECONDS);
+            } catch (TimeoutException e) {
+                fail("Commit must not block");
+            }
+        } finally {
+            // Release the blob on every exit path, including a failed
+            // assertion above; shutdown() does not interrupt the update
+            // thread, which would otherwise keep polling the stream.
+            blockingBlob.unblock();
+            commitExecutor.shutdown();
+            updateExecutor.shutdown();
+        }
+    }
+
+    /**
+     * A blob that blocks on read until unblocked.
+     * <p>
+     * "Blocking" is simulated: while the blob is blocked every read call
+     * on its stream returns {@code 0} instead of the end-of-stream marker
+     * {@code -1}, so a consumer that reads until end-of-stream
+     * (presumably the blob upload during a commit) does not finish before
+     * {@link #unblock()} is called.
+     */
+    class BlockingBlob extends AbstractBlob {
+        /** {@code true} until {@link #unblock()} is called. */
+        private final AtomicBoolean blocking = new AtomicBoolean(true);
+        /** Guards {@link #reading} and wakes up {@link #waitForRead}. */
+        private final Monitor readMonitor = new Monitor();
+        /** Set once a read hit the blocked stream; guarded by readMonitor. */
+        private boolean reading = false;
+
+        /**
+         * Waits until some thread has started reading from this blob
+         * while it is blocked.
+         *
+         * @param time maximum time to wait
+         * @param unit unit of {@code time}
+         * @return {@code true} if a read was observed in time,
+         *         {@code false} if the timeout elapsed first
+         * @throws InterruptedException if interrupted while waiting
+         */
+        boolean waitForRead(int time, TimeUnit unit) throws InterruptedException {
+            readMonitor.enter();
+            try {
+                return readMonitor.waitFor(new Monitor.Guard(readMonitor) {
+                    @Override
+                    public boolean isSatisfied() {
+                        return reading;
+                    }
+                }, time, unit);
+            } finally {
+                readMonitor.leave();
+            }
+        }
+
+        /**
+         * Lets subsequent reads return end-of-stream.
+         */
+        void unblock() {
+            blocking.set(false);
+        }
+
+        @NotNull
+        @Override
+        public InputStream getNewStream() {
+            return new InputStream() {
+
+                @Override
+                public int read() {
+                    return readOrEnd();
+                }
+
+                @Override
+                public int read(@NotNull byte[] b, int off, int len) {
+                    return readOrEnd();
+                }
+
+                /**
+                 * @return {@code 0} while the blob is blocked,
+                 *         {@code -1} once it was unblocked
+                 */
+                private int readOrEnd() {
+                    if (!blocking.get()) {
+                        return -1;
+                    }
+                    // The flag must be set while holding the monitor:
+                    // guards are only re-evaluated when a thread leaves
+                    // the monitor, so an unguarded write would not wake
+                    // up waitForRead() before its timeout expires.
+                    readMonitor.enter();
+                    try {
+                        reading = true;
+                    } finally {
+                        readMonitor.leave();
+                    }
+                    return 0;
+                }
+            };
+        }
+
+        @Override
+        public long length() {
+            return 1;
+        }
+    }
+
private class TestBuilder extends DocumentNodeStoreBuilder<TestBuilder> {
@Override