Author: mreutegg
Date: Wed Oct 31 09:26:16 2018
New Revision: 1845320

URL: http://svn.apache.org/viewvc?rev=1845320&view=rev
Log:
OAK-7869: Commit queue stuck when input stream of blob blocks

Add ignored test

Modified:
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java?rev=1845320&r1=1845319&r2=1845320&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java Wed Oct 31 09:26:16 2018
@@ -16,25 +16,45 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Monitor;
+
 import org.apache.jackrabbit.oak.json.JsopDiff;
 import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.stats.Clock;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests DocumentNodeStore on various DocumentStore back-ends.
  */
 public class DocumentNodeStoreIT extends AbstractDocumentStoreTest {
 
+    @Rule
+    public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
 
     public DocumentNodeStoreIT(DocumentStoreFixture dsf) {
         super(dsf);
@@ -45,7 +65,6 @@ public class DocumentNodeStoreIT extends
         Revision.resetClockToDefault();
     }
 
-
     @Test
     public void modifiedResetWithDiff() throws Exception {
         Clock clock = new Clock.Virtual();
@@ -115,6 +134,111 @@ public class DocumentNodeStoreIT extends
         ns2.dispose();
     }
 
+    @Ignore("OAK-7869")
+    @Test
+    public void blockingBlob() throws Exception {
+        ExecutorService updateExecutor = newSingleThreadExecutor();
+        ExecutorService commitExecutor = newSingleThreadExecutor();
+        DocumentNodeStore store = builderProvider.newBuilder()
+                .setDocumentStore(ds).build();
+        try {
+
+            // A blob whose stream blocks on read
+            BlockingBlob blockingBlob = new BlockingBlob();
+
+            // Use a background thread to add the blocking blob to a property
+            updateExecutor.submit((Callable<?>) () -> {
+                DocumentNodeState root = store.getRoot();
+                NodeBuilder builder = root.builder();
+                builder.setProperty("blockingBlob", blockingBlob);
+                merge(store, builder);
+                return null;
+            });
+
+            // Wait for reading on the blob to block
+            assertTrue(blockingBlob.waitForRead(1, SECONDS));
+
+            // Commit something else in another background thread
+            Future<Void> committed = commitExecutor.submit(() -> {
+                DocumentNodeState root = store.getRoot();
+                NodeBuilder builder = root.builder();
+                builder.child("foo");
+                merge(store, builder);
+                return null;
+            });
+
+            // Commit should not get blocked by the blob blocked on reading
+            try {
+                committed.get(5, SECONDS);
+            } catch (TimeoutException e) {
+                fail("Commit must not block");
+            } finally {
+                blockingBlob.unblock();
+            }
+        } finally {
+            commitExecutor.shutdown();
+            updateExecutor.shutdown();
+        }
+    }
+
+    /**
+     *  A blob that blocks on read until unblocked
+     */
+    class BlockingBlob extends AbstractBlob {
+        private final AtomicBoolean blocking = new AtomicBoolean(true);
+        private final Monitor readMonitor = new Monitor();
+        private boolean reading = false;
+
+        boolean waitForRead(int time, TimeUnit unit) throws InterruptedException {
+            readMonitor.enter();
+            try {
+                return readMonitor.waitFor(new Monitor.Guard(readMonitor) {
+                    @Override
+                    public boolean isSatisfied() {
+                        return reading;
+                    }
+                }, time, unit);
+            } finally {
+                readMonitor.leave();
+            }
+        }
+
+        void unblock() {
+            blocking.set(false);
+        }
+
+        @NotNull
+        @Override
+        public InputStream getNewStream() {
+            return new InputStream() {
+
+                @Override
+                public int read() {
+                    return readOrEnd();
+                }
+
+                @Override
+                public int read(@NotNull byte[] b, int off, int len) {
+                    return readOrEnd();
+                }
+
+                private int readOrEnd() {
+                    if (blocking.get()) {
+                        reading = true;
+                        return 0;
+                    } else {
+                        return -1;
+                    }
+                }
+            };
+        }
+
+        @Override
+        public long length() {
+            return 1;
+        }
+    }
+
     private class TestBuilder extends DocumentNodeStoreBuilder<TestBuilder> {
 
         @Override


Reply via email to