Author: chetanm
Date: Tue Nov 15 07:49:45 2016
New Revision: 1769750

URL: http://svn.apache.org/viewvc?rev=1769750&view=rev
Log:
OAK-4935 - support prefiltering of async index updates

-- Pass on ChangeCollectorProvider to AsyncIndexUpdate
-- Added some debug logging to ChangeCollectorProvider to log the changeSet 
that got build up

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
 Tue Nov 15 07:49:45 2016
@@ -20,6 +20,7 @@
 package org.apache.jackrabbit.oak.plugins.index;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +34,8 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider;
@@ -71,8 +74,13 @@ public class AsyncIndexerService {
     private static final char CONFIG_SEP = ':';
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final WhiteboardIndexEditorProvider indexEditorProvider = new 
WhiteboardIndexEditorProvider();
+
     @Reference
     private NodeStore nodeStore;
+
+    @Reference(target = "(type=" + ChangeCollectorProvider.TYPE + ")")
+    private ValidatorProvider validatorProvider;
+
     private IndexMBeanRegistration indexRegistration;
 
     @Activate
@@ -87,11 +95,12 @@ public class AsyncIndexerService {
 
         for (AsyncConfig c : asyncIndexerConfig) {
             AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, 
indexEditorProvider);
+            
task.setValidatorProviders(Collections.singletonList(validatorProvider));
             task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));
             indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs);
         }
         log.info("Configured async indexers {} ", asyncIndexerConfig);
-        log.info("Lease time: {} mins", leaseTimeOutMin);
+        log.info("Lease time: {} mins and AsyncIndexUpdate configured with 
{}", leaseTimeOutMin, validatorProvider.getClass().getName());
     }
 
     @Deactivate

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
 Tue Nov 15 07:49:45 2016
@@ -61,9 +61,10 @@ import com.google.common.collect.Iterabl
         description = "It hooks into the commit and collects a ChangeSet of 
changed items of a commit which " +
                 "is then used to speed up observation processing"
 )
-@Property(name = "type", value = "changeCollectorProvider", propertyPrivate = 
true)
+@Property(name = "type", value = ChangeCollectorProvider.TYPE, propertyPrivate 
= true)
 @Service(ValidatorProvider.class)
 public class ChangeCollectorProvider extends ValidatorProvider {
+    public static final String TYPE = "changeCollectorProvider";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ChangeCollectorProvider.class);
 
@@ -212,7 +213,9 @@ public class ChangeCollectorProvider ext
             // but if we're at the root, then we add the ChangeSet to the
             // CommitContext of the CommitInfo
             CommitContext commitContext = (CommitContext) 
support.getInfo().getInfo().get(CommitContext.NAME);
-            commitContext.set(COMMIT_CONTEXT_OBSERVATION_CHANGESET, 
support.getChangeSetBuilder().build());
+            ChangeSet changeSet = support.getChangeSetBuilder().build();
+            commitContext.set(COMMIT_CONTEXT_OBSERVATION_CHANGESET, changeSet);
+            LOG.debug("Collected changeSet for commit {} is {}", 
support.getInfo(), changeSet);
         }
 
         @Override

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
 Tue Nov 15 07:49:45 2016
@@ -1631,7 +1631,7 @@ public class AsyncIndexUpdateTest {
         }
     }
 
-    private static class CommitInfoCollector implements Observer {
+    static class CommitInfoCollector implements Observer {
         List<CommitInfo> infos = Lists.newArrayList();
 
         @Override

Modified: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
 Tue Nov 15 07:49:45 2016
@@ -24,8 +24,18 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import 
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdateTest.CommitInfoCollector;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexerService.AsyncConfig;
+import 
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
@@ -33,17 +43,22 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static 
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
 import static org.junit.Assert.*;
 
 public class AsyncIndexerServiceTest {
     @Rule
     public final OsgiContext context = new OsgiContext();
 
+    private MemoryNodeStore nodeStore = new MemoryNodeStore();
     private AsyncIndexerService service = new AsyncIndexerService();
 
     @Before
     public void setUp() {
-        context.registerService(NodeStore.class, new MemoryNodeStore());
+        context.registerService(NodeStore.class, nodeStore);
+        context.registerService(ValidatorProvider.class, new 
ChangeCollectorProvider());
         MockOsgi.injectServices(service, context.bundleContext());
     }
 
@@ -71,6 +86,35 @@ public class AsyncIndexerServiceTest {
         assertEquals(TimeUnit.MINUTES.toMillis(20), 
indexUpdate.getLeaseTimeOut());
     }
 
+    @Test
+    public void changeCollectionEnabled() throws Exception{
+        Map<String,Object> config = ImmutableMap.<String, Object>of(
+                "asyncConfigs", new String[] {"async:5"}
+        );
+        context.registerService(IndexEditorProvider.class, new 
PropertyIndexEditorProvider());
+        MockOsgi.activate(service, context.bundleContext(), config);
+
+        NodeBuilder builder = nodeStore.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        // merge it back in
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        CommitInfoCollector infoCollector = new CommitInfoCollector();
+        nodeStore.addObserver(infoCollector);
+
+        AsyncIndexUpdate indexUpdate = getIndexUpdate("async");
+        indexUpdate.run();
+
+        CommitContext commitContext = (CommitContext) 
infoCollector.infos.get(0).getInfo().get(CommitContext.NAME);
+        assertNotNull(commitContext);
+        ChangeSet changeSet = (ChangeSet) 
commitContext.get(ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET);
+        assertNotNull(changeSet);
+    }
+
     private AsyncIndexUpdate getIndexUpdate(String name) {
         return (AsyncIndexUpdate) context.getServices(Runnable.class, 
"(oak.async="+name+")")[0];
     }


Reply via email to