This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 4c141f6  Finally figured out the generalized branch model. Its a 
little different than TP3 and it accounts for stream ring theory stuff. 
However, what makes it really special is that it lends itself easily to either 
a pull-based processor (Pipes) or a push-based processor (Beam). Beam is now 
generalized where it can take a BranchFunction (regardless of instance) and 
create an appropriate split/merge topology.
4c141f6 is described below

commit 4c141f6a39a4a04875f0b23748a7d7d6fab6e480
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Mar 13 07:28:06 2019 -0600

    Finally figured out the generalized branch model. Its a little different 
than TP3 and it accounts for stream ring theory stuff. However, what makes it 
really special is that it lends itself easily to either a pull-based processor 
(Pipes) or a push-based processor (Beam). Beam is now generalized where it can 
take a BranchFunction (regardless of instance) and create an appropriate 
split/merge topology.
---
 .../org/apache/tinkerpop/language/Symbols.java     |  1 +
 .../org/apache/tinkerpop/language/Traversal.java   | 14 +++-
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |  6 ++
 .../tinkerpop/machine/bytecode/Compilation.java    |  6 ++
 .../machine/functions/BranchFunction.java          |  9 ++-
 .../machine/functions/InternalFunction.java        |  6 --
 .../machine/functions/branch/ChooseBranch.java     | 86 ++++++++++++++++++++++
 .../machine/functions/branch/RepeatBranch.java     | 19 +++--
 .../machine/functions/branch/UnionBranch.java      | 27 +++++--
 .../branch/selector/HasNextSelector.java}          | 22 +++---
 .../selector/Selector.java}                        |  9 ++-
 .../selector/TrueSelector.java}                    | 17 ++++-
 .../machine/functions/filter/FilterFilter.java     |  4 -
 .../tinkerpop/machine/functions/map/MapMap.java    |  5 --
 .../machine/functions/reduce/GroupCountReduce.java |  4 -
 .../tinkerpop/machine/traversers/Traverser.java    |  2 +-
 .../org/apache/tinkerpop/util/NumberHelper.java    |  2 +-
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 30 +++++---
 .../apache/tinkerpop/machine/beam/BranchFn.java    | 21 +++++-
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  4 +-
 .../apache/tinkerpop/machine/pipes/BranchStep.java |  6 +-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  2 +-
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  9 +--
 23 files changed, 229 insertions(+), 82 deletions(-)

diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
index a11321f..efeeaad 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java
@@ -34,6 +34,7 @@ public final class Symbols {
 
 
     // INSTRUCTION OPS
+    public static final String CHOOSE_IF_THEN_ELSE = "chooseIfThenElse";
     public static final String COUNT = "count";
     public static final String FILTER = "filter";
     public static final String GROUP_COUNT = "groupCount";
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java 
b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
index fdcc8d4..9a17bc5 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java
@@ -63,6 +63,11 @@ public class Traversal<C, S, E> implements Iterator<E> {
         return this;
     }
 
+    public <R> Traversal<C, S, R> choose(final Traversal<C, E, ?> predicate, 
final Traversal<C, S, R> trueTraversal, final Traversal<C, S, R> 
falseTraversal) {
+        this.bytecode.addInstruction(this.currentCoefficient, 
Symbols.CHOOSE_IF_THEN_ELSE, predicate, trueTraversal, falseTraversal);
+        return (Traversal) this;
+    }
+
     public Traversal<C, S, Long> count() {
         this.bytecode.addInstruction(this.currentCoefficient, Symbols.COUNT);
         return (Traversal) this;
@@ -158,8 +163,13 @@ public class Traversal<C, S, E> implements Iterator<E> {
         return (Traversal) this;
     }
 
-    public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversal, 
Traversal<C, E, R>... traversals) {
-        this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, 
traversal, traversals);
+    public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversalA, 
final Traversal<C, E, R> traversalB) {
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, 
traversalA, traversalB);
+        return (Traversal) this;
+    }
+
+    public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversalA, 
final Traversal<C, E, R> traversalB, final Traversal<C, E, R> traversalC) {
+        this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, 
traversalA, traversalB, traversalC);
         return (Traversal) this;
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index eea1df5..b7110a3 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.bytecode;
 import org.apache.tinkerpop.language.Symbols;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.CFunction;
+import org.apache.tinkerpop.machine.functions.branch.ChooseBranch;
 import org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.functions.branch.UnionBranch;
 import org.apache.tinkerpop.machine.functions.filter.FilterFilter;
