realize that we can generalized SingleIterationStrategy to 1) work with GraphActors and 2) work at every walk-step. Thus, given that GraphActors isn't about Iterations and we can make this work for multi-stages, I renamed the strategy MessagePassingReductionStrategy. This way we won't have to name change in the future. Also, I did some more code cleanups and added a bunch more test cases. This is perhaps the most tested strategy.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c2a42e27 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c2a42e27 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c2a42e27 Branch: refs/heads/TINKERPOP-1612 Commit: c2a42e27f901b95c66e31562940a176b9f932692 Parents: 1819e05 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Fri Jan 27 11:30:15 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Fri Jan 27 14:24:35 2017 -0700 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../MessagePassingReductionStrategy.java | 162 +++++++++++++++++++ .../optimization/SingleIterationStrategy.java | 151 ----------------- .../process/traversal/TraversalStrategies.java | 4 +- .../MessagePassingReductionStrategyTest.java | 135 ++++++++++++++++ .../SingleIterationStrategyTest.java | 128 --------------- .../SparkSingleIterationStrategy.java | 42 +---- .../SparkSingleIterationStrategyTest.java | 17 +- 8 files changed, 314 insertions(+), 326 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 860d401..d523ae2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET) * Fixed a bug where `PathProcessor.keepLabels` were not being pushed down into child traversals by `PathRetractionStrategy`. * Added `SingleIterationStrategy` as a default `GraphComputer` strategy that can re-write some traversals to not require message passing. +* Added default `MessagePassingReductionStrategy` for `GraphComputer` that can reduce the number of message passing iterations. * Fixed a bug associated with user-provided maps and `GroupSideEffectStep`. * `GroupBiOperator` no longer maintains a detached traversal and thus, no more side-effect related OLAP inconsistencies. * Added `ProjectedTraverser` which wraps a traverser with a `List<Object>` of projected data. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java new file mode 100644 index 0000000..cff152e --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategy.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization; + +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder; +import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; +import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class MessagePassingReductionStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy { + + private static final MessagePassingReductionStrategy INSTANCE = new MessagePassingReductionStrategy(); + + private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList( + IncidentToAdjacentStrategy.class, + AdjacentToIncidentStrategy.class, + FilterRankingStrategy.class, + InlineFilterStrategy.class)); + + private MessagePassingReductionStrategy() { + } + + @Override + public void apply(final Traversal.Admin<?, ?> traversal) { + // only process the first traversal step in an OLAP chain + TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> { + final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined + final Traversal.Admin<?, ?> compiledComputerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone(); + if (!compiledComputerTraversal.isLocked()) + compiledComputerTraversal.applyStrategies(); + if (!TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(LocalStep.class, LambdaHolder.class), compiledComputerTraversal) && // don't do anything with lambdas or locals as this leads to unknown adjacencies + !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO) + !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO) + !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, compiledComputerTraversal). // this is a strict precaution that could be loosed with deeper logic on barriers in global children + stream(). + filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) { + final Traversal.Admin<?, ?> computerTraversal = step.computerTraversal.get().clone(); + // apply the strategies up to this point + final List<TraversalStrategy<?>> strategies = step.getTraversal().getStrategies().toList(); + final int indexOfStrategy = strategies.indexOf(MessagePassingReductionStrategy.instance()); + if (indexOfStrategy > 0) + TraversalHelper.applySingleLevelStrategies(step.getTraversal(), computerTraversal, strategies.get(indexOfStrategy - 1).getClass()); + if (computerTraversal.getSteps().size() > 1 && + !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) && + TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(VertexStep.class, EdgeVertexStep.class), computerTraversal) && + TraversalHelper.isLocalStarGraph(computerTraversal)) { + final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, computerTraversal).orElse(null); + if (MessagePassingReductionStrategy.insertElementId(barrier)) // out().count() -> out().id().count() + TraversalHelper.insertBeforeStep(new IdStep(computerTraversal), barrier, computerTraversal); + if (!(MessagePassingReductionStrategy.endsWithElement(null == barrier ? computerTraversal.getEndStep() : barrier))) { + Traversal.Admin newChildTraversal = new DefaultGraphTraversal<>(); + TraversalHelper.removeToTraversal(computerTraversal.getStartStep() instanceof GraphStep ? + computerTraversal.getStartStep().getNextStep() : + (Step) computerTraversal.getStartStep(), null == barrier ? + EmptyStep.instance() : + barrier, newChildTraversal); + newChildTraversal = newChildTraversal.getSteps().size() > 1 ? (Traversal.Admin) __.local(newChildTraversal) : newChildTraversal; + if (null == barrier) + TraversalHelper.insertTraversal(0, newChildTraversal, computerTraversal); + else + TraversalHelper.insertTraversal(barrier.getPreviousStep(), newChildTraversal, computerTraversal); + } + } + step.setComputerTraversal(computerTraversal); + } + }); + } + + private static boolean insertElementId(final Step<?, ?> barrier) { + if (!(barrier instanceof Barrier)) + return false; + else if (!endsWithElement(barrier.getPreviousStep())) + return false; + else if (barrier instanceof CountGlobalStep) + return true; + else if (barrier instanceof DedupGlobalStep && + ((DedupGlobalStep) barrier).getScopeKeys().isEmpty() && + ((DedupGlobalStep) barrier).getLocalChildren().isEmpty() && + barrier.getNextStep() instanceof CountGlobalStep) + return true; + else + return false; + } + + public static final boolean endsWithElement(Step<?, ?> currentStep) { + while (!(currentStep instanceof EmptyStep)) { + if (currentStep instanceof VertexStep) // only inE, in, and out send messages + return (((VertexStep) currentStep).returnsVertex() || !((VertexStep) currentStep).getDirection().equals(Direction.OUT)); + else if (currentStep instanceof EdgeVertexStep) // TODO: add GraphStep but only if its mid-traversal V()/E() + return true; + else if (currentStep instanceof TraversalFlatMapStep || currentStep instanceof TraversalMapStep || currentStep instanceof LocalStep) + return endsWithElement(((TraversalParent) currentStep).getLocalChildren().get(0).getEndStep()); + else if (!(currentStep instanceof FilterStep || currentStep instanceof SideEffectStep || currentStep instanceof IdentityStep || currentStep instanceof Barrier)) + return false; + currentStep = currentStep.getPreviousStep(); + } + return false; + } + + @Override + public Set<Class<? extends OptimizationStrategy>> applyPrior() { + return PRIORS; + } + + public static MessagePassingReductionStrategy instance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java deleted file mode 100644 index 6b509ef..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategy.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization; - -import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; -import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory; -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder; -import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; -import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.IdStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.IncidentToAdjacentStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class SingleIterationStrategy extends AbstractTraversalStrategy<TraversalStrategy.OptimizationStrategy> implements TraversalStrategy.OptimizationStrategy { - - private static final SingleIterationStrategy INSTANCE = new SingleIterationStrategy(); - - private static final Set<Class<? extends OptimizationStrategy>> PRIORS = new HashSet<>(Arrays.asList( - IncidentToAdjacentStrategy.class, - AdjacentToIncidentStrategy.class, - FilterRankingStrategy.class, - InlineFilterStrategy.class)); - - private SingleIterationStrategy() { - } - - @Override - public void apply(final Traversal.Admin<?, ?> traversal) { - // only process the first traversal step in an OLAP chain - TraversalHelper.getFirstStepOfAssignableClass(TraversalVertexProgramStep.class, traversal).ifPresent(step -> { - final Graph graph = traversal.getGraph().orElse(EmptyGraph.instance()); // best guess at what the graph will be as its dynamically determined - final Traversal.Admin<?, ?> compiledComputerTraversal = step.generateProgram(graph, EmptyMemory.instance()).getTraversal().get().clone(); - if (!compiledComputerTraversal.isLocked()) - compiledComputerTraversal.applyStrategies(); - if (!TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(LocalStep.class, LambdaHolder.class), compiledComputerTraversal) && - !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO) - !compiledComputerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) && // when dynamic detachment is provided in 3.3.0, remove this (TODO) - !(TraversalHelper.getStepsOfAssignableClass(TraversalParent.class, compiledComputerTraversal). // this is a strict precaution that could be loosed with deeper logic on barriers in global children - stream(). - filter(parent -> !parent.getGlobalChildren().isEmpty()).findAny().isPresent())) { - final Traversal.Admin<?, ?> computerTraversal = step.computerTraversal.getPure(); - if (computerTraversal.getSteps().size() > 1 && - !(computerTraversal.getStartStep().getNextStep() instanceof Barrier) && - TraversalHelper.hasStepOfAssignableClassRecursively(Arrays.asList(VertexStep.class, EdgeVertexStep.class), computerTraversal) && - TraversalHelper.isLocalStarGraph(computerTraversal)) { - final Traversal.Admin newChildTraversal = new DefaultGraphTraversal<>(); - final Step barrier = (Step) TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, computerTraversal).orElse(null); - if (SingleIterationStrategy.insertElementId(barrier)) // out().count() -> out().id().count() - TraversalHelper.insertBeforeStep(new IdStep(computerTraversal), barrier, computerTraversal); - if (!(SingleIterationStrategy.endsWithElement(null == barrier ? computerTraversal.getEndStep() : barrier.getPreviousStep()))) { - TraversalHelper.removeToTraversal(computerTraversal.getStartStep() instanceof GraphStep ? - computerTraversal.getStartStep().getNextStep() : - (Step) computerTraversal.getStartStep(), null == barrier ? - EmptyStep.instance() : - barrier, newChildTraversal); - if (null == barrier) - TraversalHelper.insertTraversal(0, (Traversal.Admin) __.local(newChildTraversal), computerTraversal); - else - TraversalHelper.insertTraversal(barrier.getPreviousStep(), (Traversal.Admin) __.local(newChildTraversal), computerTraversal); - } - } - step.setComputerTraversal(computerTraversal); - } - }); - } - - private static boolean insertElementId(final Step<?, ?> barrier) { - if (!(barrier instanceof Barrier)) - return false; - else if (!endsWithElement(barrier.getPreviousStep())) - return false; - else if (barrier instanceof CountGlobalStep) - return true; - else if (barrier instanceof DedupGlobalStep && - ((DedupGlobalStep) barrier).getScopeKeys().isEmpty() && - ((DedupGlobalStep) barrier).getLocalChildren().isEmpty() && - barrier.getNextStep() instanceof CountGlobalStep) - return true; - else - return false; - } - - private static boolean endsWithElement(Step<?, ?> currentStep) { - while (!(currentStep instanceof EmptyStep)) { - if (currentStep instanceof VertexStep || currentStep instanceof EdgeVertexStep) // TODO: add GraphStep but only if its mid-traversal V()/E() - return true; - else if (currentStep instanceof TraversalFlatMapStep || currentStep instanceof TraversalMapStep) - return endsWithElement(((TraversalParent) currentStep).getLocalChildren().get(0).getEndStep()); - else if (!(currentStep instanceof FilterStep || currentStep instanceof SideEffectStep)) - return false; - currentStep = currentStep.getPreviousStep(); - } - return false; - } - - @Override - public Set<Class<? extends OptimizationStrategy>> applyPrior() { - return PRIORS; - } - - public static SingleIterationStrategy instance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java index a1cb99c..7f96850 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java @@ -20,7 +20,7 @@ package org.apache.tinkerpop.gremlin.process.traversal; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy; -import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; @@ -223,7 +223,7 @@ public interface TraversalStrategies extends Serializable, Cloneable { final TraversalStrategies graphComputerStrategies = new DefaultTraversalStrategies(); graphComputerStrategies.addStrategies( GraphFilterStrategy.instance(), - SingleIterationStrategy.instance(), + MessagePassingReductionStrategy.instance(), OrderLimitStrategy.instance(), PathProcessorStrategy.instance(), ComputerVerificationStrategy.instance()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java new file mode 100644 index 0000000..1171dcd --- /dev/null +++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/MessagePassingReductionStrategyTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization; + +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies; +import org.apache.tinkerpop.gremlin.structure.Column; +import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; +import static org.apache.tinkerpop.gremlin.structure.Column.keys; +import static org.junit.Assert.assertEquals; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +@RunWith(Parameterized.class) +public class MessagePassingReductionStrategyTest { + + @Parameterized.Parameter(value = 0) + public Traversal original; + + @Parameterized.Parameter(value = 1) + public Traversal optimized; + + @Parameterized.Parameter(value = 2) + public Collection<TraversalStrategy> otherStrategies; + + @Test + public void doTest() { + final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>(); + final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin()); + rootTraversal.addStep(parent.asStep()); + parent.setComputerTraversal(this.original.asAdmin()); + final TraversalStrategies strategies = new DefaultTraversalStrategies(); + strategies.addStrategies(MessagePassingReductionStrategy.instance()); + for (final TraversalStrategy strategy : this.otherStrategies) { + strategies.addStrategies(strategy); + } + rootTraversal.setStrategies(strategies); + rootTraversal.asAdmin().applyStrategies(); + assertEquals(this.optimized, parent.computerTraversal.get()); + } + + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> generateTestParameters() { + final Function<Traverser<Vertex>, Vertex> mapFunction = t -> t.get().vertices(Direction.OUT).next(); + return Arrays.asList(new Object[][]{ + {__.V().out().count(), __.V().outE().count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, + {__.V().in().count(), __.V().local(__.inE().id()).count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, + {__.V().id(), __.V().id(), Collections.emptyList()}, + {__.V().local(out()), __.V().local(out()), Collections.emptyList()}, + {__.V().local(in()), __.V().local(in()), Collections.emptyList()}, + {__.V().local(out()).count(), __.V().local(out()).count(), Collections.emptyList()}, + {__.V().local(in()).count(), __.V().local(in()).count(), Collections.emptyList()}, + {__.V().out().count(), __.V().local(out().id()).count(), Collections.emptyList()}, + {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()}, + {__.V().in().id(), __.V().local(in().id()), Collections.emptyList()}, + {in().id(), __.local(in().id()), Collections.emptyList()}, // test inject + {__.V().out().id(), __.V().local(out().id()), Collections.emptyList()}, + {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()}, + {__.V().outE().inV().id().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()}, + {__.V().map(outE().inV()).count(), __.V().local(__.map(outE().inV()).id()).count(), Collections.emptyList()}, + {__.V().out().map(outE().inV()).count(), __.V().out().map(outE().inV()).count(), Collections.emptyList()}, + {__.V().outE().map(__.inV()).id().count(), __.V().local(__.outE().map(__.inV()).id()).count(), Collections.emptyList()}, + {__.V().outE().map(__.inV()).count(), __.V().local(outE().map(__.inV()).id()).count(), Collections.emptyList()}, + {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()}, + {__.V().outE().inV().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()}, + {__.V().out().id().count(), __.V().local(out().id()).count(), Collections.emptyList()}, + {__.V().in().id().count(), __.V().local(in().id()).count(), Collections.emptyList()}, + {__.V().in().id().select("id-map").dedup().count(), __.V().local(in().id().select("id-map")).dedup().count(), Collections.emptyList()}, + {__.V().in().id().groupCount().select(keys).unfold().dedup().count(), __.V().local(in().id()).groupCount().select(keys).unfold().dedup().count(), Collections.emptyList()}, + {__.V().outE().hasLabel("knows").values("weight"), __.V().local(outE().hasLabel("knows").values("weight")), Collections.emptyList()}, + {__.V().outE().values("weight").sum(), __.V().local(outE().values("weight")).sum(), Collections.emptyList()}, + {__.V().inE().values("weight"), __.V().local(inE().values("weight")), Collections.emptyList()}, + {__.V().inE().values("weight").sum(), __.V().local(inE().values("weight")).sum(), Collections.emptyList()}, + {__.V().inE().values("weight").sum().dedup().count(), __.V().local(inE().values("weight")).sum().dedup().count(), Collections.emptyList()}, + {__.V().as("a").out("knows").as("b").select("a", "b"), __.V().as("a").out("knows").as("b").select("a", "b"), Collections.emptyList()}, + {__.V().out().groupCount("x").cap("x"), __.V().out().groupCount("x").cap("x"), Collections.emptyList()}, + {__.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), __.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), Collections.emptyList()}, + {__.V().outE().inV().groupCount(), __.V().outE().inV().groupCount(), Collections.emptyList()}, + {__.V().outE().inV().groupCount().by("name"), __.V().outE().inV().groupCount().by("name"), Collections.emptyList()}, + {__.V().inE().id().groupCount(), __.V().local(inE().id()).groupCount(), Collections.emptyList()}, + {__.V().inE().values("weight").groupCount(), __.V().local(inE().values("weight")).groupCount(), Collections.emptyList()}, + {__.V().outE().inV().tree(), __.V().outE().inV().tree(), Collections.emptyList()}, + {__.V().in().values("name").groupCount(), __.V().in().values("name").groupCount(), Collections.emptyList()}, + {__.V().outE().inV().groupCount("x"), __.V().outE().inV().groupCount("x"), Collections.emptyList()}, + {__.V().in().dedup().count(), __.V().local(in().id()).dedup().count(), Collections.emptyList()}, + {__.V().bothE().dedup().count(), __.V().local(bothE().id()).dedup().count(), Collections.emptyList()}, + {__.V().bothE().dedup().by("name").count(), __.V().bothE().dedup().by("name").count(), Collections.emptyList()}, + {__.V().map(mapFunction).inV().count(), __.V().map(mapFunction).inV().count(), Collections.emptyList()}, + {__.V().groupCount().by(__.out().count()), __.V().groupCount().by(__.out().count()), Collections.emptyList()}, + {__.V().identity().out().identity().count(), __.V().local(__.identity().out().identity().id()).count(), Collections.emptyList()}, + {__.V().identity().out().identity().dedup().count(), __.V().local(__.identity().out().identity().id()).dedup().count(), Collections.emptyList()}, + }); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java deleted file mode 100644 index 081a541..0000000 --- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/optimization/SingleIterationStrategyTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization; - -import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.DefaultGraphTraversal; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.AdjacentToIncidentStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalStrategies; -import org.apache.tinkerpop.gremlin.structure.Column; -import org.apache.tinkerpop.gremlin.structure.Direction; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.function.Function; - -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; -import static org.apache.tinkerpop.gremlin.structure.Column.keys; -import static org.junit.Assert.assertEquals; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -@RunWith(Parameterized.class) -public class SingleIterationStrategyTest { - - @Parameterized.Parameter(value = 0) - public Traversal original; - - @Parameterized.Parameter(value = 1) - public Traversal optimized; - - @Parameterized.Parameter(value = 2) - public Collection<TraversalStrategy> otherStrategies; - - @Test - public void doTest() { - final Traversal.Admin<?, ?> rootTraversal = new DefaultGraphTraversal<>(); - final TraversalVertexProgramStep parent = new TraversalVertexProgramStep(rootTraversal, this.original.asAdmin()); - rootTraversal.addStep(parent.asStep()); - parent.setComputerTraversal(this.original.asAdmin()); - final TraversalStrategies strategies = new DefaultTraversalStrategies(); - strategies.addStrategies(SingleIterationStrategy.instance()); - for (final TraversalStrategy strategy : this.otherStrategies) { - strategies.addStrategies(strategy); - } - rootTraversal.setStrategies(strategies); - rootTraversal.asAdmin().applyStrategies(); - assertEquals(this.optimized, parent.computerTraversal.get()); - } - - - @Parameterized.Parameters(name = "{0}") - public static Iterable<Object[]> generateTestParameters() { - final Function<Traverser<Vertex>,Vertex> mapFunction = t -> t.get().vertices(Direction.OUT).next(); - return Arrays.asList(new Object[][]{ - {__.V().out().count(), __.V().local(out().id()).count(), Collections.singletonList(AdjacentToIncidentStrategy.instance())}, // TODO - {__.V().id(), __.V().id(), Collections.emptyList()}, - {__.V().out().count(), __.V().local(out().id()).count(), Collections.emptyList()}, - {__.V().out().label().count(), __.V().out().label().count(), Collections.emptyList()}, - {__.V().in().id(), __.V().local(in().id()), Collections.emptyList()}, - {in().id(), __.local(in().id()), Collections.emptyList()}, // test inject - {__.V().out().id(), __.V().local(out().id()), Collections.emptyList()}, - {__.V().both().id(), __.V().local(__.both().id()), Collections.emptyList()}, - {__.V().outE().inV().id().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()}, - {__.V().map(outE().inV()).count(), __.V().local(__.map(outE().inV()).id()).count(), Collections.emptyList()}, - {__.V().out().map(outE().inV()).count(), __.V().out().map(outE().inV()).count(), Collections.emptyList()}, - {__.V().outE().map(__.inV()).id().count(), __.V().local(__.outE().map(__.inV()).id()).count(), Collections.emptyList()}, - {__.V().outE().map(__.inV()).count(), __.V().local(outE().map(__.inV()).id()).count(), Collections.emptyList()}, - {__.V().outE().map(__.inV()).values("name").count(), __.V().outE().map(__.inV()).values("name").count(), Collections.emptyList()}, - {__.V().outE().inV().count(), __.V().local(outE().inV().id()).count(), Collections.emptyList()}, - {__.V().out().id().count(), __.V().local(out().id()).count(), Collections.emptyList()}, - {__.V().in().id().count(), __.V().local(in().id()).count(), Collections.emptyList()}, - {__.V().in().id().select("id-map").dedup().count(), __.V().local(in().id().select("id-map")).dedup().count(), Collections.emptyList()}, - {__.V().in().id().groupCount().select(keys).unfold().dedup().count(), __.V().local(in().id()).groupCount().select(keys).unfold().dedup().count(), Collections.emptyList()}, - {__.V().outE().hasLabel("knows").values("weight"), __.V().local(outE().hasLabel("knows").values("weight")), Collections.emptyList()}, - {__.V().outE().values("weight").sum(), __.V().local(outE().values("weight")).sum(), Collections.emptyList()}, - {__.V().inE().values("weight"), __.V().local(inE().values("weight")), Collections.emptyList()}, - {__.V().inE().values("weight").sum(), __.V().local(inE().values("weight")).sum(), Collections.emptyList()}, - {__.V().inE().values("weight").sum().dedup().count(), __.V().local(inE().values("weight")).sum().dedup().count(), Collections.emptyList()}, - {__.V().as("a").out("knows").as("b").select("a", "b"), __.V().as("a").out("knows").as("b").select("a", "b"), Collections.emptyList()}, - {__.V().out().groupCount("x").cap("x"), __.V().out().groupCount("x").cap("x"), Collections.emptyList()}, - {__.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), __.V().outE().inV().groupCount().select(Column.values).unfold().dedup().count(), Collections.emptyList()}, - {__.V().outE().inV().groupCount(), __.V().outE().inV().groupCount(), Collections.emptyList()}, - {__.V().outE().inV().groupCount().by("name"), __.V().outE().inV().groupCount().by("name"), Collections.emptyList()}, - {__.V().inE().id().groupCount(), __.V().local(inE().id()).groupCount(), Collections.emptyList()}, - {__.V().inE().values("weight").groupCount(), __.V().local(inE().values("weight")).groupCount(), Collections.emptyList()}, - {__.V().outE().inV().tree(), __.V().outE().inV().tree(), Collections.emptyList()}, - {__.V().in().values("name").groupCount(), __.V().in().values("name").groupCount(), Collections.emptyList()}, - {__.V().outE().inV().groupCount("x"), __.V().outE().inV().groupCount("x"), Collections.emptyList()}, - {__.V().in().dedup().count(), __.V().local(in().id()).dedup().count(), Collections.emptyList()}, - {__.V().bothE().dedup().count(), __.V().local(bothE().id()).dedup().count(), Collections.emptyList()}, - {__.V().bothE().dedup().by("name").count(), __.V().bothE().dedup().by("name").count(), Collections.emptyList()}, - {__.V().map(mapFunction).inV().count(), __.V().map(mapFunction).inV().count(), Collections.emptyList()}, - {__.V().groupCount().by(__.out().count()),__.V().groupCount().by(__.out().count()),Collections.emptyList()} - }); - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java index 1288b0d..9e85df6 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategy.java @@ -21,23 +21,15 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy; import org.apache.tinkerpop.gremlin.process.computer.util.EmptyMemory; import org.apache.tinkerpop.gremlin.process.traversal.Scope; -import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; -import org.apache.tinkerpop.gremlin.process.traversal.step.branch.LocalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.filter.FilterStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaFlatMapStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.LambdaMapStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; @@ -83,9 +75,9 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg } } if (!doesMessagePass && - !SparkSingleIterationStrategy.endsWithInElement(computerTraversal) && - !(computerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) || // todo: remove this when dynamic detachment is available in 3.3.0 - computerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH))) { // todo: remove this when dynamic detachment is available in 3.3.0 + !MessagePassingReductionStrategy.endsWithElement(computerTraversal.getEndStep()) && + !(computerTraversal.getTraverserRequirements().contains(TraverserRequirement.LABELED_PATH) || // TODO: remove this when dynamic detachment is available in 3.3.0 + computerTraversal.getTraverserRequirements().contains(TraverserRequirement.PATH))) { // TODO: remove this when dynamic detachment is available in 3.3.0 step.setComputer(step.getComputer() // if no message passing, don't partition the loaded graph .configure(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, true) @@ -95,34 +87,8 @@ public final class SparkSingleIterationStrategy extends AbstractTraversalStrateg } } - private static final boolean endsWithInElement(final Traversal.Admin<?, ?> traversal) { - Step<?, ?> current = traversal.getEndStep(); - while (current instanceof Barrier) { // clip off any tail barriers - current = current.getPreviousStep(); - } - while (!(current instanceof EmptyStep)) { - if ((current instanceof VertexStep && (((VertexStep) current).returnsVertex() || - !((VertexStep) current).getDirection().equals(Direction.OUT))) || - current instanceof EdgeVertexStep) { - return true; - } else if (current instanceof TraversalMapStep || current instanceof TraversalFlatMapStep || current instanceof LocalStep) { - if (endsWithInElement(((TraversalParent) current).getLocalChildren().get(0))) - return true; - } else if (current instanceof TraversalParent) { - if (((TraversalParent) current).getGlobalChildren().stream().filter(SparkSingleIterationStrategy::endsWithInElement).findAny().isPresent()) - return true; - } - if (!(current instanceof FilterStep || current instanceof SideEffectStep)) { // side-effects and filters do not alter the mapping and thus, deeper analysis is required - return false; - } - current = current.getPreviousStep(); - } - return false; - } - public static SparkSingleIterationStrategy instance() { return INSTANCE; } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c2a42e27/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java index 4297649..8f97576 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkSingleIterationStrategyTest.java @@ -24,7 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; -import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.SingleIterationStrategy; +import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.MessagePassingReductionStrategy; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; @@ -67,11 +67,11 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest { /////////// WITHOUT SINGLE-ITERATION STRATEGY LESS SINGLE-PASS OPTIONS ARE AVAILABLE Graph graph = GraphFactory.open(configuration); - GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, SingleIterationStrategy.class); + GraphTraversalSource g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class, MessagePassingReductionStrategy.class); assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance())); assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent()); - assertFalse(g.getStrategies().toList().contains(SingleIterationStrategy.instance())); - assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent()); + assertFalse(g.getStrategies().toList().contains(MessagePassingReductionStrategy.instance())); + assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof MessagePassingReductionStrategy).findAny().isPresent()); assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance())); assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent()); @@ -95,11 +95,11 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest { /////////// WITH SINGLE-ITERATION STRATEGY MORE SINGLE-PASS OPTIONS ARE AVAILABLE graph = GraphFactory.open(configuration); - g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(SingleIterationStrategy.instance()); + g = graph.traversal().withComputer().withoutStrategies(SparkInterceptorStrategy.class).withStrategies(MessagePassingReductionStrategy.instance()); assertFalse(g.getStrategies().toList().contains(SparkInterceptorStrategy.instance())); assertFalse(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkInterceptorStrategy).findAny().isPresent()); - assertTrue(g.getStrategies().toList().contains(SingleIterationStrategy.instance())); - assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SingleIterationStrategy).findAny().isPresent()); + assertTrue(g.getStrategies().toList().contains(MessagePassingReductionStrategy.instance())); + assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof MessagePassingReductionStrategy).findAny().isPresent()); assertTrue(g.getStrategies().toList().contains(SparkSingleIterationStrategy.instance())); assertTrue(g.V().count().explain().getStrategyTraversals().stream().filter(pair -> pair.getValue0() instanceof SparkSingleIterationStrategy).findAny().isPresent()); @@ -114,6 +114,8 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest { test(true, g.V().bothE().values("weight").limit(2)); test(true, 6L, g.V().count()); test(true, 6L, g.V().id().count()); + test(true, 6L, g.V().identity().outE().identity().count()); + test(true, 6L, g.V().identity().outE().has("weight").count()); test(true, 6L, g.V().out().count()); test(true, 6L, g.V().outE().inV().count()); test(true, 6L, g.V().outE().inV().id().count()); @@ -167,6 +169,7 @@ public class SparkSingleIterationStrategyTest extends AbstractSparkTest { test(false, g.V().out().groupCount("x").cap("x")); test(false, 6L, g.V().both().groupCount("x").cap("x").select(keys).unfold().count()); test(false, g.V().outE().inV().groupCount()); + test(false, g.V().outE().unfold().inV().groupCount()); test(false, g.V().outE().inV().groupCount().by("name")); test(false, g.V().outE().inV().tree()); test(false, g.V().outE().inV().id().tree());