Author: stefanegli
Date: Thu Jul  6 11:57:27 2017
New Revision: 1801032

URL: http://svn.apache.org/viewvc?rev=1801032&view=rev
Log:
OAK-6276 : introduce VisibilityToken to Clusterable : allows to check for 
visibility checks between Clusterable NodeStores

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
    
jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java?rev=1801032&r1=1801031&r2=1801032&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
 Thu Jul  6 11:57:27 2017
@@ -112,15 +112,39 @@ final class CommitQueue {
      *     <li>the thread is interrupted</li>
      * </ul>
      *
-     * @param conflictRevisions the revisions to become visible.
+     * @param revisions the revisions to become visible.
      */
-    void suspendUntilAll(@Nonnull Set<Revision> conflictRevisions) {
+    void suspendUntilAll(@Nonnull Set<Revision> revisions) {
+        try {
+            suspendUntilAll(revisions, suspendTimeout);
+        } catch (InterruptedException e) {
+            LOG.debug("The suspended thread has been interrupted", e);
+        }        
+    }
+    
+    /**
+     * Suspends until for each of given revisions one of the following happens:
+     * <ul>
+     *     <li>the given revision is visible from the current headRevision</li>
+     *     <li>the given revision is canceled from the commit queue</li>
+     *     <li>the suspend timeout is reached</li>
+     *     <li>the thread is interrupted</li>
+     * </ul>
+     *
+     * @param revisions the revisions to become visible.
+     * @param suspendTimeoutMillis how long to suspend at max
+     * @throws InterruptedException thrown when this thread has its interrupted
+     * status set or was interrupted while waiting. The current thread's
+     * interrupted status is cleared when this exception is thrown.
+     */
+    void suspendUntilAll(@Nonnull Set<Revision> revisions, long 
suspendTimeoutMillis) 
+            throws InterruptedException {
         Semaphore s;
         int addedRevisions;
         synchronized (suspendedCommits) {
             RevisionVector headRevision = context.getHeadRevision();
-            Set<Revision> afterHead = new 
HashSet<Revision>(conflictRevisions.size());
-            for (Revision r : conflictRevisions) {
+            Set<Revision> afterHead = new HashSet<Revision>(revisions.size());
+            for (Revision r : revisions) {
                 if (headRevision.isRevisionNewer(r)) {
                     afterHead.add(r);
                 }
@@ -131,9 +155,7 @@ final class CommitQueue {
             addedRevisions = afterHead.size();
         }
         try {
-            s.tryAcquire(addedRevisions, suspendTimeout, 
TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            LOG.debug("The suspended thread has been interrupted", e);
+            s.tryAcquire(addedRevisions, suspendTimeoutMillis, 
TimeUnit.MILLISECONDS);
         } finally {
             synchronized (suspendedCommits) {
                 suspendedCommits.remove(s);

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1801032&r1=1801031&r2=1801032&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 Thu Jul  6 11:57:27 2017
@@ -69,6 +69,7 @@ import javax.management.NotCompliantMBea
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.cache.Cache;
@@ -3171,6 +3172,52 @@ public final class DocumentNodeStore
     public String getInstanceId() {
         return String.valueOf(getClusterId());
     }
+    
+    @Override
+    public String getVisibilityToken() {
+        final DocumentNodeState theRoot = root;
+        if (theRoot == null) {
+            // unlikely but for paranoia reasons...
+            return "";
+        }
+        return theRoot.getRootRevision().asString();
+    }
+    
+    private boolean isVisible(RevisionVector rv) {
+        // do not synchronize, take a local copy instead
+        final DocumentNodeState localRoot = root;
+        if (localRoot == null) {
+            // unlikely but for paranoia reasons...
+            return false;
+        }
+        return Utils.isGreaterOrEquals(localRoot.getRootRevision(), rv);
+    }
+    
+    @Override
+    public boolean isVisible(@Nonnull String visibilityToken, long 
maxWaitMillis) throws InterruptedException {
+        if (Strings.isNullOrEmpty(visibilityToken)) {
+            // we've asked for @Nonnull..
+            // hence throwing an exception
+            throw new IllegalArgumentException("visibilityToken must not be 
null or empty");
+        }
+        // 'fromString' would throw a RuntimeException if it can't parse 
+        // that would be re thrown automatically
+        final RevisionVector visibilityTokenRv = 
RevisionVector.fromString(visibilityToken);
+
+        if (isVisible(visibilityTokenRv)) {
+            // the simple case
+            return true;
+        }
+        
+        // otherwise wait until the visibility token's revisions all become 
visible
+        // (or maxWaitMillis has passed)
+        commitQueue.suspendUntilAll(Sets.newHashSet(visibilityTokenRv), 
maxWaitMillis);
+        
+        // if we got interrupted above would throw InterruptedException
+        // otherwise, we don't know why suspendUntilAll returned, so
+        // check the final isVisible state and return it
+        return isVisible(visibilityTokenRv);
+    }
 
     public DocumentNodeStoreStatsCollector getStatsCollector() {
         return nodeStoreStatsCollector;

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java?rev=1801032&r1=1801031&r2=1801032&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
 Thu Jul  6 11:57:27 2017
@@ -108,12 +108,32 @@ public class AtomicCounterEditorTest {
         public String getInstanceId() {
             return "1";
         }
+
+        @Override
+        public String getVisibilityToken() {
+            return "";
+        }
+
+        @Override
+        public boolean isVisible(String visibilityToken, long maxWaitMillis) 
throws InterruptedException {
+            return true;
+        }
     };
     private static final Clusterable CLUSTER_2 = new Clusterable() {
         @Override
         public String getInstanceId() {
             return "2";
         }
+
+        @Override
+        public String getVisibilityToken() {
+            return "";
+        }
+
+        @Override
+        public boolean isVisible(String visibilityToken, long maxWaitMillis) 
throws InterruptedException {
+            return true;
+        }
     };
     private static final EditorHook HOOK_NO_CLUSTER = new EditorHook(
         new TestableACEProvider(null, null, null, null));

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1801032&r1=1801031&r2=1801032&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
 Thu Jul  6 11:57:27 2017
@@ -59,10 +59,14 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -3192,6 +3196,129 @@ public class DocumentNodeStoreTest {
         assertThat(updates, everyItem(is(30)));
         assertEquals(1, updates.size());
     }
+    
+    // OAK-6276
+    @Test
+    public void visibilityToken() throws Exception {
+        DocumentStore docStore = new MemoryDocumentStore();
+        DocumentNodeStore ns1 = builderProvider.newBuilder()
+                .setDocumentStore(docStore).setAsyncDelay(0)
+                .setClusterId(1).getNodeStore();
+        ns1.getRoot();
+        ns1.runBackgroundOperations();
+        DocumentNodeStore ns2 = builderProvider.newBuilder()
+                .setDocumentStore(docStore).setAsyncDelay(0)
+                .setClusterId(2).getNodeStore();
+        ns2.getRoot();
+        
+        String vt1 = ns1.getVisibilityToken();
+        String vt2 = ns2.getVisibilityToken();
+        
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns1.isVisible(vt1, 1));
+        assertTrue(ns1.isVisible(vt1, 100000000));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertTrue(ns2.isVisible(vt2, 1));
+        assertTrue(ns2.isVisible(vt2, 100000000));
+
+        assertFalse(ns1.isVisible(vt2, -1));
+        assertFalse(ns1.isVisible(vt2, 1));
+        assertTrue(ns2.isVisible(vt1, -1));
+        ns2.runBackgroundOperations();
+        ns1.runBackgroundOperations();
+        assertTrue(ns1.isVisible(vt2, -1));
+        assertTrue(ns2.isVisible(vt1, -1));
+        
+        vt1 = ns1.getVisibilityToken();
+        vt2 = ns2.getVisibilityToken();
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertTrue(ns1.isVisible(vt2, -1));
+        assertTrue(ns2.isVisible(vt1, -1));
+        assertTrue(ns1.isVisible(vt1, 100000000));
+        assertTrue(ns2.isVisible(vt2, 100000000));
+        assertTrue(ns1.isVisible(vt2, 100000000));
+        assertTrue(ns2.isVisible(vt1, 100000000));
+        
+        NodeBuilder b1 = ns1.getRoot().builder();
+        b1.setProperty("p1", "1");
+        ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        NodeBuilder b2 = ns2.getRoot().builder();
+        b2.setProperty("p2", "2");
+        ns2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertTrue(ns1.isVisible(vt2, -1));
+        assertTrue(ns2.isVisible(vt1, -1));
+        
+        vt1 = ns1.getVisibilityToken();
+        vt2 = ns2.getVisibilityToken();
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertFalse(ns1.isVisible(vt2, -1));
+        assertFalse(ns2.isVisible(vt1, -1));
+        assertFalse(ns1.isVisible(vt2, 1));
+        assertFalse(ns2.isVisible(vt1, 1));
+
+        ns1.runBackgroundOperations();
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertFalse(ns1.isVisible(vt2, -1));
+        assertFalse(ns2.isVisible(vt1, -1));
+        assertFalse(ns1.isVisible(vt2, 1));
+        assertFalse(ns2.isVisible(vt1, 1));
+
+        ns2.runBackgroundOperations();
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertFalse(ns1.isVisible(vt2, -1));
+        assertFalse(ns1.isVisible(vt2, 1));
+        assertTrue(ns2.isVisible(vt1, -1));
+
+        ns1.runBackgroundOperations();
+        assertTrue(ns1.isVisible(vt1, -1));
+        assertTrue(ns2.isVisible(vt2, -1));
+        assertTrue(ns1.isVisible(vt2, -1));
+        assertTrue(ns2.isVisible(vt1, -1));
+        
+        vt1 = ns1.getVisibilityToken();
+        vt2 = ns2.getVisibilityToken();
+        assertTrue(ns1.isVisible(vt2, -1));
+        assertTrue(ns2.isVisible(vt1, -1));
+
+        b1 = ns1.getRoot().builder();
+        b1.setProperty("p1", "1b");
+        ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        vt1 = ns1.getVisibilityToken();
+        assertFalse(ns2.isVisible(vt1, -1));
+        final String finalVt1 = vt1;
+        Future<Void> asyncResult = Executors.newFixedThreadPool(1).submit(new 
Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                assertTrue(ns2.isVisible(finalVt1, 10000));
+                return null;
+            }
+        });
+        try{
+            asyncResult.get(500, TimeUnit.MILLISECONDS);
+            fail("should have thrown a timeout exception");
+        } catch(TimeoutException te) {
+            // ok
+        }
+        ns1.runBackgroundOperations();
+        try{
+            asyncResult.get(500, TimeUnit.MILLISECONDS);
+            fail("should have thrown a timeout exception");
+        } catch(TimeoutException te) {
+            // ok
+        }
+        ns2.runBackgroundOperations();
+        asyncResult.get(6000, TimeUnit.MILLISECONDS);
+    }
+
 
     private static class WriteCountingStore extends MemoryDocumentStore {
         private final ThreadLocal<Boolean> createMulti = new ThreadLocal<>();

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java?rev=1801032&r1=1801031&r2=1801032&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
 Thu Jul  6 11:57:27 2017
@@ -181,5 +181,15 @@ public class AsyncIndexerServiceTest {
         public String getInstanceId() {
             return "foo";
         }
+
+        @Override
+        public String getVisibilityToken() {
+            return "";
+        }
+
+        @Override
+        public boolean isVisible(String visibilityToken, long maxWaitMillis) 
throws InterruptedException {
+            return true;
+        }
     }
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java?rev=1801032&r1=1801031&r2=1801032&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java
 Thu Jul  6 11:57:27 2017
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.spi.state;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
  * Interface for bearing cluster node specific information.
@@ -32,4 +33,58 @@ public interface Clusterable {
      */
     @Nonnull
     String getInstanceId();
-}
+
+    /**
+     * Returns the visibility token of the underlying NodeStore. A 'visibility
+     * token' is an opaque String that can be used to verify if changes done on
+     * one NodeStore are visible on another NodeStore of the same cluster. This
+     * can be achieved by generating such a visibility token on the source
+     * NodeStore, passing it on to the target NodeStore (by whatever means) and
+     * checking for visibility on that target NodeStore.
+     * <p/>
+     * The visibility check returns true if the target NodeStore sees at least
+     * all the changes that the source NodeStore saw at time of visibility 
token
+     * generation. Once a visibility token is visible on a particular NodeStore
+     * it will always return true ever after. This also implies that the
+     * visibility check can only state whether at least all source changes are
+     * visible on the target and that it is independent of any further
+     * modifications.
+     * <p/>
+     * When source and target NodeStore are identical, the visibility check is
+     * expected to return true, immediately. This is based on the assumption
+     * that with a session.refresh() on that NodeStore you'll always get the
+     * latest changes applied by any other session locally.
+     * <p/>
+     * Visibility tokens are meant to be lightweight and are not expected to be
+     * persisted by the implementor. Nevertheless they should survive their
+     * validity in the case of crashes of the source and/or the target 
instance.
+     */
+    @Nullable
+    String getVisibilityToken();
+
+    /**
+     * Checks if the underlying NodeStore sees at least the changes that were
+     * visible at the time the visibility token was created on potentially
+     * another instance if in a clustered NodeStore setup.
+     * <p/>
+     * If the visibility token was created on the underlying NodeStore this
+     * check always returns true, immediately.
+     * 
+     * @param visibilityToken
+     *            the visibility token that was created on another instance in 
a
+     *            clustered NodeStore setup. Providing null is not supported 
and
+     *            might throw a RuntimeException
+     * @param maxWaitMillis
+     *            if &gt;-1 waits (at max this many milliseconds if &gt;0,
+     *            forever if ==0) until the underlying NodeStore sees at least
+     *            the changes represented by the provided visibility token. if
+     *            &lt; 0 the method does not wait
+     * @return true if the underlying NodeStore sees at least the changes that
+     *         were visible at the time the visibility token was created
+     * @throws InterruptedException
+     *             (optionally) thrown if interrupted while waiting
+     * @see VisibilityTokenProvider VisibilityTokenProvider for a definition 
and
+     *      usage of visibility tokens
+     */
+    boolean isVisible(@Nonnull String visibilityToken, long maxWaitMillis) 
throws InterruptedException;
+}
\ No newline at end of file


Reply via email to