This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 744ae44  added a repeat() optimization where if there are no start or 
end predicates, then certain data paths and functions can be omitted from the 
stream.
744ae44 is described below

commit 744ae448774c3ef6f9a20e2312348294eac4d43f
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Sun Mar 17 06:56:53 2019 -0600

    added a repeat() optimization where if there are no start or end 
predicates, then certain data paths and functions can be omitted from the 
stream.
---
 .../machine/function/branch/RepeatBranch.java      | 18 ++++++
 .../tinkerpop/machine/beam/util/TopologyUtil.java  | 24 +++++---
 .../apache/tinkerpop/machine/pipes/RepeatStep.java | 72 ++++++++++++----------
 3 files changed, 72 insertions(+), 42 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
index b60104a..07ba398 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
@@ -41,6 +41,8 @@ public class RepeatBranch<C, S> extends AbstractFunction<C> 
implements BranchFun
     private Compilation<C, S, ?> emitCompilation;
     private int untilLocation = 0;
     private int emitLocation = 0;
+    private boolean hasStartPredicates = false;
+    private boolean hasEndPredicates = false;
 
     public RepeatBranch(final Coefficient<C> coefficient, final Set<String> 
labels, final List<Object> arguments) {
         super(coefficient, labels);
@@ -51,9 +53,17 @@ public class RepeatBranch<C, S> extends AbstractFunction<C> 
implements BranchFun
             if ('e' == type) {
                 this.emitCompilation = (Compilation<C, S, ?>) arguments.get(i 
+ 1);
                 this.emitLocation = location++;
+                if (this.emitLocation < 3)
+                    this.hasStartPredicates = true;
+                else
+                    this.hasEndPredicates = true;
             } else if ('u' == type) {
                 this.untilCompilation = (Compilation<C, S, ?>) arguments.get(i 
+ 1);
                 this.untilLocation = location++;
+                if (this.untilLocation < 3)
+                    this.hasStartPredicates = true;
+                else
+                    this.hasEndPredicates = true;
             } else {
                 this.repeatCompilation = (Compilation<C, S, S>) 
arguments.get(i + 1);
                 location = 3;
@@ -88,6 +98,14 @@ public class RepeatBranch<C, S> extends AbstractFunction<C> 
implements BranchFun
         return this.untilLocation;
     }
 
+    public boolean hasStartPredicates() {
+        return this.hasStartPredicates;
+    }
+
+    public boolean hasEndPredicates() {
+        return this.hasEndPredicates;
+    }
+
     public Map<Character, Compilation<C, S, S>> getCompilations() {
         return this.compilations;
     }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
index a0d3a01..1c555a1 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
@@ -86,19 +86,23 @@ public class TopologyUtil {
             final TupleTag<Traverser<C, S>> repeatLoop = new TupleTag<>();
             sink = source;
             for (int i = 0; i < Beam.MAX_REPETIONS; i++) {
-                final RepeatStartFn<C, S> startFn = new 
RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS 
- 1);
-                PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
-                outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
-                repeatSinks.add(outputs.get(repeatDone));
-                sink = outputs.get(repeatLoop);
+                if (repeatFunction.hasStartPredicates()) {
+                    final RepeatStartFn<C, S> startFn = new 
RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS 
- 1);
+                    final PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
+                    outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
+                    repeatSinks.add(outputs.get(repeatDone));
+                    sink = outputs.get(repeatLoop);
+                }
                 for (final CFunction<C> ff : 
repeatFunction.getRepeat().getFunctions()) {
                     sink = TopologyUtil.extend(sink, ff, traverserFactory);
                 }
-                final RepeatEndFn<C, S> endFn = new 
RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 
1);
-                outputs = (PCollectionTuple) 
sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
-                outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
-                repeatSinks.add(outputs.get(repeatDone));
-                sink = outputs.get(repeatLoop);
+                if (repeatFunction.hasEndPredicates()) {
+                    final RepeatEndFn<C, S> endFn = new 
RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop, i == Beam.MAX_REPETIONS - 
1);
+                    final PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
+                    outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
+                    repeatSinks.add(outputs.get(repeatDone));
+                    sink = outputs.get(repeatLoop);
+                }
             }
             sink = 
