Author: chetanm
Date: Tue Nov 15 07:49:45 2016
New Revision: 1769750
URL: http://svn.apache.org/viewvc?rev=1769750&view=rev
Log:
OAK-4935 - support prefiltering of async index updates
-- Pass on ChangeCollectorProvider to AsyncIndexUpdate
-- Added some debug logging to ChangeCollectorProvider to log the changeSet
that got built up
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
Tue Nov 15 07:49:45 2016
@@ -20,6 +20,7 @@
package org.apache.jackrabbit.oak.plugins.index;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -33,6 +34,8 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Reference;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider;
@@ -71,8 +74,13 @@ public class AsyncIndexerService {
private static final char CONFIG_SEP = ':';
private final Logger log = LoggerFactory.getLogger(getClass());
private final WhiteboardIndexEditorProvider indexEditorProvider = new
WhiteboardIndexEditorProvider();
+
@Reference
private NodeStore nodeStore;
+
+ @Reference(target = "(type=" + ChangeCollectorProvider.TYPE + ")")
+ private ValidatorProvider validatorProvider;
+
private IndexMBeanRegistration indexRegistration;
@Activate
@@ -87,11 +95,12 @@ public class AsyncIndexerService {
for (AsyncConfig c : asyncIndexerConfig) {
AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore,
indexEditorProvider);
+
task.setValidatorProviders(Collections.singletonList(validatorProvider));
task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));
indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs);
}
log.info("Configured async indexers {} ", asyncIndexerConfig);
- log.info("Lease time: {} mins", leaseTimeOutMin);
+ log.info("Lease time: {} mins and AsyncIndexUpdate configured with
{}", leaseTimeOutMin, validatorProvider.getClass().getName());
}
@Deactivate
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeCollectorProvider.java
Tue Nov 15 07:49:45 2016
@@ -61,9 +61,10 @@ import com.google.common.collect.Iterabl
description = "It hooks into the commit and collects a ChangeSet of
changed items of a commit which " +
"is then used to speed up observation processing"
)
-@Property(name = "type", value = "changeCollectorProvider", propertyPrivate =
true)
+@Property(name = "type", value = ChangeCollectorProvider.TYPE, propertyPrivate
= true)
@Service(ValidatorProvider.class)
public class ChangeCollectorProvider extends ValidatorProvider {
+ public static final String TYPE = "changeCollectorProvider";
private static final Logger LOG =
LoggerFactory.getLogger(ChangeCollectorProvider.class);
@@ -212,7 +213,9 @@ public class ChangeCollectorProvider ext
// but if we're at the root, then we add the ChangeSet to the
// CommitContext of the CommitInfo
CommitContext commitContext = (CommitContext)
support.getInfo().getInfo().get(CommitContext.NAME);
- commitContext.set(COMMIT_CONTEXT_OBSERVATION_CHANGESET,
support.getChangeSetBuilder().build());
+ ChangeSet changeSet = support.getChangeSetBuilder().build();
+ commitContext.set(COMMIT_CONTEXT_OBSERVATION_CHANGESET, changeSet);
+ LOG.debug("Collected changeSet for commit {} is {}",
support.getInfo(), changeSet);
}
@Override
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Tue Nov 15 07:49:45 2016
@@ -1631,7 +1631,7 @@ public class AsyncIndexUpdateTest {
}
}
- private static class CommitInfoCollector implements Observer {
+ static class CommitInfoCollector implements Observer {
List<CommitInfo> infos = Lists.newArrayList();
@Override
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java?rev=1769750&r1=1769749&r2=1769750&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
Tue Nov 15 07:49:45 2016
@@ -24,8 +24,18 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdateTest.CommitInfoCollector;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexerService.AsyncConfig;
+import
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
@@ -33,17 +43,22 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import static
org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static
org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
import static org.junit.Assert.*;
public class AsyncIndexerServiceTest {
@Rule
public final OsgiContext context = new OsgiContext();
+ private MemoryNodeStore nodeStore = new MemoryNodeStore();
private AsyncIndexerService service = new AsyncIndexerService();
@Before
public void setUp() {
- context.registerService(NodeStore.class, new MemoryNodeStore());
+ context.registerService(NodeStore.class, nodeStore);
+ context.registerService(ValidatorProvider.class, new
ChangeCollectorProvider());
MockOsgi.injectServices(service, context.bundleContext());
}
@@ -71,6 +86,35 @@ public class AsyncIndexerServiceTest {
assertEquals(TimeUnit.MINUTES.toMillis(20),
indexUpdate.getLeaseTimeOut());
}
+ @Test
+ public void changeCollectionEnabled() throws Exception{
+ Map<String,Object> config = ImmutableMap.<String, Object>of(
+ "asyncConfigs", new String[] {"async:5"}
+ );
+ context.registerService(IndexEditorProvider.class, new
PropertyIndexEditorProvider());
+ MockOsgi.activate(service, context.bundleContext(), config);
+
+ NodeBuilder builder = nodeStore.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+
+ // merge it back in
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ CommitInfoCollector infoCollector = new CommitInfoCollector();
+ nodeStore.addObserver(infoCollector);
+
+ AsyncIndexUpdate indexUpdate = getIndexUpdate("async");
+ indexUpdate.run();
+
+ CommitContext commitContext = (CommitContext)
infoCollector.infos.get(0).getInfo().get(CommitContext.NAME);
+ assertNotNull(commitContext);
+ ChangeSet changeSet = (ChangeSet)
commitContext.get(ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET);
+ assertNotNull(changeSet);
+ }
+
private AsyncIndexUpdate getIndexUpdate(String name) {
return (AsyncIndexUpdate) context.getServices(Runnable.class,
"(oak.async="+name+")")[0];
}