Author: alexparvulescu Date: Mon Mar 31 12:39:21 2014 New Revision: 1583316
URL: http://svn.apache.org/r1583316 Log: OAK-1456 Non-blocking reindexing Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/AsyncPropertyIndexTest.java (with props) Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1583316&r1=1583315&r2=1583316&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Mar 31 12:39:21 2014 @@ -23,8 +23,14 @@ import static org.apache.jackrabbit.oak. 
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE; import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING; +import static org.apache.jackrabbit.oak.commons.PathUtils.elements; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; import java.util.Calendar; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -82,11 +88,28 @@ public class AsyncIndexUpdate implements private final AsyncIndexStats indexStats = new AsyncIndexStats(); + /** Flag to switch to synchronous updates once the index has caught up to the repo */ + private final boolean switchOnSync; + + /** + * Set of reindexed definitions accumulated across runs, because a single diff + * can report fewer definitions than there really are. Used in coordination + * with the switchOnSync flag, so we know which definitions need to be updated + * after a run with no changes.
+ */ + private final Set<String> reindexedDefinitions = new HashSet<String>(); + public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store, - @Nonnull IndexEditorProvider provider) { + @Nonnull IndexEditorProvider provider, boolean switchOnSync) { this.name = checkNotNull(name); this.store = checkNotNull(store); this.provider = checkNotNull(provider); + this.switchOnSync = switchOnSync; + } + + public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store, + @Nonnull IndexEditorProvider provider) { + this(name, store, provider, false); } /** @@ -113,8 +136,8 @@ public class AsyncIndexUpdate implements public synchronized void run() { log.debug("Running background index task {}", name); - if(isAlreadyRunning(store)){ - log.debug("Async job found to be already running. Skipping"); + if (isAlreadyRunning(store, name)) { + log.debug("Async job '{}' found to be already running. Skipping", name); return; } @@ -143,28 +166,45 @@ public class AsyncIndexUpdate implements CommitFailedException exception = EditorDiff.process(indexUpdate, before, after); - if (exception == null && callback.dirty) { - async.setProperty(name, checkpoint); - try { - store.merge(builder, new CommitHook() { - @Override @Nonnull - public NodeState processCommit( - NodeState before, NodeState after, CommitInfo info) - throws CommitFailedException { - // check for concurrent updates by this async task - PropertyState stateAfterRebase = before - .getChildNode(ASYNC).getProperty(name); - if (Objects.equal(state, stateAfterRebase)) { - return postAsyncRunStatus(after.builder(), indexStats) - .getNodeState(); - } else { - throw CONCURRENT_UPDATE; - } + if (exception == null) { + if (callback.dirty) { + async.setProperty(name, checkpoint); + try { + store.merge(builder, newCommitHook(name, state, indexStats), + CommitInfo.EMPTY); + } catch (CommitFailedException e) { + if (e != CONCURRENT_UPDATE) { + exception = e; + } + } + if (switchOnSync) { + 
reindexedDefinitions.addAll(indexUpdate + .getReindexedDefinitions()); + } + } else if (switchOnSync && !reindexedDefinitions.isEmpty()) { + log.debug("No changes detected after diff, will try to switch to synchronous updates on " + + reindexedDefinitions); + async.setProperty(name, checkpoint); + + // no changes after diff, switch to sync on the async defs + for (String path : reindexedDefinitions) { + NodeBuilder c = builder; + for (String p : elements(path)) { + c = c.getChildNode(p); + } + if (c.exists() && !c.getBoolean(REINDEX_PROPERTY_NAME)) { + c.removeProperty(ASYNC_PROPERTY_NAME); + } + } + + try { + store.merge(builder, newCommitHook(name, state, indexStats), + CommitInfo.EMPTY); + reindexedDefinitions.clear(); + } catch (CommitFailedException e) { + if (e != CONCURRENT_UPDATE) { + exception = e; } - }, CommitInfo.EMPTY); - } catch (CommitFailedException e) { - if (e != CONCURRENT_UPDATE) { - exception = e; } } } @@ -182,15 +222,36 @@ public class AsyncIndexUpdate implements } } + private static CommitHook newCommitHook(final String name, + final PropertyState state, final AsyncIndexStats indexStats) + throws CommitFailedException { + return new CommitHook() { + @Override + @Nonnull + public NodeState processCommit(NodeState before, NodeState after, + CommitInfo info) throws CommitFailedException { + // check for concurrent updates by this async task + PropertyState stateAfterRebase = before.getChildNode(ASYNC) + .getProperty(name); + if (Objects.equal(state, stateAfterRebase)) { + return postAsyncRunStatus(after.builder(), indexStats, name) + .getNodeState(); + } else { + throw CONCURRENT_UPDATE; + } + } + }; + } + private static void preAsyncRun(NodeStore store, String name, AsyncIndexStats stats) throws CommitFailedException { NodeBuilder builder = store.getRoot().builder(); - preAsyncRunStatus(builder, stats); + preAsyncRunStatus(builder, stats, name); store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); } - private static boolean 
isAlreadyRunning(NodeStore store) { - NodeState indexState = store.getRoot().getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME); + private static boolean isAlreadyRunning(NodeStore store, String name) { + NodeState indexState = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME); //Probably the first run if (!indexState.exists()) { @@ -198,8 +259,8 @@ public class AsyncIndexUpdate implements } //Check if already running or timed out - if (STATUS_RUNNING.equals(indexState.getString("async-status"))) { - PropertyState startTime = indexState.getProperty("async-start"); + if (STATUS_RUNNING.equals(indexState.getString(name + "-status"))) { + PropertyState startTime = indexState.getProperty(name + "-start"); Calendar start = Conversions.convert(startTime.getValue(Type.DATE)).toCalendar(); Calendar now = Calendar.getInstance(); long delta = now.getTimeInMillis() - start.getTimeInMillis(); @@ -218,23 +279,23 @@ public class AsyncIndexUpdate implements } private static void preAsyncRunStatus(NodeBuilder builder, - AsyncIndexStats stats) { + AsyncIndexStats stats, String name) { String now = now(); stats.start(now); - builder.getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME) - .setProperty("async-status", STATUS_RUNNING) - .setProperty("async-start", now, Type.DATE) - .removeProperty("async-done"); + builder.getChildNode(INDEX_DEFINITIONS_NAME) + .setProperty(name + "-status", STATUS_RUNNING) + .setProperty(name + "-start", now, Type.DATE) + .removeProperty(name + "-done"); } private static NodeBuilder postAsyncRunStatus(NodeBuilder builder, - AsyncIndexStats stats) { + AsyncIndexStats stats, String name) { String now = now(); stats.done(now); - builder.getChildNode(IndexConstants.INDEX_DEFINITIONS_NAME) - .setProperty("async-status", STATUS_DONE) - .setProperty("async-done", now, Type.DATE) - .removeProperty("async-start"); + builder.getChildNode(INDEX_DEFINITIONS_NAME) + .setProperty(name + "-status", STATUS_DONE) + .setProperty(name + "-done", now, Type.DATE) + 
.removeProperty(name + "-start"); return builder; } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java?rev=1583316&r1=1583315&r2=1583316&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java Mon Mar 31 12:39:21 2014 @@ -31,8 +31,12 @@ public interface IndexConstants { String REINDEX_PROPERTY_NAME = "reindex"; + String REINDEX_ASYNC_PROPERTY_NAME = "reindex-async"; + String ASYNC_PROPERTY_NAME = "async"; - + + String ASYNC_REINDEX_VALUE = "async-reindex"; + String ENTRY_COUNT_PROPERTY_NAME = "entryCount"; /** Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1583316&r1=1583315&r2=1583316&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java Mon Mar 31 12:39:21 2014 @@ -19,13 +19,20 @@ package org.apache.jackrabbit.oak.plugin import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayListWithCapacity; +import static org.apache.jackrabbit.oak.commons.PathUtils.concat; import static 
org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_REINDEX_VALUE; import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_ASYNC_PROPERTY_NAME; import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME; import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; +import static org.apache.jackrabbit.oak.spi.commit.VisibleEditor.wrap; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -35,7 +42,6 @@ import org.apache.jackrabbit.oak.api.Pro import org.apache.jackrabbit.oak.spi.commit.CompositeEditor; import org.apache.jackrabbit.oak.spi.commit.Editor; import org.apache.jackrabbit.oak.spi.commit.EditorDiff; -import org.apache.jackrabbit.oak.spi.commit.VisibleEditor; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -51,6 +57,15 @@ class IndexUpdate implements Editor { private final NodeBuilder builder; + /** Parent updater, or {@code null} if this is the root updater. */ + private final IndexUpdate parent; + + /** Name of this node, or {@code null} for the root node. */ + private final String name; + + /** Path of this editor, built lazily in {@link #getPath()}. */ + private String path; + /** * Editors for indexes that will be normally updated. */ @@ -59,7 +74,7 @@ class IndexUpdate implements Editor { /** * Editors for indexes that need to be re-indexed. 
*/ - private final List<Editor> reindex = newArrayList(); + private final Map<String, Editor> reindex = new HashMap<String, Editor>(); /** * Callback for the update events of the indexing job @@ -70,6 +85,9 @@ class IndexUpdate implements Editor { IndexEditorProvider provider, String async, NodeState root, NodeBuilder builder, IndexUpdateCallback updateCallback) { + this.parent = null; + this.name = null; + this.path = "/"; this.provider = checkNotNull(provider); this.async = async; this.root = checkNotNull(root); @@ -78,7 +96,8 @@ class IndexUpdate implements Editor { } private IndexUpdate(IndexUpdate parent, String name) { - checkNotNull(parent); + this.parent = checkNotNull(parent); + this.name = name; this.provider = parent.provider; this.async = parent.async; this.root = parent.root; @@ -93,7 +112,7 @@ class IndexUpdate implements Editor { // no-op when reindex is empty CommitFailedException exception = EditorDiff.process( - CompositeEditor.compose(reindex), MISSING_NODE, after); + CompositeEditor.compose(reindex.values()), MISSING_NODE, after); if (exception != null) { throw exception; } @@ -114,20 +133,37 @@ class IndexUpdate implements Editor { // trigger reindexing when an indexer becomes available definition.setProperty(REINDEX_PROPERTY_NAME, true); } else if (definition.getBoolean(REINDEX_PROPERTY_NAME)) { - definition.setProperty(REINDEX_PROPERTY_NAME, false); - // as we don't know the index content node name - // beforehand, we'll remove all child nodes - for (String rm : definition.getChildNodeNames()) { - definition.getChildNode(rm).remove(); + if (definition.getBoolean(REINDEX_ASYNC_PROPERTY_NAME) + && definition.getString(ASYNC_PROPERTY_NAME) == null) { + // switch index to an async update mode + definition.setProperty(ASYNC_PROPERTY_NAME, + ASYNC_REINDEX_VALUE); + } else { + definition.setProperty(REINDEX_PROPERTY_NAME, false); + // as we don't know the index content node name + // beforehand, we'll remove all child nodes + for (String rm : 
definition.getChildNodeNames()) { + definition.getChildNode(rm).remove(); + } + reindex.put(concat(getPath(), INDEX_DEFINITIONS_NAME, name), wrap(editor)); } - reindex.add(VisibleEditor.wrap(editor)); } else { - editors.add(VisibleEditor.wrap(editor)); + editors.add(wrap(editor)); } } } } + /** + * Returns the path of this node, building it lazily when first requested. + */ + private String getPath() { + if (path == null) { + path = concat(parent.getPath(), name); + } + return path; + } + @Override public void leave(NodeState before, NodeState after) throws CommitFailedException { @@ -202,4 +238,8 @@ class IndexUpdate implements Editor { return CompositeEditor.compose(children); } + protected Set<String> getReindexedDefinitions() { + return reindex.keySet(); + } + } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1583316&r1=1583315&r2=1583316&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Mon Mar 31 12:39:21 2014 @@ -194,7 +194,7 @@ public class SegmentNodeStore implements } @Override @Nonnull - public String checkpoint(long lifetime) { + public synchronized String checkpoint(long lifetime) { checkArgument(lifetime > 0); String name = UUID.randomUUID().toString(); Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/AsyncPropertyIndexTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/AsyncPropertyIndexTest.java?rev=1583316&view=auto 
============================================================================== --- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/AsyncPropertyIndexTest.java (added) +++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/AsyncPropertyIndexTest.java Mon Mar 31 12:39:21 2014 @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.index.property; + +import static com.google.common.collect.ImmutableSet.of; +import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM; +import static org.apache.jackrabbit.JcrConstants.NT_BASE; +import static org.apache.jackrabbit.oak.api.Type.STRINGS; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_REINDEX_VALUE; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition; +import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.JCR_NODE_TYPES; +import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate; +import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider; +import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider; +import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; +import org.apache.jackrabbit.oak.query.QueryEngineSettings; +import org.apache.jackrabbit.oak.query.ast.SelectorImpl; +import org.apache.jackrabbit.oak.query.index.FilterImpl; +import org.apache.jackrabbit.oak.spi.commit.EditorHook; +import org.apache.jackrabbit.oak.spi.query.Filter; +import org.apache.jackrabbit.oak.spi.query.PropertyValues; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +; + +/** + * Test the 
asynchronous reindexing ability of a synchronous index + */ +public class AsyncPropertyIndexTest { + + private IndexEditorProvider provider = new PropertyIndexEditorProvider(); + + private EditorHook hook = new EditorHook(new IndexUpdateProvider(provider)); + + @Test + public void testAsyncPropertyLookup() throws Exception { + NodeStore store = new MemoryNodeStore(); + + NodeBuilder builder = store.getRoot().builder(); + + //add a property index on 'foo' + NodeBuilder def = createIndexDefinition( + builder.child(INDEX_DEFINITIONS_NAME), "foo", true, false, + of("foo"), null); + def.setProperty(REINDEX_ASYNC_PROPERTY_NAME, true); + + // add some content + builder.child("a").setProperty("foo", "abc"); + builder.child("b").setProperty("foo", Arrays.asList("abc", "def"), + STRINGS); + NodeState head = store.merge(builder, hook, EMPTY); + + // query the index, check it doesn't get indexed by the normal PI + FilterImpl f = createFilter(head, NT_BASE); + PropertyIndexLookup lookup = new PropertyIndexLookup(head); + try { + assertEquals(of(), find(lookup, "foo", "abc", f)); + fail(); + } catch (IllegalArgumentException e) { + // expected: no index for "foo" + } + + // run async first time, there are some changes + AsyncIndexUpdate async = new AsyncIndexUpdate(ASYNC_REINDEX_VALUE, + store, provider, true); + async.run(); + assertEquals(ASYNC_REINDEX_VALUE, + store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME) + .getChildNode("foo").getString(ASYNC_PROPERTY_NAME)); + + // run async second time, there are no changes, should switch to sync + async.run(); + assertEquals(null, store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME) + .getChildNode("foo").getString(ASYNC_PROPERTY_NAME)); + + // add content, it should be indexed synchronously + builder = store.getRoot().builder(); + builder.child("c").setProperty("foo", "def"); + head = store.merge(builder, hook, EMPTY); + f = createFilter(head, NT_BASE); + lookup = new PropertyIndexLookup(head); + assertEquals(ImmutableSet.of("b",
"c"), find(lookup, "foo", "def", f)); + } + + private static FilterImpl createFilter(NodeState root, String nodeTypeName) { + NodeState system = root.getChildNode(JCR_SYSTEM); + NodeState types = system.getChildNode(JCR_NODE_TYPES); + NodeState type = types.getChildNode(nodeTypeName); + SelectorImpl selector = new SelectorImpl(type, nodeTypeName); + return new FilterImpl(selector, "SELECT * FROM [" + nodeTypeName + "]", + new QueryEngineSettings()); + } + + private static Set<String> find(PropertyIndexLookup lookup, String name, + String value, Filter filter) { + return Sets.newHashSet(lookup.query(filter, name, value == null ? null + : PropertyValues.newString(value))); + } + +} Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/AsyncPropertyIndexTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain
