Author: chetanm
Date: Tue Nov 15 07:49:33 2016
New Revision: 1769749
URL: http://svn.apache.org/viewvc?rev=1769749&view=rev
Log:
OAK-4935 - support prefiltering of async index updates
Pass ValidatorProviders as part of the commit. Later this will be used to pass
in the ChangeCollector validator.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1769749&r1=1769748&r2=1769749&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
Tue Nov 15 07:49:33 2016
@@ -29,7 +29,9 @@ import static org.apache.jackrabbit.oak.
import java.io.Closeable;
import java.util.Calendar;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
@@ -47,6 +49,7 @@ import javax.management.openmbean.OpenDa
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
+import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
@@ -61,9 +64,12 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -183,6 +189,8 @@ public class AsyncIndexUpdate implements
*/
private long lastCheckpointCleanUpTime;
+ private List<ValidatorProvider> validatorProviders =
Collections.emptyList();
+
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@Nonnull IndexEditorProvider provider, boolean switchOnSync) {
this.name = checkNotNull(name);
@@ -226,6 +234,8 @@ public class AsyncIndexUpdate implements
private final AtomicBoolean forcedStop;
+ private List<ValidatorProvider> validatorProviders =
Collections.emptyList();
+
/** Expiration time of the last lease we committed */
private long lease;
@@ -260,7 +270,7 @@ public class AsyncIndexUpdate implements
NodeBuilder builder = root.builder();
NodeBuilder async = builder.child(ASYNC);
async.setProperty(leaseName, lease);
- mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease,
name);
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
checkpoint, beforeLease, name);
hasLease = true;
}
@@ -274,7 +284,7 @@ public class AsyncIndexUpdate implements
NodeBuilder async = builder.child(ASYNC);
updateTempCheckpoints(async, checkpoint, afterCheckpoint);
- mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name);
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
checkpoint, lease, name);
// reset updates counter
indexStats.resetUpdates();
@@ -312,7 +322,7 @@ public class AsyncIndexUpdate implements
NodeBuilder builder = store.getRoot().builder();
NodeBuilder async = builder.child(ASYNC);
async.removeProperty(leaseName);
- mergeWithConcurrencyCheck(store, builder, async.getString(name),
lease, name);
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
async.getString(name), lease, name);
}
@Override
@@ -328,7 +338,7 @@ public class AsyncIndexUpdate implements
long newLease = now + 2 * leaseTimeOut;
NodeBuilder builder = store.getRoot().builder();
builder.child(ASYNC).setProperty(leaseName, newLease);
- mergeWithConcurrencyCheck(store, builder, checkpoint,
lease, name);
+ mergeWithConcurrencyCheck(store, validatorProviders,
builder, checkpoint, lease, name);
lease = newLease;
}
}
@@ -337,6 +347,10 @@ public class AsyncIndexUpdate implements
public void setCheckpoint(String checkpoint) {
this.checkpoint = checkpoint;
}
+
+ public void setValidatorProviders(List<ValidatorProvider>
validatorProviders) {
+ this.validatorProviders = validatorProviders;
+ }
}
@Override
@@ -583,8 +597,10 @@ public class AsyncIndexUpdate implements
String name, long
leaseTimeOut, String beforeCheckpoint,
AsyncIndexStats
indexStats,
AtomicBoolean
stopFlag) {
- return new AsyncUpdateCallback(store, name, leaseTimeOut,
+ AsyncUpdateCallback callback = new AsyncUpdateCallback(store, name,
leaseTimeOut,
beforeCheckpoint, indexStats, stopFlag);
+ callback.setValidatorProviders(validatorProviders);
+ return callback;
}
protected boolean updateIndex(NodeState before, String beforeCheckpoint,
@@ -643,7 +659,7 @@ public class AsyncIndexUpdate implements
}
updatePostRunStatus = true;
}
- mergeWithConcurrencyCheck(store, builder, beforeCheckpoint,
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
beforeCheckpoint,
callback.lease, name);
if (indexUpdate.isReindexingPerformed()) {
log.info("[{}] Reindexing completed for indexes: {} in {}",
@@ -675,9 +691,9 @@ public class AsyncIndexUpdate implements
return name + "-temp";
}
- private static void mergeWithConcurrencyCheck(final NodeStore store,
- NodeBuilder builder, final String checkpoint, final long lease,
- final String name) throws CommitFailedException {
+ private static void mergeWithConcurrencyCheck(final NodeStore store,
List<ValidatorProvider> validatorProviders,
+ NodeBuilder builder, final
String checkpoint, final long lease,
+ final String name) throws
CommitFailedException {
CommitHook concurrentUpdateCheck = new CommitHook() {
@Override @Nonnull
public NodeState processCommit(
@@ -694,9 +710,12 @@ public class AsyncIndexUpdate implements
}
}
};
+ List<EditorProvider> editorProviders = Lists.newArrayList();
+ editorProviders.add(new ConflictValidatorProvider());
+ editorProviders.addAll(validatorProviders);
CompositeHook hooks = new CompositeHook(
new ConflictHook(new AnnotatingConflictHandler()),
- new EditorHook(new ConflictValidatorProvider()),
+ new
EditorHook(CompositeEditorProvider.compose(editorProviders)),
concurrentUpdateCheck);
try {
store.merge(builder, hooks, createCommitInfo());
@@ -732,6 +751,10 @@ public class AsyncIndexUpdate implements
return this;
}
+ public void setValidatorProviders(List<ValidatorProvider>
validatorProviders) {
+ this.validatorProviders = validatorProviders;
+ }
+
public boolean isClosed(){
return closed || forcedStopFlag.get();
}
@@ -1225,7 +1248,7 @@ public class AsyncIndexUpdate implements
}
if (!updated.isEmpty()) {
- mergeWithConcurrencyCheck(store, builder, refCheckpoint,
lease, name);
+ mergeWithConcurrencyCheck(store, validatorProviders, builder,
refCheckpoint, lease, name);
log.info(
"[{}] Successfully split index definitions {} to async
task named {} with referenced checkpoint {}.",
name, updated, newIndexTaskName, refCheckpoint);
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1769749&r1=1769748&r2=1769749&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Tue Nov 15 07:49:33 2016
@@ -25,6 +25,7 @@ import static org.apache.jackrabbit.oak.
import static
org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
import static
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -51,11 +52,13 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.management.openmbean.CompositeData;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
import
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
@@ -66,9 +69,12 @@ import org.apache.jackrabbit.oak.query.i
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.DefaultValidator;
import org.apache.jackrabbit.oak.spi.commit.Editor;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.commit.Validator;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.query.PropertyValues;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -1557,6 +1563,74 @@ public class AsyncIndexUpdateTest {
assertNotNull(infoCollector.infos.get(0).getInfo().get(CommitContext.NAME));
}
+ @Test
+ public void validatorProviderInvocation() throws Exception{
+ MemoryNodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+
+ // merge it back in
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider);
+ CollectingValidatorProvider v = new CollectingValidatorProvider();
+
+ async.setValidatorProviders(ImmutableList.<ValidatorProvider>of(v));
+ async.run();
+
+ assertFalse(v.visitedPaths.isEmpty());
+ assertThat(v.visitedPaths, hasItem("/:async"));
+ assertThat(v.visitedPaths, hasItem("/oak:index/rootIndex"));
+
+ }
+
+ private static class CollectingValidatorProvider extends ValidatorProvider
{
+ final Set<String> visitedPaths = Sets.newHashSet();
+
+ @Override
+ protected Validator getRootValidator(NodeState before, NodeState
after, CommitInfo info) {
+ return new CollectingValidator("/");
+ }
+
+ public void reset(){
+ visitedPaths.clear();
+ }
+
+ private class CollectingValidator extends DefaultValidator {
+ private final String path;
+
+ public CollectingValidator(String path){
+ this.path = path;
+ }
+
+ @Override
+ public void enter(NodeState before, NodeState after) throws
CommitFailedException {
+ visitedPaths.add(path);
+ super.enter(before, after);
+ }
+
+ @Override
+ public Validator childNodeAdded(String name, NodeState after)
throws CommitFailedException {
+ return new CollectingValidator(PathUtils.concat(path, name));
+ }
+
+ @Override
+ public Validator childNodeChanged(String name, NodeState before,
NodeState after) throws CommitFailedException {
+ return new CollectingValidator(PathUtils.concat(path, name));
+ }
+
+ @Override
+ public Validator childNodeDeleted(String name, NodeState before)
throws CommitFailedException {
+ return new CollectingValidator(PathUtils.concat(path, name));
+ }
+ }
+ }
+
private static class CommitInfoCollector implements Observer {
List<CommitInfo> infos = Lists.newArrayList();