@@ -136,6 +137,11 @@ public final class BytecodeUtil {
         final Coefficient<C> coefficient = instruction.coefficient();
         final Set<String> labels = instruction.labels();
         switch (op) {
+            case Symbols.CHOOSE_IF_THEN_ELSE:
+                return new ChooseBranch<>(coefficient, labels,
+                        Compilation.compileOne(instruction.args()[0]),
+                        Compilation.compileOne(instruction.args()[1]),
+                        Compilation.compileOne(instruction.args()[2]));
             case Symbols.COUNT:
                 return new CountReduce<>(coefficient, labels);
             case Symbols.FILTER:
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
index f973a8f..f4faf36 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java
@@ -93,6 +93,12 @@ public final class Compilation<C, S, E> implements 
Serializable {
         return this.processor.hasNext();
     }
 
+    public Processor<C, S, E> addTraverser(final Traverser<C, S> traverser) {
+        this.prepareProcessor();
+        this.processor.addStart(traverser);
+        return this.processor;
+    }
+
     @Override
     public String toString() {
         return this.functions.toString();
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
index 8f7965f..159b2c8 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
@@ -18,14 +18,21 @@
  */
 package org.apache.tinkerpop.machine.functions;
 
+import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, 
Iterator<Traverser<C, E>>>, InternalFunction<C> {
+public interface BranchFunction<C, S, E, M> extends Function<Traverser<C, S>, 
Iterator<Traverser<C, E>>>, InternalFunction<C> {
 
+    public Selector<C, S, M> getBranchSelector();
+
+    public Map<M, List<Compilation<C, S, E>>> getBranches();
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
index 9481137..1cf77f6 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java
@@ -18,15 +18,9 @@
  */
 package org.apache.tinkerpop.machine.functions;
 
-import org.apache.tinkerpop.machine.bytecode.Compilation;
-
-import java.util.List;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public interface InternalFunction<C> extends CFunction<C> {
 
-    public List<Compilation<C, ?, ?>> getInternals();
-
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
new file mode 100644
index 0000000..c34763f
--- /dev/null
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.functions.branch;
+
+import org.apache.tinkerpop.machine.bytecode.Compilation;
+import org.apache.tinkerpop.machine.coefficients.Coefficient;
+import org.apache.tinkerpop.machine.functions.AbstractFunction;
+import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.functions.branch.selector.HasNextSelector;
+import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.util.StringFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ChooseBranch<C, S, E> extends AbstractFunction<C, S, 
Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> {
+
+    private final HasNextSelector<C, S> branchSelector;
+    private final Map<Boolean, List<Compilation<C, S, E>>> branches;
+    /////
+    private final Compilation<C, S, ?> predicate;
+    private final Compilation<C, S, E> trueBranch;
+    private final Compilation<C, S, E> falseBranch;
+
+
+    public ChooseBranch(final Coefficient<C> coefficient, final Set<String> 
labels,
+                        final Compilation<C, S, ?> predicate,
+                        final Compilation<C, S, E> trueBranch,
+                        final Compilation<C, S, E> falseBranch) {
+        super(coefficient, labels);
+        this.predicate = predicate;
+        this.trueBranch = trueBranch;
+        this.falseBranch = falseBranch;
+
+        this.branchSelector = new HasNextSelector<>(predicate);
+        this.branches = new HashMap<>();
+        this.branches.put(Boolean.TRUE, Collections.singletonList(trueBranch));
+        this.branches.put(Boolean.FALSE, 
Collections.singletonList(falseBranch));
+    }
+
+    @Override
+    public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
+        return this.predicate.filterTraverser(traverser) ?
+                this.trueBranch.addTraverser(traverser) :
+                this.falseBranch.addTraverser(traverser);
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.makeFunctionString(this, this.predicate, 
this.trueBranch, this.falseBranch);
+    }
+
+    @Override
+    public Selector<C, S, Boolean> getBranchSelector() {
+        return this.branchSelector;
+    }
+
+    @Override
+    public Map<Boolean, List<Compilation<C, S, E>>> getBranches() {
+        return this.branches;
+    }
+}
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
index c31b7c1..cdbbad1 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java
@@ -22,20 +22,21 @@ import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.IteratorUtils;
 import org.apache.tinkerpop.util.StringFactory;
 
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RepeatBranch<C, S> extends AbstractFunction<C, S, 
Iterator<Traverser<C, S>>> implements BranchFunction<C, S, S> {
+public class RepeatBranch<C, S> extends AbstractFunction<C, S, 
Iterator<Traverser<C, S>>> implements BranchFunction<C, S, S, Boolean> {
 
     private final Compilation<C, S, S> repeat;
     private final Compilation<C, S, ?> until;
@@ -64,11 +65,6 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, 
S, Iterator<Traverse
         return StringFactory.makeFunctionString(this, this.repeat, this.until);
     }
 
-    @Override
-    public List<Compilation<C, ?, ?>> getInternals() {
-        return Arrays.asList(this.repeat, this.until);
-    }
-
     public Compilation<C, S, S> getRepeat() {
         return this.repeat;
     }
@@ -77,4 +73,13 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, 
S, Iterator<Traverse
         return this.until;
     }
 
+    @Override
+    public Selector<C, S, Boolean> getBranchSelector() {
+        return null;
+    }
+
+    @Override
+    public Map<Boolean, List<Compilation<C, S, S>>> getBranches() {
+        return null;
+    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
index 6f72b48..f10e22c 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java
@@ -22,43 +22,54 @@ import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.coefficients.Coefficient;
 import org.apache.tinkerpop.machine.functions.AbstractFunction;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
+import org.apache.tinkerpop.machine.functions.branch.selector.TrueSelector;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.MultiIterator;
 import org.apache.tinkerpop.util.StringFactory;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, 
Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E> {
+public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, 
Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> {
 
-    private final List<Compilation<C, S, E>> branches;
+    private final Map<Boolean, List<Compilation<C, S, E>>> branches;
 
 
     public UnionBranch(final Coefficient<C> coefficient, final Set<String> 
labels, final List<Compilation<C, S, E>> branches) {
         super(coefficient, labels);
-        this.branches = branches;
+        this.branches = new HashMap<>();
+        this.branches.put(Boolean.TRUE, branches);
     }
 
     @Override
     public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
         final MultiIterator<Traverser<C, E>> iterator = new MultiIterator<>();
-        for (final Compilation<C, S, E> branch : this.branches) {
-            iterator.addIterator(branch.flatMapTraverser(traverser.clone()));
+        for (final Compilation<C, S, E> branch : 
this.branches.get(Boolean.TRUE)) {
+            branch.getProcessor().addStart(traverser.clone());
+            iterator.addIterator(branch.getProcessor());
         }
         return iterator;
     }
 
     @Override
     public String toString() {
-        return StringFactory.makeFunctionString(this, this.branches);
+        return StringFactory.makeFunctionString(this, 
this.branches.values().iterator().next()); // make a flat array
     }
 
     @Override
-    public List<Compilation<C, ?, ?>> getInternals() {
-        return (List) this.branches;
+    public Selector<C, S, Boolean> getBranchSelector() {
+        return TrueSelector.instance();
+    }
+
+    @Override
+    public Map<Boolean, List<Compilation<C, S, E>>> getBranches() {
+        return this.branches;
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/HasNextSelector.java
similarity index 63%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/HasNextSelector.java
index 65e8cae..09a14e4 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/HasNextSelector.java
@@ -16,22 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.beam;
+package org.apache.tinkerpop.machine.functions.branch.selector;
 
-import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.bytecode.Compilation;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.Optional;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class BranchFn<C, S, E> extends AbstractFn<C, S, E> {
+public final class HasNextSelector<C, S> implements Selector<C, S, Boolean> {
+
+    private final Compilation<C, S, ?> compilation;
 
-    public BranchFn(final BranchFunction<C, S, E> branchFunction) {
-        super(branchFunction);
+    public HasNextSelector(final Compilation<C, S, ?> compilation) {
+        this.compilation = compilation;
     }
 
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        throw new IllegalStateException("Branching is implemented using 
split/merge streams in Beam");
+    @Override
+    public Optional<Boolean> from(final Traverser<C, S> traverser) {
+        return Optional.of(this.compilation.filterTraverser(traverser));
     }
-}
\ No newline at end of file
+}
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/Selector.java
similarity index 79%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/Selector.java
index 8f7965f..609d4af 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/Selector.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.functions.branch.selector;
 
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.util.Iterator;
-import java.util.function.Function;
+import java.io.Serializable;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, 
Iterator<Traverser<C, E>>>, InternalFunction<C> {
+public interface Selector<C, S, E> extends Serializable {
 
+    public Optional<E> from(final Traverser<C, S> traverser);
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/TrueSelector.java
similarity index 67%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/TrueSelector.java
index 8f7965f..6ba2517 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/TrueSelector.java
@@ -16,16 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.functions;
+package org.apache.tinkerpop.machine.functions.branch.selector;
 
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
-import java.util.Iterator;
-import java.util.function.Function;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, 
Iterator<Traverser<C, E>>>, InternalFunction<C> {
+public final class TrueSelector<C, S> implements Selector<C, S, Boolean> {
 
+    private static final TrueSelector INSTANCE = new TrueSelector<>();
+
+    @Override
+    public Optional<Boolean> from(final Traverser<C, S> traverser) {
+        return Optional.of(Boolean.TRUE);
+    }
+
+    public static <C, S> TrueSelector<C, S> instance() {
+        return INSTANCE;
+    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
index f9e86d4..5f551c6 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java
@@ -52,8 +52,4 @@ public final class FilterFilter<C, S> extends 
AbstractFunction<C, S, S> implemen
         return StringFactory.makeFunctionString(this, this.internalFilter);
     }
 
-    @Override
-    public List<Compilation<C, ?, ?>> getInternals() {
-        return Collections.singletonList(this.internalFilter);
-    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
index 2dd0ffe..b0d0fc5 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java
@@ -51,9 +51,4 @@ public class MapMap<C, S, E> extends AbstractFunction<C, S, 
E> implements MapFun
     public String toString() {
         return StringFactory.makeFunctionString(this, this.internalMap);
     }
-
-    @Override
-    public List<Compilation<C, ?, ?>> getInternals() {
-        return Collections.singletonList(this.internalMap);
-    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
index 50285bf..d9f713f 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java
@@ -63,8 +63,4 @@ public class GroupCountReduce<C, S, E> extends 
AbstractFunction<C, S, Map<E, Lon
         return new HashMap<>();
     }
 
-    @Override
-    public List<Compilation<C, ?, ?>> getInternals() {
-        return Collections.singletonList(this.byCompilation);
-    }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
index e8703c0..1a4ed56 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java
@@ -59,7 +59,7 @@ public interface Traverser<C, S> extends Serializable, 
Cloneable {
         return IteratorUtils.map(function.apply(this), e -> 
this.split(function, e));
     }
 
-    public default <E> Iterator<Traverser<C, E>> branch(final 
BranchFunction<C, S, E> function) {
+    public default <E,M> Iterator<Traverser<C, E>> branch(final 
BranchFunction<C, S, E,M> function) {
         return function.apply(this);
     }
 
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java 
b/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java
index 8a2ae11..3e13af8 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java
@@ -311,7 +311,7 @@ public final class NumberHelper {
             } else /*if (clazz.equals(BigDecimal.class))*/ {
                 bits = bits < 128 ? 128 : bits;
                 fp = true;
-                break; // maxed out, no need to check remaining numbers
+                break; // maxed out, no need to from remaining numbers
             }
         }
         return determineNumberClass(bits, fp);
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index 9888804..c24f26f 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -48,7 +48,9 @@ import 
org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -80,7 +82,6 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             final boolean branching) {
         DoFn<Traverser<C, S>, Traverser<C, E>> fn = null;
         if (function instanceof RepeatBranch) {
-            ;
             final List<PCollection> outputs = new ArrayList<>();
             final TupleTag repeatDone = new TupleTag<>();
             final TupleTag repeatLoop = new TupleTag<>();
@@ -98,17 +99,28 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
             collection = (PCollection) PCollectionList.of((Iterable) 
outputs).apply(Flatten.pCollections());
             collection.setCoder(new TraverserCoder());
         } else if (function instanceof BranchFunction) {
-            final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?, 
?>) function).getInternals();
-            final List<PCollection<Traverser<C, S>>> collections = new 
ArrayList<>(branches.size());
-            for (final Compilation<C, ?, ?> branch : branches) {
-                PCollection<Traverser<C, S>> branchCollection = collection;
-                for (final CFunction<C> branchFunction : 
branch.getFunctions()) {
-                    branchCollection = this.processFunction(branchCollection, 
traverserFactory, branchFunction, true);
+            final BranchFunction<C, Object, Object, Object> branchFunction = 
(BranchFunction) function;
+            final Map<Object, TupleTag> branches = new LinkedHashMap<>();
+            for (final Map.Entry<Object, List<Compilation<C, Object, Object>>> 
b : branchFunction.getBranches().entrySet()) {
+                branches.put(b.getKey(), new TupleTag());
+            }
+            final BranchFn<C, Object, Object, Object> branchFn = new 
BranchFn<>(branchFunction, branches);
+            final List<TupleTag<?>> tags = new ArrayList(branches.values());
+            PCollectionTuple collectionTuple = (PCollectionTuple) 
collection.apply(ParDo.of((DoFn) branchFn).withOutputTags(tags.get(0), 
TupleTagList.of(tags.subList(1, tags.size()))));
+            collectionTuple.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
+            final List<PCollection<Traverser<C, S>>> collections = new 
ArrayList<>();
+            for (final Map.Entry<Object, List<Compilation<C, Object, Object>>> 
b : branchFunction.getBranches().entrySet()) {
+                for (final Compilation<C, Object, Object> compilation : 
b.getValue()) {
+                    PCollection<Traverser<C, S>> branchCollection = 
collectionTuple.get(branches.get(b.getKey()));
+                    for (final CFunction<C> f : compilation.getFunctions()) {
+                        branchCollection = 
this.processFunction(branchCollection, traverserFactory, f, true);
+                    }
+
+                    collections.add(branchCollection);
                 }
-                collections.add(branchCollection);
             }
             collection = 
PCollectionList.of(collections).apply(Flatten.pCollections());
-            this.functions.add(new BranchFn<>((BranchFunction<C, S, E>) 
function));
+            this.functions.add(branchFn);
         } else if (function instanceof InitialFunction) {
             fn = new InitialFn((InitialFunction<C, S>) function, 
traverserFactory);
         } else if (function instanceof FilterFunction) {
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
index 65e8cae..2f55b5e 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java
@@ -18,20 +18,33 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.tinkerpop.machine.functions.BranchFunction;
+import org.apache.tinkerpop.machine.functions.branch.selector.Selector;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.Map;
+import java.util.Optional;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class BranchFn<C, S, E> extends AbstractFn<C, S, E> {
+public class BranchFn<C, S, E, M> extends AbstractFn<C, S, E> {
+
+    private final Map<M, TupleTag> branches;
+    private final Selector<C, S, M> branchSelector;
 
-    public BranchFn(final BranchFunction<C, S, E> branchFunction) {
+    public BranchFn(final BranchFunction<C, S, E, M> branchFunction, final 
Map<M, TupleTag> branches) {
         super(branchFunction);
+        this.branches = branches;
+        this.branchSelector = branchFunction.getBranchSelector();
     }
 
     @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        throw new IllegalStateException("Branching is implemented using 
split/merge streams in Beam");
+    public void processElement(final @Element Traverser<C, S> traverser, final 
MultiOutputReceiver out) {
+        final Optional<M> selector = this.branchSelector.from(traverser);
+        if (selector.isPresent())
+            
out.get(this.branches.get(selector.get())).output(traverser.clone());
+
     }
 }
\ No newline at end of file
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 364b740..510b451 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -48,12 +48,12 @@ public class BeamTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(incr()).identity().incr().count();
+        traversal = 
g.inject(10L).choose(__.is(7L),__.incr(),__.<Long>incr().incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = 
g.inject(7L).union(__.<Long>incr().incr().union(__.<Long>incr().as("b").identity().as("a"),
 __.<Long>incr().identity()), __.incr());
+        traversal = 
g.inject(7L).union(__.incr(),__.<Long>incr().incr().union(__.incr(),__.incr()));
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
index 64c2387..fec154a 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
@@ -27,12 +27,12 @@ import java.util.Iterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class BranchStep<C, S, E> extends AbstractStep<C, S, E> {
+public final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> {
 
-    private final BranchFunction<C, S, E> branchFunction;
+    private final BranchFunction<C, S, E, M> branchFunction;
     private Iterator<Traverser<C, E>> iterator = Collections.emptyIterator();
 
-    public BranchStep(final AbstractStep<C, ?, S> previousStep, final 
BranchFunction<C, S, E> branchFunction) {
+    public BranchStep(final AbstractStep<C, ?, S> previousStep, final 
BranchFunction<C, S, E, M> branchFunction) {
         super(previousStep, branchFunction);
         this.branchFunction = branchFunction;
     }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 5cda654..24e0cd4 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -47,7 +47,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, 
E> {
         for (final CFunction<?> function : compilation.getFunctions()) {
             final AbstractStep nextStep;
             if (function instanceof BranchFunction)
-                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?>) function);
+                nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, 
?,?>) function);
             else if (function instanceof FilterFunction)
                 nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) 
function);
             else if (function instanceof FlatMapFunction)
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index e12c439..d51b966 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -49,17 +49,12 @@ public class PipesTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().is(14L).count().c(10L).incr().sum();
+        traversal = 
g.inject(7L).union(__.incr(),__.<Long>incr().incr().union(__.incr(),__.incr()));
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L).union(__.<Long>incr().incr().count(), 
__.<Long, Long>c(10L).incr().sum());
-        System.out.println(TraversalUtil.getBytecode(traversal));
-        System.out.println(traversal);
-        System.out.println(traversal.toList());
-        System.out.println("\n----------\n");
-        traversal = g.inject(7L).incr().filter(__.<Long>incr().incr().is(9L));
+        traversal = 
g.inject(8L).choose(__.is(7L),__.incr(),__.<Long>incr().incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());

Reply via email to