This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 744ae44 added a repeat() optimization where if there are not start or end predicates, then certain data paths and functions can be ignored from the stream. 744ae44 is described below commit 744ae448774c3ef6f9a20e2312348294eac4d43f Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Sun Mar 17 06:56:53 2019 -0600 added a repeat() optimization where if there are not start or end predicates, then certain data paths and functions can be ignored from the stream. --- .../machine/function/branch/RepeatBranch.java | 18 ++++++ .../tinkerpop/machine/beam/util/TopologyUtil.java | 24 +++++--- .../apache/tinkerpop/machine/pipes/RepeatStep.java | 72 ++++++++++++---------- 3 files changed, 72 insertions(+), 42 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java index b60104a..07ba398 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java @@ -41,6 +41,8 @@ public class RepeatBranch<C, S> extends AbstractFunction<C> implements BranchFun private Compilation<C, S, ?> emitCompilation; private int untilLocation = 0; private int emitLocation = 0; + private boolean hasStartPredicates = false; + private boolean hasEndPredicates = false; public RepeatBranch(final Coefficient<C> coefficient, final Set<String> labels, final List<Object> arguments) { super(coefficient, labels); @@ -51,9 +53,17 @@ public class RepeatBranch<C, S> extends AbstractFunction<C> implements BranchFun if ('e' == type) { this.emitCompilation = (Compilation<C, S, ?>) arguments.get(i + 1); this.emitLocation = location++; + if (this.emitLocation < 3) + this.hasStartPredicates = true; + else + this.hasEndPredicates = true; } else if ('u' == type) { this.untilCompilation = (Compilation<C, S, ?>) arguments.get(i + 1); this.untilLocation = location++; + if (this.untilLocation < 3) + this.hasStartPredicates = true; + else + this.hasEndPredicates = true; } else { this.repeatCompilation = (Compilation<C, S, S>) arguments.get(i + 1); location = 3; @@ -88,6 +98,14 @@ public class RepeatBranch<C, S> extends AbstractFunction<C> implements BranchFun return this.untilLocation; } + public boolean hasStartPredicates() { + return this.hasStartPredicates; + } + + public boolean hasEndPredicates() { + return this.hasEndPredicates; + } + public Map<Character, Compilation<C, S, S>> getCompilations() { return this.compilations; } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java index a0d3a01..1c555a1 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java @@ -86,19 +86,23 @@ public class TopologyUtil { final TupleTag<Traverser<C, S>> repeatLoop = new TupleTag<>(); sink = source; for (int i = 0; i < Beam.MAX_REPETIONS; i++) { - final RepeatStartFn<C, S> startFn = new RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1); - PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); - outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); - repeatSinks.add(outputs.get(repeatDone)); - sink = outputs.get(repeatLoop); + if (repeatFunction.hasStartPredicates()) { + final RepeatStartFn<C, S> startFn = new RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1); + final PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); + outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); + repeatSinks.add(outputs.get(repeatDone)); + sink = outputs.get(repeatLoop); + } for (final CFunction<C> ff : repeatFunction.getRepeat().getFunctions()) { sink = TopologyUtil.extend(sink, ff, traverserFactory); } - final RepeatEndFn<C, S> endFn = new RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1); - outputs = (PCollectionTuple) sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); - outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); - repeatSinks.add(outputs.get(repeatDone)); - sink = outputs.get(repeatLoop); + if (repeatFunction.hasEndPredicates()) { + final RepeatEndFn<C, S> endFn = new RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 1); + final PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); + outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); + repeatSinks.add(outputs.get(repeatDone)); + sink = outputs.get(repeatLoop); + } } sink = PCollectionList.of(repeatSinks).apply(Flatten.pCollections()); } else if (function instanceof BranchFunction) { diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java index b67d0b8..215637c 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java @@ -35,6 +35,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> { private final Compilation<C, S, S> repeat; private TraverserSet<C, S> outputTraversers = new TraverserSet<>(); private TraverserSet<C, S> inputTraversers = new TraverserSet<>(); + private final boolean hasStartPredicates; + private final boolean hasEndPredicates; RepeatStep(final Step<C, ?, S> previousStep, final RepeatBranch<C, S> repeatFunction) { super(previousStep, repeatFunction); @@ -43,6 +45,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> { this.repeat = repeatFunction.getRepeat(); this.untilLocation = repeatFunction.getUntilLocation(); this.emitLocation = repeatFunction.getEmitLocation(); + this.hasStartPredicates = repeatFunction.hasStartPredicates(); + this.hasEndPredicates = repeatFunction.hasEndPredicates(); } @Override @@ -58,24 +62,26 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> { } private final void stageInput() { - final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove(); - if (1 == this.untilLocation) { - if (this.untilCompilation.filterTraverser(traverser.clone())) { - this.outputTraversers.add(traverser); - } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { - this.outputTraversers.add(traverser); - this.repeat.addTraverser(traverser); - } else - this.repeat.addTraverser(traverser); - } else if (1 == this.emitLocation) { - if (this.emitCompilation.filterTraverser(traverser.clone())) - this.outputTraversers.add(traverser); - if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser.clone())) - this.outputTraversers.add(traverser); - else - this.repeat.addTraverser(traverser); + if (this.hasStartPredicates) { + final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove(); + if (1 == this.untilLocation) { + if (this.untilCompilation.filterTraverser(traverser.clone())) { + this.outputTraversers.add(traverser); + } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { + this.outputTraversers.add(traverser); + this.repeat.addTraverser(traverser); + } else + this.repeat.addTraverser(traverser); + } else if (1 == this.emitLocation) { + if (this.emitCompilation.filterTraverser(traverser.clone())) + this.outputTraversers.add(traverser); + if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser.clone())) + this.outputTraversers.add(traverser); + else + this.repeat.addTraverser(traverser); + } } else { - this.repeat.addTraverser(traverser); + this.repeat.addTraverser(this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove()); } } @@ -84,21 +90,23 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> { this.stageInput(); if (this.repeat.getProcessor().hasNext()) { final Traverser<C, S> traverser = this.repeat.getProcessor().next(); - if (3 == this.untilLocation) { - if (this.untilCompilation.filterTraverser(traverser.clone())) { - this.outputTraversers.add(traverser); - } else if (4 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { - this.outputTraversers.add(traverser); - this.inputTraversers.add(traverser); - } else - this.inputTraversers.add(traverser); - } else if (3 == this.emitLocation) { - if (this.emitCompilation.filterTraverser(traverser.clone())) - this.outputTraversers.add(traverser); - if (4 == this.untilLocation && this.untilCompilation.filterTraverser(traverser.clone())) - this.outputTraversers.add(traverser); - else - this.inputTraversers.add(traverser); + if (this.hasEndPredicates) { + if (3 == this.untilLocation) { + if (this.untilCompilation.filterTraverser(traverser.clone())) { + this.outputTraversers.add(traverser); + } else if (4 == this.emitLocation && this.emitCompilation.filterTraverser(traverser.clone())) { + this.outputTraversers.add(traverser); + this.inputTraversers.add(traverser); + } else + this.inputTraversers.add(traverser); + } else if (3 == this.emitLocation) { + if (this.emitCompilation.filterTraverser(traverser.clone())) + this.outputTraversers.add(traverser); + if (4 == this.untilLocation && this.untilCompilation.filterTraverser(traverser.clone())) + this.outputTraversers.add(traverser); + else + this.inputTraversers.add(traverser); + } } else { this.inputTraversers.add(traverser); }