PCollectionList.of(repeatSinks).apply(Flatten.pCollections());
         } else if (function instanceof BranchFunction) {
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
index b67d0b8..215637c 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
@@ -35,6 +35,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
     private final Compilation<C, S, S> repeat;
     private TraverserSet<C, S> outputTraversers = new TraverserSet<>();
     private TraverserSet<C, S> inputTraversers = new TraverserSet<>();
+    private final boolean hasStartPredicates;
+    private final boolean hasEndPredicates;
 
     RepeatStep(final Step<C, ?, S> previousStep, final RepeatBranch<C, S> 
repeatFunction) {
         super(previousStep, repeatFunction);
@@ -43,6 +45,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
         this.repeat = repeatFunction.getRepeat();
         this.untilLocation = repeatFunction.getUntilLocation();
         this.emitLocation = repeatFunction.getEmitLocation();
+        this.hasStartPredicates = repeatFunction.hasStartPredicates();
+        this.hasEndPredicates = repeatFunction.hasEndPredicates();
     }
 
     @Override
@@ -58,24 +62,26 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
     }
 
     private final void stageInput() {
-        final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? 
this.previousStep.next() : this.inputTraversers.remove();
-        if (1 == this.untilLocation) {
-            if (this.untilCompilation.filterTraverser(traverser.clone())) {
-                this.outputTraversers.add(traverser);
-            } else if (2 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
-                this.outputTraversers.add(traverser);
-                this.repeat.addTraverser(traverser);
-            } else
-                this.repeat.addTraverser(traverser);
-        } else if (1 == this.emitLocation) {
-            if (this.emitCompilation.filterTraverser(traverser.clone()))
-                this.outputTraversers.add(traverser);
-            if (2 == this.untilLocation && 
this.untilCompilation.filterTraverser(traverser.clone()))
-                this.outputTraversers.add(traverser);
-            else
-                this.repeat.addTraverser(traverser);
+        if (this.hasStartPredicates) {
+            final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? 
this.previousStep.next() : this.inputTraversers.remove();
+            if (1 == this.untilLocation) {
+                if (this.untilCompilation.filterTraverser(traverser.clone())) {
+                    this.outputTraversers.add(traverser);
+                } else if (2 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
+                    this.outputTraversers.add(traverser);
+                    this.repeat.addTraverser(traverser);
+                } else
+                    this.repeat.addTraverser(traverser);
+            } else if (1 == this.emitLocation) {
+                if (this.emitCompilation.filterTraverser(traverser.clone()))
+                    this.outputTraversers.add(traverser);
+                if (2 == this.untilLocation && 
this.untilCompilation.filterTraverser(traverser.clone()))
+                    this.outputTraversers.add(traverser);
+                else
+                    this.repeat.addTraverser(traverser);
+            }
         } else {
-            this.repeat.addTraverser(traverser);
+            this.repeat.addTraverser(this.inputTraversers.isEmpty() ? 
this.previousStep.next() : this.inputTraversers.remove());
         }
     }
 
@@ -84,21 +90,23 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
             this.stageInput();
             if (this.repeat.getProcessor().hasNext()) {
                 final Traverser<C, S> traverser = 
this.repeat.getProcessor().next();
-                if (3 == this.untilLocation) {
-                    if 
(this.untilCompilation.filterTraverser(traverser.clone())) {
-                        this.outputTraversers.add(traverser);
-                    } else if (4 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
-                        this.outputTraversers.add(traverser);
-                        this.inputTraversers.add(traverser);
-                    } else
-                        this.inputTraversers.add(traverser);
-                } else if (3 == this.emitLocation) {
-                    if 
(this.emitCompilation.filterTraverser(traverser.clone()))
-                        this.outputTraversers.add(traverser);
-                    if (4 == this.untilLocation && 
this.untilCompilation.filterTraverser(traverser.clone()))
-                        this.outputTraversers.add(traverser);
-                    else
-                        this.inputTraversers.add(traverser);
+                if (this.hasEndPredicates) {
+                    if (3 == this.untilLocation) {
+                        if 
(this.untilCompilation.filterTraverser(traverser.clone())) {
+                            this.outputTraversers.add(traverser);
+                        } else if (4 == this.emitLocation && 
this.emitCompilation.filterTraverser(traverser.clone())) {
+                            this.outputTraversers.add(traverser);
+                            this.inputTraversers.add(traverser);
+                        } else
+                            this.inputTraversers.add(traverser);
+                    } else if (3 == this.emitLocation) {
+                        if 
(this.emitCompilation.filterTraverser(traverser.clone()))
+                            this.outputTraversers.add(traverser);
+                        if (4 == this.untilLocation && 
this.untilCompilation.filterTraverser(traverser.clone()))
+                            this.outputTraversers.add(traverser);
+                        else
+                            this.inputTraversers.add(traverser);
+                    }
                 } else {
                     this.inputTraversers.add(traverser);
                 }

Reply via email to