Author: tomekr Date: Wed Nov 2 12:16:32 2016 New Revision: 1767652 URL: http://svn.apache.org/viewvc?rev=1767652&view=rev Log: OAK-4871: Multiplexing NodeStore
Code provided by Robert Munteanu <[email protected]> Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountedNodeStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingContext.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeBuilder.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeState.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStore.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreService.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/package-info.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingChildrenCountTest.java jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingCompareTest.java jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingMemoryFixture.java jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreBuilderTest.java jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreTest.java jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingSegmentFixture.java Modified: jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountInfo.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/SimpleMountInfoProvider.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mount.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/MountInfoProvider.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mounts.java jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/package-info.java jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/NodeStoreFixtures.java jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/NodeStoreTest.java Modified: jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java (original) +++ jackrabbit/oak/trunk/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java Wed Nov 2 12:16:32 2016 @@ -43,7 +43,7 @@ public final class FixturesHelper { * default fixtures when no {@code nsfixtures} is provided */ public enum Fixture { - DOCUMENT_NS, SEGMENT_MK, DOCUMENT_RDB, MEMORY_NS, DOCUMENT_MEM, SEGMENT_TAR + DOCUMENT_NS, SEGMENT_MK, DOCUMENT_RDB, MEMORY_NS, DOCUMENT_MEM, SEGMENT_TAR, MULTIPLEXED_SEGMENT, MULTIPLEXED_MEM } private static final Set<Fixture> FIXTURES; Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountInfo.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountInfo.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountInfo.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountInfo.java Wed Nov 2 12:16:32 2016 @@ -30,6 +30,7 @@ import com.google.common.collect.Iterabl import org.apache.jackrabbit.oak.spi.mount.Mount; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath; import static org.apache.jackrabbit.oak.commons.PathUtils.isAncestor; final class MountInfo implements Mount { @@ -67,6 +68,18 @@ final class MountInfo implements Mount { return true; } } + + return false; + } + + @Override + public boolean isDirectlyUnder(String path) { + path = SANITIZE_PATH.apply(path); + for (String includedPath : includedPaths) { + if (getParentPath(includedPath).equals(path)) { + return true; + } + } return false; } Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountedNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountedNodeStore.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountedNodeStore.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MountedNodeStore.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + +import org.apache.jackrabbit.oak.spi.mount.Mount; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +class MountedNodeStore { + + private final Mount mount; + + private final NodeStore nodeStore; + + public MountedNodeStore(Mount mount, NodeStore nodeStore) { + this.mount = mount; + this.nodeStore = nodeStore; + } + + public Mount getMount() { + return mount; + } + + public NodeStore getNodeStore() { + return nodeStore; + } + + boolean hasChildren(Iterable<String> children) { + // since we can't possibly know if a node matching the + // 'oak:mount-*' pattern exists below a given path + // we are forced to iterate for each node store + for (String childNodeName : children) { + if (childNodeName.startsWith(getMount().getPathFragmentName())) { + return true; + } + } + return false; + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingContext.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingContext.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingContext.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingContext.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.spi.mount.Mount; +import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.ImmutableMap.copyOf; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.uniqueIndex; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; + +class MultiplexingContext { + + private final MountInfoProvider mip; + + private final MountedNodeStore globalStore; + + private final List<MountedNodeStore> nonDefaultStores; + + private final Map<Mount, MountedNodeStore> nodeStoresByMount; + + MultiplexingContext(MountInfoProvider mip, NodeStore globalStore, List<MountedNodeStore> nonDefaultStores) { + this.mip = mip; + this.globalStore = new MountedNodeStore(mip.getDefaultMount(), globalStore); + this.nonDefaultStores = nonDefaultStores; + this.nodeStoresByMount = copyOf(uniqueIndex(getAllMountedNodeStores(), new Function<MountedNodeStore, Mount>() { + @Override + public Mount apply(MountedNodeStore input) { + return input.getMount(); + } + })); + } + + MountedNodeStore getGlobalStore() { + return globalStore; + } + + List<MountedNodeStore> getNonDefaultStores() { + return nonDefaultStores; + } + + MountedNodeStore getOwningStore(String path) { + Mount mount = mip.getMountByPath(path); + if (nodeStoresByMount.containsKey(mount)) { + return nodeStoresByMount.get(mount); + } else { + throw new IllegalArgumentException("Unable to find an owning store for path " + path); + } + } + + List<MountedNodeStore> getContributingStoresForNodes(String path, final Map<MountedNodeStore, NodeState> nodeStates) { + return getContributingStores(path, new Function<MountedNodeStore, Iterable<String>>() { + @Override + public Iterable<String> apply(MountedNodeStore input) { + return nodeStates.get(input).getChildNodeNames(); + } + }); + } + + List<MountedNodeStore> getContributingStoresForBuilders(String path, final Map<MountedNodeStore, NodeBuilder> nodeBuilders) { + return getContributingStores(path, new Function<MountedNodeStore, Iterable<String>>() { + @Override + public Iterable<String> apply(MountedNodeStore input) { + return nodeBuilders.get(input).getChildNodeNames(); + } + }); + } + + private List<MountedNodeStore> getContributingStores(String path, Function<MountedNodeStore, Iterable<String>> childrenProvider) { + Mount owningMount = mip.getMountByPath(path); + if (!owningMount.isDefault() && nodeStoresByMount.containsKey(owningMount)) { + MountedNodeStore nodeStore = nodeStoresByMount.get(owningMount); + if (nodeStore != globalStore) { + return singletonList(nodeStore); + } + } + + // scenario 2 - multiple mounts participate + List<MountedNodeStore> mountedStores = newArrayList(); + mountedStores.add(globalStore); + + // we need mounts placed exactly one level beneath this path + Collection<Mount> mounts = mip.getMountsPlacedDirectlyUnder(path); + + // query the mounts next + for (MountedNodeStore mountedNodeStore : nonDefaultStores) { + if (mounts.contains(mountedNodeStore.getMount())) { + mountedStores.add(mountedNodeStore); + } else { + if (mountedNodeStore.hasChildren(childrenProvider.apply(mountedNodeStore))) { + mountedStores.add(mountedNodeStore); + } + } + } + + return mountedStores; + } + + Iterable<MountedNodeStore> getAllMountedNodeStores() { + return concat(singleton(globalStore), nonDefaultStores); + } + + Blob createBlob(InputStream inputStream) throws IOException { + return globalStore.getNodeStore().createBlob(inputStream); + } + + int getStoresCount() { + return nonDefaultStores.size() + 1; + } + + Predicate<String> belongsToStore(final MountedNodeStore mountedNodeStore, final String parentPath) { + return new Predicate<String>() { + @Override + public boolean apply(String childName) { + return getOwningStore(PathUtils.concat(parentPath, childName)) == mountedNodeStore; + } + }; + } +} Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeBuilder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeBuilder.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeBuilder.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeBuilder.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + +import com.google.common.base.Function; +import com.google.common.base.Objects; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; +import org.apache.jackrabbit.oak.spi.state.MoveDetector; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.ImmutableMap.copyOf; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Maps.transformValues; +import static java.lang.Long.MAX_VALUE; +import static java.util.Collections.singleton; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; +import static org.apache.jackrabbit.oak.plugins.multiplex.MultiplexingNodeState.STOP_COUNTING_CHILDREN; +import static org.apache.jackrabbit.oak.plugins.multiplex.MultiplexingNodeState.accumulateChildSizes; +import static org.apache.jackrabbit.oak.spi.state.AbstractNodeState.checkValidName; + +class MultiplexingNodeBuilder implements NodeBuilder { + + private final String path; + + private final MultiplexingContext ctx; + + private final Map<MountedNodeStore, NodeBuilder> nodeBuilders; + + private final Map<MountedNodeStore, NodeBuilder> rootBuilders; + + private final MountedNodeStore owningStore; + + MultiplexingNodeBuilder(String path, Map<MountedNodeStore, NodeBuilder> nodeBuilders, Map<MountedNodeStore, NodeBuilder> rootBuilders, MultiplexingContext ctx) { + checkArgument(nodeBuilders.size() == ctx.getStoresCount(), "Got %s builders but the context manages %s stores", nodeBuilders.size(), ctx.getStoresCount()); + checkArgument(rootBuilders.size() == ctx.getStoresCount(), "Got %s builders but the context manages %s stores", rootBuilders.size(), ctx.getStoresCount()); + + this.path = path; + this.ctx = ctx; + this.nodeBuilders = newHashMap(nodeBuilders); + this.rootBuilders = copyOf(rootBuilders); + this.owningStore = ctx.getOwningStore(path); + } + + Map<MountedNodeStore, NodeBuilder> getBuilders() { + return nodeBuilders; + } + + @Override + public NodeState getNodeState() { + Map<MountedNodeStore, NodeState> rootNodes = buildersToNodeStates(rootBuilders); + return new MultiplexingNodeState(path, rootNodes, ctx); + } + + @Override + public NodeState getBaseState() { + Map<MountedNodeStore, NodeState> rootNodes = buildersToBaseStates(rootBuilders); + return new MultiplexingNodeState(path, rootNodes, ctx); + } + + private static Map<MountedNodeStore, NodeState> buildersToNodeStates(Map<MountedNodeStore, NodeBuilder> builders) { + return copyOf(transformValues(builders, new Function<NodeBuilder, NodeState>() { + @Override + public NodeState apply(NodeBuilder input) { + return input.getNodeState(); + } + })); + } + + private static Map<MountedNodeStore, NodeState> buildersToBaseStates(Map<MountedNodeStore, NodeBuilder> builders) { + return copyOf(transformValues(builders, new Function<NodeBuilder, NodeState>() { + @Override + public NodeState apply(NodeBuilder input) { + return input.getBaseState(); + } + })); + } + + // node or property-related methods ; directly delegate to wrapped builder + @Override + public boolean exists() { + return getWrappedNodeBuilder().exists(); + } + + @Override + public boolean isNew() { + return getWrappedNodeBuilder().isNew(); + } + + @Override + public boolean isNew(String name) { + return getWrappedNodeBuilder().isNew(name); + } + + @Override + public boolean isModified() { + return getWrappedNodeBuilder().isModified(); + } + + @Override + public boolean isReplaced() { + return getWrappedNodeBuilder().isReplaced(); + } + + @Override + public boolean isReplaced(String name) { + return getWrappedNodeBuilder().isReplaced(name); + } + + @Override + public long getPropertyCount() { + return getWrappedNodeBuilder().getPropertyCount(); + } + + @Override + public Iterable<? extends PropertyState> getProperties() { + return getWrappedNodeBuilder().getProperties(); + } + + @Override + public boolean hasProperty(String name) { + return getWrappedNodeBuilder().hasProperty(name); + } + + @Override + public PropertyState getProperty(String name) { + return getWrappedNodeBuilder().getProperty(name); + } + + @Override + public boolean getBoolean(String name) { + return getWrappedNodeBuilder().getBoolean(name); + } + + @Override + public String getString(String name) { + return getWrappedNodeBuilder().getString(name); + } + + @Override + public String getName(String name) { + return getWrappedNodeBuilder().getName(name); + } + + @Override + public Iterable<String> getNames(String name) { + return getWrappedNodeBuilder().getNames(name); + } + + @Override + public NodeBuilder setProperty(PropertyState property) throws IllegalArgumentException { + getWrappedNodeBuilder().setProperty(property); + return this; + } + + @Override + public <T> NodeBuilder setProperty(String name, T value) throws IllegalArgumentException { + getWrappedNodeBuilder().setProperty(name, value); + return this; + } + + @Override + public <T> NodeBuilder setProperty(String name, T value, Type<T> type) throws IllegalArgumentException { + getWrappedNodeBuilder().setProperty(name, value, type); + return this; + } + + @Override + public NodeBuilder removeProperty(String name) { + getWrappedNodeBuilder().removeProperty(name); + return this; + } + + // child-related methods, require multiplexing + @Override + public long getChildNodeCount(final long max) { + List<MountedNodeStore> contributingStores = ctx.getContributingStoresForBuilders(path, nodeBuilders); + if (contributingStores.isEmpty()) { + return 0; // this shouldn't happen + } else if (contributingStores.size() == 1) { + return getWrappedNodeBuilder().getChildNodeCount(max); + } else { + // Count the children in each contributing store. + return accumulateChildSizes(concat(transform(contributingStores, new Function<MountedNodeStore, Iterable<String>>() { + @Override + public Iterable<String> apply(MountedNodeStore input) { + NodeBuilder contributing = nodeBuilders.get(input); + if (contributing.getChildNodeCount(max) == MAX_VALUE) { + return singleton(STOP_COUNTING_CHILDREN); + } else { + return filter(contributing.getChildNodeNames(), ctx.belongsToStore(input, path)); + } + } + })), max); + } + } + + @Override + public Iterable<String> getChildNodeNames() { + return concat(transform(ctx.getContributingStoresForBuilders(path, nodeBuilders), new Function<MountedNodeStore, Iterable<String>>() { + @Override + public Iterable<String> apply(final MountedNodeStore mountedNodeStore) { + return filter(nodeBuilders.get(mountedNodeStore).getChildNodeNames(), ctx.belongsToStore(mountedNodeStore, path)); + } + })); + } + + @Override + public boolean hasChildNode(String name) { + String childPath = PathUtils.concat(path, name); + MountedNodeStore mountedStore = ctx.getOwningStore(childPath); + return nodeBuilders.get(mountedStore).hasChildNode(name); + } + + @Override + public NodeBuilder child(String name) { + String childPath = PathUtils.concat(path, name); + MountedNodeStore mountedNodeStore = ctx.getOwningStore(childPath); + createAncestors(mountedNodeStore); + nodeBuilders.get(mountedNodeStore).child(name); + return getChildNode(name); + } + + private void createAncestors(MountedNodeStore mountedNodeStore) { + if (mountedNodeStore == owningStore) { + return; + } + if (nodeBuilders.get(mountedNodeStore).exists()) { + return; + } + NodeBuilder builder = rootBuilders.get(mountedNodeStore); + for (String element : PathUtils.elements(path)) { + builder = builder.child(element); + } + nodeBuilders.put(mountedNodeStore, builder); + } + + @Override + public NodeBuilder getChildNode(String name) { + String childPath = PathUtils.concat(path, name); + MountedNodeStore mountedStore = ctx.getOwningStore(childPath); + if (!nodeBuilders.get(mountedStore).hasChildNode(name)) { + return MISSING_NODE.builder(); + } + + Map<MountedNodeStore, NodeBuilder> newNodeBuilders = newHashMap(); + for (MountedNodeStore mns : ctx.getAllMountedNodeStores()) { + newNodeBuilders.put(mns, nodeBuilders.get(mns).getChildNode(name)); + } + return new MultiplexingNodeBuilder(childPath, newNodeBuilders, rootBuilders, ctx); + } + + @Override + public NodeBuilder setChildNode(String name) throws IllegalArgumentException { + return setChildNode(name, EmptyNodeState.EMPTY_NODE); + } + + @Override + public NodeBuilder setChildNode(String name, NodeState nodeState) { + String childPath = PathUtils.concat(path, name); + MountedNodeStore mountedNodeStore = ctx.getOwningStore(childPath); + createAncestors(mountedNodeStore); + nodeBuilders.get(mountedNodeStore).setChildNode(name, nodeState); + return getChildNode(name); + } + + @Override + public boolean remove() { + return getWrappedNodeBuilder().remove(); + } + + @Override + public boolean moveTo(NodeBuilder newParent, String newName) { + checkNotNull(newParent); + checkValidName(newName); + if ("/".equals(path) || !exists() || newParent.hasChildNode(newName)) { + return false; + } else { + if (newParent.exists()) { + annotateSourcePath(); + NodeState nodeState = getNodeState(); + newParent.setChildNode(newName, nodeState); + remove(); + return true; + } else { + return false; + } + } + } + + @Override + public Blob createBlob(InputStream stream) throws IOException { + return ctx.createBlob(stream); + } + + private NodeBuilder getWrappedNodeBuilder() { + return nodeBuilders.get(owningStore); + } + + private void annotateSourcePath() { + String sourcePath = getSourcePath(); + if (!isTransientlyAdded(sourcePath)) { + setProperty(MoveDetector.SOURCE_PATH, sourcePath); + } + } + + private final String getSourcePath() { + // Traverse up the hierarchy until we encounter the first builder + // having a source path annotation or until we hit the root + MultiplexingNodeBuilder builder = this; + String sourcePath = getSourcePathAnnotation(builder); + while (sourcePath == null && !"/".equals(builder.path)) { + String parentPath = PathUtils.getParentPath(builder.path); + builder = getBuilderByPath(parentPath); + sourcePath = getSourcePathAnnotation(builder); + } + + if (sourcePath == null) { + // Neither self nor any parent has a source path annotation. The source + // path is just the path of this builder + return path; + } else { + // The source path is the source path of the first parent having a source + // path annotation with the relative path from this builder up to that + // parent appended. + return PathUtils.concat(sourcePath, + PathUtils.relativize(builder.path, path)); + } + } + + private static String getSourcePathAnnotation(MultiplexingNodeBuilder builder) { + PropertyState base = builder.getBaseState().getProperty(MoveDetector.SOURCE_PATH); + PropertyState head = builder.getNodeState().getProperty(MoveDetector.SOURCE_PATH); + if (Objects.equal(base, head)) { + // Both null: no source path annotation + // Both non null but equals: source path annotation is from a previous commit + return null; + } else { + return head.getValue(Type.STRING); + } + } + + private boolean isTransientlyAdded(String sourcePath) { + NodeState node = rootBuilders.get(owningStore).getBaseState(); + for (String name : PathUtils.elements(sourcePath)) { + node = node.getChildNode(name); + } + return !node.exists(); + } + + private MultiplexingNodeBuilder getBuilderByPath(String path) { + return new MultiplexingNodeBuilder(path, getBuildersByPath(rootBuilders, path), rootBuilders, ctx); + } + + private static Map<MountedNodeStore, NodeBuilder> getBuildersByPath(Map<MountedNodeStore, NodeBuilder> rootNodes, final String path) { + return copyOf(transformValues(rootNodes, new Function<NodeBuilder, NodeBuilder>() { + @Override + public NodeBuilder apply(NodeBuilder input) { + NodeBuilder result = input; + for (String element : PathUtils.elements(path)) { + if (result.hasChildNode(element)) { + result = result.getChildNode(element); + } else { + result = MISSING_NODE.builder(); + break; + } + } + return result; + } + })); + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeState.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeState.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeState.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeState.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + +import com.google.common.base.Function; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry; +import org.apache.jackrabbit.oak.spi.state.AbstractNodeState; +import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; + +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Predicates.compose; +import static com.google.common.collect.ImmutableMap.copyOf; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Maps.transformValues; +import static java.lang.Long.MAX_VALUE; +import static java.util.Collections.singleton; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; +import static org.apache.jackrabbit.oak.spi.state.ChildNodeEntry.GET_NAME; + +class MultiplexingNodeState extends AbstractNodeState { + + // A note on content held by node stores which is outside the mount boundaries + // + // As a matter of design, mounted stores will definitely hold information _above_ + // their mounted, path, e.g. a store mounted at /a/b/c will definitely have nodes + // /a and /a/b, which will not be visible through the multiplexing node store. + // + // If a node store holds information _below_ a path which belongs to another + // repository, the multiplexing node store will not consider that information. + // + // For instance, with a node store mounted at /libs and the root store + // having a node at /libs/food, both the /libs and /libs/foo nodes from + // the root store will be ignored + + static final String STOP_COUNTING_CHILDREN = new String(MultiplexingNodeState.class.getName() + ".stopCountingChildren"); + + private final String path; + + private final MultiplexingContext ctx; + + private final MountedNodeStore owningStore; + + private final Map<MountedNodeStore, NodeState> rootStates; + + private final Map<MountedNodeStore, NodeState> nodeStates; + + MultiplexingNodeState(String path, Map<MountedNodeStore, NodeState> rootStates, MultiplexingContext ctx) { + checkArgument(rootStates.size() == ctx.getStoresCount(), "Got %s node states but the context manages %s stores", rootStates.size(), ctx.getStoresCount()); + + this.path = path; + this.ctx = ctx; + this.rootStates = copyOf(rootStates); + this.nodeStates = copyOf(getNodesByPath(this.rootStates, path)); + this.owningStore = ctx.getOwningStore(path); + } + + MultiplexingNodeState(String path, Map<MountedNodeStore, NodeState> nodeStates, Map<MountedNodeStore, NodeState> rootStates, MultiplexingContext ctx) { + checkArgument(nodeStates.size() == ctx.getStoresCount(), "Got %s node states but the context manages %s stores", nodeStates.size(), ctx.getStoresCount()); + checkArgument(rootStates.size() == ctx.getStoresCount(), "Got %s node states but the context manages %s stores", rootStates.size(), ctx.getStoresCount()); + + this.path = path; + this.ctx = ctx; + this.rootStates = copyOf(rootStates); + this.nodeStates = copyOf(nodeStates); + this.owningStore = ctx.getOwningStore(path); + } + + @Override + public boolean exists() { + return getWrappedNodeState().exists(); + } + + // delegate all property access to wrapped node + @Override + public boolean hasProperty(String name) { + return getWrappedNodeState().hasProperty(name); + } + + @Override + public PropertyState getProperty(String name) { + return getWrappedNodeState().getProperty(name); + } + + @Override + public long getPropertyCount() { + return getWrappedNodeState().getPropertyCount(); + } + + @Override + public Iterable<? extends PropertyState> getProperties() { + return getWrappedNodeState().getProperties(); + } + + // child node operations + @Override + public boolean hasChildNode(String name) { + String childPath = PathUtils.concat(path, name); + MountedNodeStore mountedStore = ctx.getOwningStore(childPath); + return nodeStates.get(mountedStore).hasChildNode(name); + } + + @Override + public NodeState getChildNode(String name) { + String childPath = PathUtils.concat(path, name); + MountedNodeStore mountedStore = ctx.getOwningStore(childPath); + if (!nodeStates.get(mountedStore).hasChildNode(name)) { + return MISSING_NODE; + } + + Map<MountedNodeStore, NodeState> newNodeStates = newHashMap(); + for (MountedNodeStore mns : ctx.getAllMountedNodeStores()) { + newNodeStates.put(mns, nodeStates.get(mns).getChildNode(name)); + } + return new MultiplexingNodeState(childPath, newNodeStates, rootStates, ctx); + } + + @Override + public long getChildNodeCount(final long max) { + List<MountedNodeStore> contributingStores = ctx.getContributingStoresForNodes(path, nodeStates); + if (contributingStores.isEmpty()) { + return 0; // this shouldn't happen + } else if (contributingStores.size() == 1) { + return getWrappedNodeState().getChildNodeCount(max); + } else { + // Count the children in each contributing store. + return accumulateChildSizes(concat(transform(contributingStores, new Function<MountedNodeStore, Iterable<String>>() { + @Override + public Iterable<String> apply(MountedNodeStore mns) { + NodeState contributing = nodeStates.get(mns); + if (contributing.getChildNodeCount(max) == MAX_VALUE) { + return singleton(STOP_COUNTING_CHILDREN); + } else { + return filter(contributing.getChildNodeNames(), ctx.belongsToStore(mns, path)); + } + } + })), max); + } + } + + static long accumulateChildSizes(Iterable<String> nodeNames, long max) { + long totalCount = 0; + for (String name : nodeNames) { + totalCount++; + if (name == STOP_COUNTING_CHILDREN || totalCount >= max) { + return MAX_VALUE; + } + } + return totalCount; + } + + @Override + public Iterable<? extends ChildNodeEntry> getChildNodeEntries() { + Iterable<? extends ChildNodeEntry> nativeChildren = concat(transform(ctx.getContributingStoresForNodes(path, nodeStates), new Function<MountedNodeStore, Iterable<? extends ChildNodeEntry>>() { + @Override + public Iterable<? extends ChildNodeEntry> apply(final MountedNodeStore mountedNodeStore) { + return filter(nodeStates.get(mountedNodeStore).getChildNodeEntries(), compose(ctx.belongsToStore(mountedNodeStore, path), GET_NAME)); + } + })); + return transform(nativeChildren, new Function<ChildNodeEntry, ChildNodeEntry>() { + @Override + public ChildNodeEntry apply(ChildNodeEntry input) { + NodeState wrapped = getChildNode(input.getName()); + return new MemoryChildNodeEntry(input.getName(), wrapped); + } + }); + } + + @Override + public boolean compareAgainstBaseState(NodeState base, NodeStateDiff diff) { + if (base instanceof MultiplexingNodeState) { + MultiplexingNodeState multiBase = (MultiplexingNodeState) base; + NodeStateDiff wrappingDiff = new WrappingDiff(diff, multiBase); + boolean full = getWrappedNodeState().compareAgainstBaseState(multiBase.getWrappedNodeState(), new ChildrenDiffFilter(wrappingDiff, owningStore, true)); + for (MountedNodeStore mns : ctx.getContributingStoresForNodes(path, nodeStates)) { + if (owningStore == mns) { + continue; + } + NodeStateDiff childrenDiffFilter = new ChildrenDiffFilter(wrappingDiff, mns, false); + NodeState contributing = nodeStates.get(mns); + NodeState contributingBase = multiBase.nodeStates.get(mns); + full = full && contributing.compareAgainstBaseState(contributingBase, childrenDiffFilter); + } + return full; + } else { + return super.compareAgainstBaseState(base, diff); + } + } + + // write operations + @Override + public NodeBuilder builder() { + Map<MountedNodeStore, NodeBuilder> rootBuilders = copyOf(transformValues(rootStates, new Function<NodeState, NodeBuilder>() { + @Override + public NodeBuilder apply(NodeState input) { + return input.builder(); + } + })); + Map<MountedNodeStore, NodeBuilder> nodeBuilders = copyOf(transformValues(rootBuilders, new Function<NodeBuilder, NodeBuilder>() { + @Override + public NodeBuilder apply(NodeBuilder input) { + NodeBuilder result = input; + for (String element : PathUtils.elements(path)) { + if (result.hasChildNode(element)) { + result = result.getChildNode(element); + } else { + result = MISSING_NODE.builder(); + break; + } + } + return result; + } + })); + return new MultiplexingNodeBuilder(path, nodeBuilders, rootBuilders, ctx); + } + + private NodeState getWrappedNodeState() { + return nodeStates.get(owningStore); + } + + private static Map<MountedNodeStore, NodeState> getNodesByPath(Map<MountedNodeStore, NodeState> rootNodes, final String path) { + return copyOf(transformValues(rootNodes, new Function<NodeState, NodeState>() { + @Override + public NodeState apply(NodeState input) { + NodeState result = input; + for (String element : PathUtils.elements(path)) { + if (result.hasChildNode(element)) { + result = result.getChildNode(element); + } else { + result = MISSING_NODE; + break; + } + } + return result; + } + })); + } + + private class ChildrenDiffFilter implements NodeStateDiff { + + private final NodeStateDiff diff; + + private final MountedNodeStore mns; + + private final boolean includeProperties; + + public ChildrenDiffFilter(NodeStateDiff diff, MountedNodeStore mns, boolean includeProperties) { + this.diff = diff; + this.mns = mns; + this.includeProperties = includeProperties; + } + + @Override + public boolean propertyAdded(PropertyState after) { + if (includeProperties) { + return diff.propertyAdded(after); + } else { + return true; + } + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + if (includeProperties) { + return diff.propertyChanged(before, after); + } else { + return true; + } + } + + @Override + public boolean propertyDeleted(PropertyState before) { + if (includeProperties) { + return diff.propertyDeleted(before); + } else { + return true; + } + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + if (belongsToNodeStore(name)) { + return diff.childNodeAdded(name, after); + } else { + return true; + } + } + + @Override + public boolean childNodeChanged(String name, NodeState before, NodeState after) { + if (belongsToNodeStore(name)) { + return diff.childNodeChanged(name, before, after); + } else { + return true; + } + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + if (belongsToNodeStore(name)) { + return diff.childNodeDeleted(name, before); + } else { + return true; + } + } + + private boolean belongsToNodeStore(String name) { + return ctx.getOwningStore(PathUtils.concat(path, name)) == mns; + } + } + + private class WrappingDiff implements NodeStateDiff { + + private final NodeStateDiff diff; + + private final MultiplexingNodeState base; + + public WrappingDiff(NodeStateDiff diff, MultiplexingNodeState base) { + this.diff = diff; + this.base = base; + } + + @Override + public boolean propertyAdded(PropertyState after) { + return diff.propertyAdded(after); + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + return diff.propertyChanged(before, after); + } + + @Override + public boolean propertyDeleted(PropertyState before) { + return diff.propertyDeleted(before); + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + return diff.childNodeAdded(name, wrapAfter(name)); + } + + @Override + public boolean childNodeChanged(String name, NodeState before, NodeState after) { + return diff.childNodeChanged(name, wrapBefore(name), wrapAfter(name)); + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + return diff.childNodeDeleted(name, wrapBefore(name)); + } + + private NodeState wrapBefore(String name) { + return base.getChildNode(name); + } + + private NodeState wrapAfter(String name) { + return MultiplexingNodeState.this.getChildNode(name); + } + } + +} Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStore.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStore.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStore.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + +import com.google.common.base.Predicate; +import com.google.common.collect.Lists; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.spi.commit.CommitHook; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.commit.Observable; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.mount.Mount; +import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider; +import org.apache.jackrabbit.oak.spi.state.ApplyDiff; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Predicates.isNull; +import static com.google.common.collect.ImmutableMap.copyOf; +import static com.google.common.collect.Iterables.any; +import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Maps.filterKeys; +import static com.google.common.collect.Maps.newHashMap; + +/** + * A {@link NodeStore} implementation that multiplexes other {@link NodeStore} instances + * mounted under paths defined by {@link MountInfo}. + * + * <p>The main objective of this implementation is to multiplex operations working on + * at most single read-write store with any number of read-only stores. While the + * multiplexing would technically work at the NodeStore level there are several + * less-than-obvious issues which prevent it: + * <ol> + * <li>Thread safety of the write operation can be quite costly, and will come on top + * of the thread safety measures already put in place by the multiplexed node stores.</li> + * <li>Many JCR subsystems require global state, e.g. the versioning store. This global state + * can become corrupt if multiple mounts operate on it or if mounts are added and removed.</li> + * </ol> + * + * As such, the only supported configuration is at most a single write-enabled store.</p> + * + * <p>Because of the limitation described above, right now the only correct way to use + * MultiplexingNodeStore is to create a normal repository, split it into parts + * using oak-upgrade {@code --{include,exclude}-paths} and then configure this + * node store implementation to multiplex split parts together.</p> + */ +public class MultiplexingNodeStore implements NodeStore, Observable { + + private static final String CHECKPOINT_ID_PREFIX = "multiplexing.checkpoint."; + + final MultiplexingContext ctx; + + private final List<Observer> observers = new CopyOnWriteArrayList<>(); + + // visible for testing only + MultiplexingNodeStore(MountInfoProvider mip, NodeStore globalStore, List<MountedNodeStore> nonDefaultStore) { + this.ctx = new MultiplexingContext(mip, globalStore, nonDefaultStore); + } + + @Override + public NodeState getRoot() { + // the multiplexed root state exposes the node states as they are + // at this certain point in time, so we eagerly retrieve them from all stores + Map<MountedNodeStore, NodeState> nodeStates = newHashMap(); + for (MountedNodeStore nodeStore : ctx.getAllMountedNodeStores()) { + nodeStates.put(nodeStore, nodeStore.getNodeStore().getRoot()); + } + return createRootNodeState(nodeStates); + } + + @Override + public NodeState merge(NodeBuilder builder, CommitHook commitHook, CommitInfo info) throws CommitFailedException { + checkArgument(builder instanceof MultiplexingNodeBuilder); + MultiplexingNodeBuilder nodeBuilder = (MultiplexingNodeBuilder) builder; + + // run commit hooks and apply the changes to the builder instance + NodeState processed = commitHook.processCommit(getRoot(), rebase(nodeBuilder), info); + processed.compareAgainstBaseState(builder.getNodeState(), new ApplyDiff(nodeBuilder)); + + assertNoChangesOnReadOnlyMounts(nodeBuilder); + + // apply the accumulated changes on individual NodeStore instances + Map<MountedNodeStore, NodeState> resultStates = newHashMap(); + for (MountedNodeStore mountedNodeStore : ctx.getAllMountedNodeStores()) { + NodeStore nodeStore = mountedNodeStore.getNodeStore(); + NodeBuilder partialBuilder = nodeBuilder.getBuilders().get(mountedNodeStore); + NodeState result = nodeStore.merge(partialBuilder, EmptyHook.INSTANCE, info); + resultStates.put(mountedNodeStore, result); + } + MultiplexingNodeState newRoot = createRootNodeState(resultStates); + + for (Observer observer : observers) { + observer.contentChanged(newRoot, info); + } + return newRoot; + } + + private void assertNoChangesOnReadOnlyMounts(MultiplexingNodeBuilder nodeBuilder) throws CommitFailedException { + for (MountedNodeStore mountedNodeStore : ctx.getAllMountedNodeStores()) { + if (!mountedNodeStore.getMount().isReadOnly()) { + continue; + } + NodeBuilder partialBuilder = nodeBuilder.getBuilders().get(mountedNodeStore); + if (!partialBuilder.getNodeState().equals(partialBuilder.getBaseState())) { + // TODO - add proper error code + throw new CommitFailedException("Multiplex", 31, "Unable to perform changes on read-only mount " + mountedNodeStore.getMount()); + } + } + } + + @Override + public NodeState rebase(NodeBuilder builder) { + checkArgument(builder instanceof MultiplexingNodeBuilder); + + MultiplexingNodeBuilder nodeBuilder = (MultiplexingNodeBuilder) builder; + Map<MountedNodeStore, NodeState> resultStates = newHashMap(); + for (MountedNodeStore mountedNodeStore : ctx.getAllMountedNodeStores()) { + NodeStore nodeStore = mountedNodeStore.getNodeStore(); + NodeBuilder partialBuilder = nodeBuilder.getBuilders().get(mountedNodeStore); + NodeState result = nodeStore.rebase(partialBuilder); + resultStates.put(mountedNodeStore, result); + } + return createRootNodeState(resultStates); + } + + @Override + public NodeState reset(NodeBuilder builder) { + checkArgument(builder instanceof MultiplexingNodeBuilder); + + MultiplexingNodeBuilder nodeBuilder = (MultiplexingNodeBuilder) builder; + Map<MountedNodeStore, NodeState> resultStates = newHashMap(); + for (MountedNodeStore mountedNodeStore : ctx.getAllMountedNodeStores()) { + NodeStore nodeStore = mountedNodeStore.getNodeStore(); + NodeBuilder partialBuilder = nodeBuilder.getBuilders().get(mountedNodeStore); + NodeState result = nodeStore.reset(partialBuilder); + resultStates.put(mountedNodeStore, result); + } + return createRootNodeState(resultStates); + } + + private MultiplexingNodeState createRootNodeState(Map<MountedNodeStore, NodeState> rootStates) { + return new MultiplexingNodeState("/", rootStates, ctx); + } + + @Override + public Blob createBlob(InputStream inputStream) throws IOException { + // since there is no way to infer a path for a blob, we create all blobs in the root store + return ctx.createBlob(inputStream); + } + + @Override + public Blob getBlob(String reference) { + for (MountedNodeStore nodeStore : ctx.getAllMountedNodeStores()) { + Blob found = nodeStore.getNodeStore().getBlob(reference); + if (found != null) { + return found; + } + } + return null; + } + + public Iterable<String> checkpoints() { + final NodeStore globalNodeStore = ctx.getGlobalStore().getNodeStore(); + return filter(globalNodeStore.checkpoints(), new Predicate<String>() { + @Override + public boolean apply(String checkpoint) { + return isMultiplexingCheckpoint(checkpoint); + } + }); + } + + private boolean isMultiplexingCheckpoint(String checkpoint) { + Map<String, String> props = ctx.getGlobalStore().getNodeStore().checkpointInfo(checkpoint); + if (props == null) { + return false; + } + for (MountedNodeStore mns : ctx.getNonDefaultStores()) { + if (!props.containsKey(CHECKPOINT_ID_PREFIX + mns.getMount().getName())) { + return false; + } + } + return true; + } + + @Override + public String checkpoint(long lifetime, Map<String, String> properties) { + Map<String, String> globalProperties = newHashMap(properties); + for (MountedNodeStore mns : ctx.getNonDefaultStores()) { + String checkpoint = mns.getNodeStore().checkpoint(lifetime, properties); + globalProperties.put(CHECKPOINT_ID_PREFIX + mns.getMount().getName(), checkpoint); + } + return ctx.getGlobalStore().getNodeStore().checkpoint(lifetime, globalProperties); + } + + + @Override + public String checkpoint(long lifetime) { + return checkpoint(lifetime, Collections. <String, String> emptyMap()); + } + + @Override + public Map<String, String> checkpointInfo(String checkpoint) { + return copyOf(filterKeys(ctx.getGlobalStore().getNodeStore().checkpointInfo(checkpoint), new Predicate<String>() { + @Override + public boolean apply(String input) { + return !input.startsWith(CHECKPOINT_ID_PREFIX); + } + })); + } + + @Override + public NodeState retrieve(String checkpoint) { + Map<String, String> props = ctx.getGlobalStore().getNodeStore().checkpointInfo(checkpoint); + if (props == null) { + return null; + } + Map<MountedNodeStore, NodeState> nodeStates = newHashMap(); + nodeStates.put(ctx.getGlobalStore(), ctx.getGlobalStore().getNodeStore().retrieve(checkpoint)); + for (MountedNodeStore nodeStore : ctx.getNonDefaultStores()) { + String partialCheckpoint = props.get(CHECKPOINT_ID_PREFIX + nodeStore.getMount().getName()); + if (partialCheckpoint == null) { + return null; + } else { + nodeStates.put(nodeStore, nodeStore.getNodeStore().retrieve(partialCheckpoint)); + } + } + if (any(nodeStates.values(), isNull())) { + return null; + } + return new MultiplexingNodeState("/", nodeStates, ctx); + } + + @Override + public boolean release(String checkpoint) { + Map<String, String> props = ctx.getGlobalStore().getNodeStore().checkpointInfo(checkpoint); + if (props == null) { + return false; + } + boolean result = ctx.getGlobalStore().getNodeStore().release(checkpoint); + for (MountedNodeStore nodeStore : ctx.getNonDefaultStores()) { + String partialCheckpoint = props.get(CHECKPOINT_ID_PREFIX + nodeStore.getMount().getName()); + if (partialCheckpoint == null) { + result = false; + } else { + result = nodeStore.getNodeStore().release(partialCheckpoint) && result; + } + } + return result; + } + + @Override + public Closeable addObserver(final Observer observer) { + observer.contentChanged(getRoot(), null); + observers.add(observer); + return new Closeable() { + @Override + public void close() throws IOException { + observers.remove(observer); + } + }; + } + + public static class Builder { + + private final MountInfoProvider mip; + + private final NodeStore globalStore; + + private final List<MountedNodeStore> nonDefaultStores = Lists.newArrayList(); + + public Builder(MountInfoProvider mip, NodeStore globalStore) { + this.mip = checkNotNull(mip, "mountInfoProvider"); + this.globalStore = checkNotNull(globalStore, "globalStore"); + } + + public Builder addMount(String mountName, NodeStore store) { + checkNotNull(store, "store"); + checkNotNull(mountName, "mountName"); + + Mount mount = checkNotNull(mip.getMountByName(mountName), "No mount with name %s found in %s", mountName, mip); + nonDefaultStores.add(new MountedNodeStore(mount, store)); + return this; + } + + public MultiplexingNodeStore build() { + checkReadWriteMountsNumber(); + checkMountsAreConsistentWithMounts(); + return new MultiplexingNodeStore(mip, globalStore, nonDefaultStores); + } + + private void checkReadWriteMountsNumber() { + List<String> readWriteMountNames = Lists.newArrayList(); + if (!mip.getDefaultMount().isReadOnly()) { + readWriteMountNames.add(mip.getDefaultMount().getName()); + } + for (Mount mount : mip.getNonDefaultMounts()) { + if (!mount.isReadOnly()) { + readWriteMountNames.add(mount.getName()); + } + } + checkArgument(readWriteMountNames.size() <= 1, + "Expected at most 1 write-enabled mount, but got %s: %s.", readWriteMountNames.size(), readWriteMountNames); + } + + private void checkMountsAreConsistentWithMounts() { + int buildMountCount = nonDefaultStores.size(); + int mipMountCount = mip.getNonDefaultMounts().size(); + checkArgument(buildMountCount == mipMountCount, + "Inconsistent mount configuration. Builder received %s mounts, but MountInfoProvider knows about %s.", + buildMountCount, mipMountCount); + } + } +} Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreService.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreService.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreService.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/MultiplexingNodeStoreService.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.ConfigurationPolicy; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.ReferencePolicy; +import org.apache.jackrabbit.oak.commons.PropertiesUtil; +import org.apache.jackrabbit.oak.spi.mount.Mount; +import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider; +import org.osgi.framework.Constants; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.component.ComponentContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Dictionary; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Component(policy = ConfigurationPolicy.REQUIRE, + metatype = true, + label = "Apache Jackrabbit Oak Multiplexing NodeStore Service", + description = "NodeStore implementation proxying all the operations " + + "to other nodestores configured in OSGi" +) +public class MultiplexingNodeStoreService { + + private static final Logger LOG = LoggerFactory.getLogger(MultiplexingNodeStoreService.class); + + private static final String GLOBAL_ROLE = "multiplexing:global"; + + private static final String MOUNT_ROLE_PREFIX = "multiplexing:mount:"; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY, policy = ReferencePolicy.STATIC) + private MountInfoProvider mountInfoProvider; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_MULTIPLE, policy = ReferencePolicy.DYNAMIC, bind = "bindNodeStore", unbind = "unbindNodeStore", referenceInterface = NodeStoreProvider.class) + private List<NodeStoreWithProps> nodeStores = new ArrayList<>(); + + private ComponentContext context; + + private ServiceRegistration nsReg; + + @Activate + protected void activate(ComponentContext context) { + this.context = context; + registerMultiplexingNodeStore(); + } + + @Deactivate + protected void deactivate() { + unregisterMultiplexingNodeStore(); + } + + private void registerMultiplexingNodeStore() { + if (nsReg != null) { + return; // already registered + } + + NodeStoreWithProps globalNs = null; + Set<String> availableMounts = new HashSet<>(); + for (NodeStoreWithProps ns : nodeStores) { + if (isGlobalNodeStore(ns)) { + globalNs = ns; + } else { + availableMounts.add(getMountName(ns)); + } + } + + if (globalNs == null) { + LOG.info("Multiplexing node store registration is deferred until there's a global node store registered in OSGi"); + return; + } else { + LOG.info("Found global node store: {}", getDescription(globalNs)); + } + + for (Mount m : mountInfoProvider.getNonDefaultMounts()) { + if (!availableMounts.contains(m.getName())) { + LOG.info("Multiplexing node store registration is deferred until there's mount {} registered in OSGi", m.getName()); + return; + } + } + LOG.info("Node stores for all configured mounts are available"); + + MultiplexingNodeStore.Builder builder = new MultiplexingNodeStore.Builder(mountInfoProvider, globalNs.getNodeStoreProvider().getNodeStore()); + + for (NodeStoreWithProps ns : nodeStores) { + if (isGlobalNodeStore(ns)) { + continue; + } + String mountName = getMountName(ns); + if (mountName != null) { + builder.addMount(mountName, ns.getNodeStoreProvider().getNodeStore()); + LOG.info("Mounting {} as {}", getDescription(ns), mountName); + } + } + + Dictionary<String, Object> props = new Hashtable<String, Object>(); + props.put(Constants.SERVICE_PID, MultiplexingNodeStore.class.getName()); + props.put("oak.nodestore.description", new String[] { "nodeStoreType=multiplexing" } ); + + LOG.info("Registering the multiplexing node store"); + + nsReg = context.getBundleContext().registerService( + new String[]{ + NodeStore.class.getName() + }, + builder.build(), + props); + } + + private boolean isGlobalNodeStore(NodeStoreWithProps ns) { + return GLOBAL_ROLE.equals(ns.getRole()); + } + + private String getMountName(NodeStoreWithProps ns) { + String role = ns.getRole(); + if (!role.startsWith(MOUNT_ROLE_PREFIX)) { + return null; + } + return role.substring(MOUNT_ROLE_PREFIX.length()); + } + + private String getDescription(NodeStoreWithProps ns) { + return PropertiesUtil.toString(ns.getProps().get("oak.nodestore.description"), ns.getNodeStoreProvider().getClass().toString()); + } + + private void unregisterMultiplexingNodeStore() { + if (nsReg != null) { + LOG.info("Unregistering the multiplexing node store"); + nsReg.unregister(); + nsReg = null; + } + } + + protected void bindNodeStore(NodeStoreProvider ns, Map<String, ?> config) { + NodeStoreWithProps newNs = new NodeStoreWithProps(ns, config); + nodeStores.add(newNs); + + unregisterMultiplexingNodeStore(); + registerMultiplexingNodeStore(); + } + + protected void unbindNodeStore(NodeStoreProvider ns) { + Iterator<NodeStoreWithProps> it = nodeStores.iterator(); + while (it.hasNext()) { + if (it.next().getNodeStoreProvider() == ns) { + it.remove(); + } + } + + unregisterMultiplexingNodeStore(); + registerMultiplexingNodeStore(); + } + + private static class NodeStoreWithProps { + + private final NodeStoreProvider nodeStore; + + private final Map<String, ?> props; + + public NodeStoreWithProps(NodeStoreProvider nodeStore, Map<String, ?> props) { + this.nodeStore = nodeStore; + this.props = props; + } + + public NodeStoreProvider getNodeStoreProvider() { + return nodeStore; + } + + public Map<String, ?> getProps() { + return props; + } + + public String getRole() { + return PropertiesUtil.toString(props.get(NodeStoreProvider.ROLE), null); + } + } +} \ No newline at end of file Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/SimpleMountInfoProvider.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/SimpleMountInfoProvider.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/SimpleMountInfoProvider.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/SimpleMountInfoProvider.java Wed Nov 2 12:16:32 2016 @@ -85,6 +85,17 @@ public class SimpleMountInfoProvider imp } @Override + public Collection<Mount> getMountsPlacedDirectlyUnder(String path) { + Collection<Mount> mounts = Lists.newArrayList(); + for ( Mount mount : this.mounts.values()) { + if ( mount.isDirectlyUnder(path) ) { + mounts.add(mount); + } + } + return mounts; + } + + @Override public Mount getDefaultMount() { return defMount; } Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/package-info.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/package-info.java?rev=1767652&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/package-info.java (added) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/multiplex/package-info.java Wed Nov 2 12:16:32 2016 @@ -0,0 +1,45 @@ +/** + * <h1>Multiplexing support</h1> + * + * <p>This package contains support classes for implementing a multiplexed persistence at the {@linkplain org.apache.jackrabbit.oak.spi.state.NodeStore} level.</p> + * + * <h2>Design goals</h2> + * <p> + * <ol> + * <li>Transparency of the multiplexing setup. Neither the NodeStores nor the code using a multiplexed + * NodeStore should be aware of the specific implementation being used.</li> + * <li>Persistence-agnosticity. The multiplexing support should be applicable to any conformant + * NodeStore implementation.</li> + * <li>Negligible performance impact. Multiplexing should not add a significat performance overhead.</li> + * </ol> + * </p> + * + * <h2>Implementation</h2> + * + * <p>The main entry point is the {@link org.apache.jackrabbit.oak.plugins.multiplex.MultiplexingNodeStore}, + * which wraps one or more NodeStore instances. Also of interest are the {@link org.apache.jackrabbit.oak.plugins.multiplex.MultiplexingNodeState} and {@link org.apache.jackrabbit.oak.plugins.multiplex.MultiplexingNodeBuilder}.</p> + * + * <p> These classes maintain internal mappings of the 'native' objects. For instance, if the + * multiplexing NodeStore holds two MemoryNodeStore instances, then a call to {@linkplain org.apache.jackrabbit.oak.spi.state.NodeStore#getRoot()} + * will return a multiplexing NodeState backed by two MemoryNodeState instances. Similarly, a call to {@linkplain org.apache.jackrabbit.oak.spi.state.NodeState#builder()} will return a multiplexing + * NodeBuilder backed by two MemoryNodeState instances.</p> + * + * <p>Using this approach allows us to always keep related NodeStore, NodeState and NodeBuilder + * instances isolated from other instances.</p> + * + * <h2>Open items</h2> + * + * <p>1. Brute-force support for oak:mount nodes.</p> + * + * <p>The {@link org.apache.jackrabbit.oak.spi.mount.Mount#getPathFragmentName()} method defines + * a name pattern that can be used by mounted stores to contribute to a patch which is not + * owned by them. For instance, a mount named <em>apps</em> which owns <tt>/libs,/apps</tt> + * can own another subtree anywhere in the repository given that a node named <tt>:oak-mount-apps</tt> + * is found.</p> + * + * <p>The current implementation naively queries all stores whenever the child node list is prepared. + * This is obviously correct but may be slow.</p> + * {@link org.apache.jackrabbit.oak.plugins.multiplex.MultiplexingContext#getContributingStores(java.lang.String, com.google.common.base.Function)}</p> + */ +package org.apache.jackrabbit.oak.plugins.multiplex; + Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mount.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mount.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mount.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mount.java Wed Nov 2 12:16:32 2016 @@ -19,12 +19,15 @@ package org.apache.jackrabbit.oak.spi.mount; +import aQute.bnd.annotation.ProviderType; + /** * A ContentRepository represents one big tree. A Mount * refers to a set of paths in that tree which are possibly * stored in a separate physical persistent stores. In a * default setup all paths belong to a default Mount. */ +@ProviderType public interface Mount { /** * Name of the mount. If this <code>Mount</code> @@ -92,4 +95,6 @@ public interface Mount { * @return true if this Mount is rooted under given path */ boolean isUnder(String path); + + boolean isDirectlyUnder(String path); } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/MountInfoProvider.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/MountInfoProvider.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/MountInfoProvider.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/MountInfoProvider.java Wed Nov 2 12:16:32 2016 @@ -24,6 +24,9 @@ import java.util.Collection; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import aQute.bnd.annotation.ProviderType; + +@ProviderType public interface MountInfoProvider { /** @@ -59,13 +62,16 @@ public interface MountInfoProvider { /** * Returns all mounts placed under the specified path - * + * * @param path the path under which mounts are to be found * @return a collection of mounts, possibly empty */ @Nonnull Collection<Mount> getMountsPlacedUnder(String path); + @Nonnull + Collection<Mount> getMountsPlacedDirectlyUnder(String path); + /** * Returns the default mount */ Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mounts.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mounts.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mounts.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/Mounts.java Wed Nov 2 12:16:32 2016 @@ -53,6 +53,10 @@ public final class Mounts { return Collections.emptySet(); } + public Collection<Mount> getMountsPlacedDirectlyUnder(String path) { + return Collections.emptySet(); + } + @Override public Mount getDefaultMount() { return DEFAULT_MOUNT; @@ -114,6 +118,16 @@ public final class Mounts { return false; } } + return true; + } + + @Override + public boolean isDirectlyUnder(String path) { + for (Mount m : mounts) { + if (m.isDirectlyUnder(path)) { + return false; + } + } return true; } } Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/package-info.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/package-info.java?rev=1767652&r1=1767651&r2=1767652&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/package-info.java (original) +++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/mount/package-info.java Wed Nov 2 12:16:32 2016 @@ -16,9 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -@Version("2.0.0") -@Export(optional = "provide:=true") +@Version("2.1.0") package org.apache.jackrabbit.oak.spi.mount; -import aQute.bnd.annotation.Export; import aQute.bnd.annotation.Version; \ No newline at end of file
