Author: chetanm
Date: Tue Nov 15 07:49:33 2016
New Revision: 1769749

URL: http://svn.apache.org/viewvc?rev=1769749&view=rev
Log:
OAK-4935 - support prefiltering of async index updates

Pass ValidatorProviders as part of the commit. Later this will be used to pass in
the ChangeCollector validator.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1769749&r1=1769748&r2=1769749&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
 Tue Nov 15 07:49:33 2016
@@ -29,7 +29,9 @@ import static org.apache.jackrabbit.oak.
 
 import java.io.Closeable;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
@@ -47,6 +49,7 @@ import javax.management.openmbean.OpenDa
 import javax.management.openmbean.OpenType;
 import javax.management.openmbean.SimpleType;
 
+import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -61,9 +64,12 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
 import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
 import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
 import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -183,6 +189,8 @@ public class AsyncIndexUpdate implements
      */
     private long lastCheckpointCleanUpTime;
 
+    private List<ValidatorProvider> validatorProviders = 
Collections.emptyList();
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
@@ -226,6 +234,8 @@ public class AsyncIndexUpdate implements
 
         private final AtomicBoolean forcedStop;
 
+        private List<ValidatorProvider> validatorProviders = 
Collections.emptyList();
+
         /** Expiration time of the last lease we committed */
         private long lease;
 
@@ -260,7 +270,7 @@ public class AsyncIndexUpdate implements
             NodeBuilder builder = root.builder();
             NodeBuilder async = builder.child(ASYNC);
             async.setProperty(leaseName, lease);
-            mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, 
name);
+            mergeWithConcurrencyCheck(store, validatorProviders, builder, 
checkpoint, beforeLease, name);
             hasLease = true;
         }
 
@@ -274,7 +284,7 @@ public class AsyncIndexUpdate implements
             NodeBuilder async = builder.child(ASYNC);
 
             updateTempCheckpoints(async, checkpoint, afterCheckpoint);
-            mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name);
+            mergeWithConcurrencyCheck(store, validatorProviders, builder, 
checkpoint, lease, name);
 
             // reset updates counter
             indexStats.resetUpdates();
@@ -312,7 +322,7 @@ public class AsyncIndexUpdate implements
             NodeBuilder builder = store.getRoot().builder();
             NodeBuilder async = builder.child(ASYNC);
             async.removeProperty(leaseName);
-            mergeWithConcurrencyCheck(store, builder, async.getString(name), 
lease, name);
+            mergeWithConcurrencyCheck(store, validatorProviders, builder, 
async.getString(name), lease, name);
         }
 
         @Override
@@ -328,7 +338,7 @@ public class AsyncIndexUpdate implements
                     long newLease = now + 2 * leaseTimeOut;
                     NodeBuilder builder = store.getRoot().builder();
                     builder.child(ASYNC).setProperty(leaseName, newLease);
-                    mergeWithConcurrencyCheck(store, builder, checkpoint, 
lease, name);
+                    mergeWithConcurrencyCheck(store, validatorProviders, 
builder, checkpoint, lease, name);
                     lease = newLease;
                 }
             }
@@ -337,6 +347,10 @@ public class AsyncIndexUpdate implements
         public void setCheckpoint(String checkpoint) {
             this.checkpoint = checkpoint;
         }
+
+        public void setValidatorProviders(List<ValidatorProvider> 
validatorProviders) {
+            this.validatorProviders = validatorProviders;
+        }
     }
 
     @Override
@@ -583,8 +597,10 @@ public class AsyncIndexUpdate implements
                                                          String name, long 
leaseTimeOut, String beforeCheckpoint,
                                                          AsyncIndexStats 
indexStats,
                                                          AtomicBoolean 
stopFlag) {
-        return new AsyncUpdateCallback(store, name, leaseTimeOut,
+        AsyncUpdateCallback callback = new AsyncUpdateCallback(store, name, 
leaseTimeOut,
                 beforeCheckpoint, indexStats, stopFlag);
+        callback.setValidatorProviders(validatorProviders);
+        return callback;
     }
 
     protected boolean updateIndex(NodeState before, String beforeCheckpoint,
@@ -643,7 +659,7 @@ public class AsyncIndexUpdate implements
                 }
                 updatePostRunStatus = true;
             }
-            mergeWithConcurrencyCheck(store, builder, beforeCheckpoint,
+            mergeWithConcurrencyCheck(store, validatorProviders, builder, 
beforeCheckpoint,
                     callback.lease, name);
             if (indexUpdate.isReindexingPerformed()) {
                 log.info("[{}] Reindexing completed for indexes: {} in {}",
@@ -675,9 +691,9 @@ public class AsyncIndexUpdate implements
         return name + "-temp";
     }
 
-    private static void mergeWithConcurrencyCheck(final NodeStore store,
-            NodeBuilder builder, final String checkpoint, final long lease,
-            final String name) throws CommitFailedException {
+    private static void mergeWithConcurrencyCheck(final NodeStore store, 
List<ValidatorProvider> validatorProviders,
+                                                  NodeBuilder builder, final 
String checkpoint, final long lease,
+                                                  final String name) throws 
CommitFailedException {
         CommitHook concurrentUpdateCheck = new CommitHook() {
             @Override @Nonnull
             public NodeState processCommit(
@@ -694,9 +710,12 @@ public class AsyncIndexUpdate implements
                 }
             }
         };
+        List<EditorProvider> editorProviders = Lists.newArrayList();
+        editorProviders.add(new ConflictValidatorProvider());
+        editorProviders.addAll(validatorProviders);
         CompositeHook hooks = new CompositeHook(
                 new ConflictHook(new AnnotatingConflictHandler()),
-                new EditorHook(new ConflictValidatorProvider()),
+                new 
EditorHook(CompositeEditorProvider.compose(editorProviders)),
                 concurrentUpdateCheck);
         try {
             store.merge(builder, hooks, createCommitInfo());
@@ -732,6 +751,10 @@ public class AsyncIndexUpdate implements
         return this;
     }
 
+    public void setValidatorProviders(List<ValidatorProvider> 
validatorProviders) {
+        this.validatorProviders = validatorProviders;
+    }
+
     public boolean isClosed(){
         return closed || forcedStopFlag.get();
     }
@@ -1225,7 +1248,7 @@ public class AsyncIndexUpdate implements
             }
 
             if (!updated.isEmpty()) {
-                mergeWithConcurrencyCheck(store, builder, refCheckpoint, 
lease, name);
+                mergeWithConcurrencyCheck(store, validatorProviders, builder, 
refCheckpoint, lease, name);
                 log.info(
                         "[{}] Successfully split index definitions {} to async 
task named {} with referenced checkpoint {}.",
                         name, updated, newIndexTaskName, refCheckpoint);

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1769749&r1=1769748&r2=1769749&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
 Tue Nov 15 07:49:33 2016
@@ -25,6 +25,7 @@ import static org.apache.jackrabbit.oak.
 import static 
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
 import static 
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -51,11 +52,13 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.management.openmbean.CompositeData;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
 import 
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
 import 
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
@@ -66,9 +69,12 @@ import org.apache.jackrabbit.oak.query.i
 import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.DefaultValidator;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.commit.Validator;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
 import org.apache.jackrabbit.oak.spi.query.PropertyValues;
 import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -1557,6 +1563,74 @@ public class AsyncIndexUpdateTest {
         
assertNotNull(infoCollector.infos.get(0).getInfo().get(CommitContext.NAME));
     }
 
+    @Test
+    public void validatorProviderInvocation() throws Exception{
+        MemoryNodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        // merge it back in
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, 
provider);
+        CollectingValidatorProvider v = new CollectingValidatorProvider();
+
+        async.setValidatorProviders(ImmutableList.<ValidatorProvider>of(v));
+        async.run();
+
+        assertFalse(v.visitedPaths.isEmpty());
+        assertThat(v.visitedPaths, hasItem("/:async"));
+        assertThat(v.visitedPaths, hasItem("/oak:index/rootIndex"));
+
+    }
+
+    private static class CollectingValidatorProvider extends ValidatorProvider 
{
+        final Set<String> visitedPaths = Sets.newHashSet();
+
+        @Override
+        protected Validator getRootValidator(NodeState before, NodeState 
after, CommitInfo info) {
+            return new CollectingValidator("/");
+        }
+
+        public void reset(){
+            visitedPaths.clear();
+        }
+
+        private class CollectingValidator extends DefaultValidator {
+            private final String path;
+
+            public CollectingValidator(String path){
+                this.path = path;
+            }
+
+            @Override
+            public void enter(NodeState before, NodeState after) throws 
CommitFailedException {
+                visitedPaths.add(path);
+                super.enter(before, after);
+            }
+
+            @Override
+            public Validator childNodeAdded(String name, NodeState after) 
throws CommitFailedException {
+                return new CollectingValidator(PathUtils.concat(path, name));
+            }
+
+            @Override
+            public Validator childNodeChanged(String name, NodeState before, 
NodeState after) throws CommitFailedException {
+                return new CollectingValidator(PathUtils.concat(path, name));
+            }
+
+            @Override
+            public Validator childNodeDeleted(String name, NodeState before) 
throws CommitFailedException {
+                return new CollectingValidator(PathUtils.concat(path, name));
+            }
+        }
+    }
+
     private static class CommitInfoCollector implements Observer {
         List<CommitInfo> infos = Lists.newArrayList();
 


Reply via email to