This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 4c141f6 Finally figured out the generalized branch model. Its a little different than TP3 and it accounts for stream ring theory stuff. However, what makes it really special is that it lends itself easily to either a pull-based processor (Pipes) or a push-based processor (Beam). Beam is now generalized where it can take a BranchFunction (regardless of instance) and create an appropriate split/merge topology. 4c141f6 is described below commit 4c141f6a39a4a04875f0b23748a7d7d6fab6e480 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Wed Mar 13 07:28:06 2019 -0600 Finally figured out the generalized branch model. Its a little different than TP3 and it accounts for stream ring theory stuff. However, what makes it really special is that it lends itself easily to either a pull-based processor (Pipes) or a push-based processor (Beam). Beam is now generalized where it can take a BranchFunction (regardless of instance) and create an appropriate split/merge topology. --- .../org/apache/tinkerpop/language/Symbols.java | 1 + .../org/apache/tinkerpop/language/Traversal.java | 14 +++- .../tinkerpop/machine/bytecode/BytecodeUtil.java | 6 ++ .../tinkerpop/machine/bytecode/Compilation.java | 6 ++ .../machine/functions/BranchFunction.java | 9 ++- .../machine/functions/InternalFunction.java | 6 -- .../machine/functions/branch/ChooseBranch.java | 86 ++++++++++++++++++++++ .../machine/functions/branch/RepeatBranch.java | 19 +++-- .../machine/functions/branch/UnionBranch.java | 27 +++++-- .../branch/selector/HasNextSelector.java} | 22 +++--- .../selector/Selector.java} | 9 ++- .../selector/TrueSelector.java} | 17 ++++- .../machine/functions/filter/FilterFilter.java | 4 - .../tinkerpop/machine/functions/map/MapMap.java | 5 -- .../machine/functions/reduce/GroupCountReduce.java | 4 - .../tinkerpop/machine/traversers/Traverser.java | 2 +- .../org/apache/tinkerpop/util/NumberHelper.java | 2 +- .../org/apache/tinkerpop/machine/beam/Beam.java | 30 +++++--- .../apache/tinkerpop/machine/beam/BranchFn.java | 21 +++++- .../apache/tinkerpop/machine/beam/BeamTest.java | 4 +- .../apache/tinkerpop/machine/pipes/BranchStep.java | 6 +- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 2 +- .../apache/tinkerpop/machine/pipes/PipesTest.java | 9 +-- 23 files changed, 229 insertions(+), 82 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java index a11321f..efeeaad 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Symbols.java @@ -34,6 +34,7 @@ public final class Symbols { // INSTRUCTION OPS + public static final String CHOOSE_IF_THEN_ELSE = "chooseIfThenElse"; public static final String COUNT = "count"; public static final String FILTER = "filter"; public static final String GROUP_COUNT = "groupCount"; diff --git a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java index fdcc8d4..9a17bc5 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java +++ b/java/core/src/main/java/org/apache/tinkerpop/language/Traversal.java @@ -63,6 +63,11 @@ public class Traversal<C, S, E> implements Iterator<E> { return this; } + public <R> Traversal<C, S, R> choose(final Traversal<C, E, ?> predicate, final Traversal<C, S, R> trueTraversal, final Traversal<C, S, R> falseTraversal) { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.CHOOSE_IF_THEN_ELSE, predicate, trueTraversal, falseTraversal); + return (Traversal) this; + } + public Traversal<C, S, Long> count() { this.bytecode.addInstruction(this.currentCoefficient, Symbols.COUNT); return (Traversal) this; @@ -158,8 +163,13 @@ public class Traversal<C, S, E> implements Iterator<E> { return (Traversal) this; } - public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversal, Traversal<C, E, R>... traversals) { - this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, traversal, traversals); + public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversalA, final Traversal<C, E, R> traversalB) { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, traversalA, traversalB); + return (Traversal) this; + } + + public <R> Traversal<C, S, R> union(final Traversal<C, E, R> traversalA, final Traversal<C, E, R> traversalB, final Traversal<C, E, R> traversalC) { + this.bytecode.addInstruction(this.currentCoefficient, Symbols.UNION, traversalA, traversalB, traversalC); return (Traversal) this; } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java index eea1df5..b7110a3 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.bytecode; import org.apache.tinkerpop.language.Symbols; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.CFunction; +import org.apache.tinkerpop.machine.functions.branch.ChooseBranch; import org.apache.tinkerpop.machine.functions.branch.RepeatBranch; import org.apache.tinkerpop.machine.functions.branch.UnionBranch; import org.apache.tinkerpop.machine.functions.filter.FilterFilter; @@ -136,6 +137,11 @@ public final class BytecodeUtil { final Coefficient<C> coefficient = instruction.coefficient(); final Set<String> labels = instruction.labels(); switch (op) { + case Symbols.CHOOSE_IF_THEN_ELSE: + return new ChooseBranch<>(coefficient, labels, + Compilation.compileOne(instruction.args()[0]), + Compilation.compileOne(instruction.args()[1]), + Compilation.compileOne(instruction.args()[2])); case Symbols.COUNT: return new CountReduce<>(coefficient, labels); case Symbols.FILTER: diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java index f973a8f..f4faf36 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/Compilation.java @@ -93,6 +93,12 @@ public final class Compilation<C, S, E> implements Serializable { return this.processor.hasNext(); } + public Processor<C, S, E> addTraverser(final Traverser<C, S> traverser) { + this.prepareProcessor(); + this.processor.addStart(traverser); + return this.processor; + } + @Override public String toString() { return this.functions.toString(); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java index 8f7965f..159b2c8 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java @@ -18,14 +18,21 @@ */ package org.apache.tinkerpop.machine.functions; +import org.apache.tinkerpop.machine.bytecode.Compilation; +import org.apache.tinkerpop.machine.functions.branch.selector.Selector; import org.apache.tinkerpop.machine.traversers.Traverser; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.function.Function; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, Iterator<Traverser<C, E>>>, InternalFunction<C> { +public interface BranchFunction<C, S, E, M> extends Function<Traverser<C, S>, Iterator<Traverser<C, E>>>, InternalFunction<C> { + public Selector<C, S, M> getBranchSelector(); + + public Map<M, List<Compilation<C, S, E>>> getBranches(); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java index 9481137..1cf77f6 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/InternalFunction.java @@ -18,15 +18,9 @@ */ package org.apache.tinkerpop.machine.functions; -import org.apache.tinkerpop.machine.bytecode.Compilation; - -import java.util.List; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public interface InternalFunction<C> extends CFunction<C> { - public List<Compilation<C, ?, ?>> getInternals(); - } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java new file mode 100644 index 0000000..c34763f --- /dev/null +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/ChooseBranch.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.functions.branch; + +import org.apache.tinkerpop.machine.bytecode.Compilation; +import org.apache.tinkerpop.machine.coefficients.Coefficient; +import org.apache.tinkerpop.machine.functions.AbstractFunction; +import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.functions.branch.selector.HasNextSelector; +import org.apache.tinkerpop.machine.functions.branch.selector.Selector; +import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.util.StringFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class ChooseBranch<C, S, E> extends AbstractFunction<C, S, Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> { + + private final HasNextSelector<C, S> branchSelector; + private final Map<Boolean, List<Compilation<C, S, E>>> branches; + ///// + private final Compilation<C, S, ?> predicate; + private final Compilation<C, S, E> trueBranch; + private final Compilation<C, S, E> falseBranch; + + + public ChooseBranch(final Coefficient<C> coefficient, final Set<String> labels, + final Compilation<C, S, ?> predicate, + final Compilation<C, S, E> trueBranch, + final Compilation<C, S, E> falseBranch) { + super(coefficient, labels); + this.predicate = predicate; + this.trueBranch = trueBranch; + this.falseBranch = falseBranch; + + this.branchSelector = new HasNextSelector<>(predicate); + this.branches = new HashMap<>(); + this.branches.put(Boolean.TRUE, Collections.singletonList(trueBranch)); + this.branches.put(Boolean.FALSE, Collections.singletonList(falseBranch)); + } + + @Override + public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) { + return this.predicate.filterTraverser(traverser) ? + this.trueBranch.addTraverser(traverser) : + this.falseBranch.addTraverser(traverser); + } + + @Override + public String toString() { + return StringFactory.makeFunctionString(this, this.predicate, this.trueBranch, this.falseBranch); + } + + @Override + public Selector<C, S, Boolean> getBranchSelector() { + return this.branchSelector; + } + + @Override + public Map<Boolean, List<Compilation<C, S, E>>> getBranches() { + return this.branches; + } +} diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java index c31b7c1..cdbbad1 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/RepeatBranch.java @@ -22,20 +22,21 @@ import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.functions.branch.selector.Selector; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.IteratorUtils; import org.apache.tinkerpop.util.StringFactory; -import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverser<C, S>>> implements BranchFunction<C, S, S> { +public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverser<C, S>>> implements BranchFunction<C, S, S, Boolean> { private final Compilation<C, S, S> repeat; private final Compilation<C, S, ?> until; @@ -64,11 +65,6 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverse return StringFactory.makeFunctionString(this, this.repeat, this.until); } - @Override - public List<Compilation<C, ?, ?>> getInternals() { - return Arrays.asList(this.repeat, this.until); - } - public Compilation<C, S, S> getRepeat() { return this.repeat; } @@ -77,4 +73,13 @@ public class RepeatBranch<C, S> extends AbstractFunction<C, S, Iterator<Traverse return this.until; } + @Override + public Selector<C, S, Boolean> getBranchSelector() { + return null; + } + + @Override + public Map<Boolean, List<Compilation<C, S, S>>> getBranches() { + return null; + } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java index 6f72b48..f10e22c 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/UnionBranch.java @@ -22,43 +22,54 @@ import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.coefficients.Coefficient; import org.apache.tinkerpop.machine.functions.AbstractFunction; import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.functions.branch.selector.Selector; +import org.apache.tinkerpop.machine.functions.branch.selector.TrueSelector; import org.apache.tinkerpop.machine.traversers.Traverser; import org.apache.tinkerpop.util.MultiIterator; import org.apache.tinkerpop.util.StringFactory; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E> { +public final class UnionBranch<C, S, E> extends AbstractFunction<C, S, Iterator<Traverser<C, E>>> implements BranchFunction<C, S, E, Boolean> { - private final List<Compilation<C, S, E>> branches; + private final Map<Boolean, List<Compilation<C, S, E>>> branches; public UnionBranch(final Coefficient<C> coefficient, final Set<String> labels, final List<Compilation<C, S, E>> branches) { super(coefficient, labels); - this.branches = branches; + this.branches = new HashMap<>(); + this.branches.put(Boolean.TRUE, branches); } @Override public Iterator<Traverser<C, E>> apply(final Traverser<C, S> traverser) { final MultiIterator<Traverser<C, E>> iterator = new MultiIterator<>(); - for (final Compilation<C, S, E> branch : this.branches) { - iterator.addIterator(branch.flatMapTraverser(traverser.clone())); + for (final Compilation<C, S, E> branch : this.branches.get(Boolean.TRUE)) { + branch.getProcessor().addStart(traverser.clone()); + iterator.addIterator(branch.getProcessor()); } return iterator; } @Override public String toString() { - return StringFactory.makeFunctionString(this, this.branches); + return StringFactory.makeFunctionString(this, this.branches.values().iterator().next()); // make a flat array } @Override - public List<Compilation<C, ?, ?>> getInternals() { - return (List) this.branches; + public Selector<C, S, Boolean> getBranchSelector() { + return TrueSelector.instance(); + } + + @Override + public Map<Boolean, List<Compilation<C, S, E>>> getBranches() { + return this.branches; } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/HasNextSelector.java similarity index 63% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/HasNextSelector.java index 65e8cae..09a14e4 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/HasNextSelector.java @@ -16,22 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.beam; +package org.apache.tinkerpop.machine.functions.branch.selector; -import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.bytecode.Compilation; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.util.Optional; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class BranchFn<C, S, E> extends AbstractFn<C, S, E> { +public final class HasNextSelector<C, S> implements Selector<C, S, Boolean> { + + private final Compilation<C, S, ?> compilation; - public BranchFn(final BranchFunction<C, S, E> branchFunction) { - super(branchFunction); + public HasNextSelector(final Compilation<C, S, ?> compilation) { + this.compilation = compilation; } - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - throw new IllegalStateException("Branching is implemented using split/merge streams in Beam"); + @Override + public Optional<Boolean> from(final Traverser<C, S> traverser) { + return Optional.of(this.compilation.filterTraverser(traverser)); } -} \ No newline at end of file +} diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/Selector.java similarity index 79% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/Selector.java index 8f7965f..609d4af 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/Selector.java @@ -16,16 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; +package org.apache.tinkerpop.machine.functions.branch.selector; import org.apache.tinkerpop.machine.traversers.Traverser; -import java.util.Iterator; -import java.util.function.Function; +import java.io.Serializable; +import java.util.Optional; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, Iterator<Traverser<C, E>>>, InternalFunction<C> { +public interface Selector<C, S, E> extends Serializable { + public Optional<E> from(final Traverser<C, S> traverser); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/TrueSelector.java similarity index 67% copy from java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/TrueSelector.java index 8f7965f..6ba2517 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/BranchFunction.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/branch/selector/TrueSelector.java @@ -16,16 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.functions; +package org.apache.tinkerpop.machine.functions.branch.selector; import org.apache.tinkerpop.machine.traversers.Traverser; -import java.util.Iterator; -import java.util.function.Function; +import java.util.Optional; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface BranchFunction<C, S, E> extends Function<Traverser<C, S>, Iterator<Traverser<C, E>>>, InternalFunction<C> { +public final class TrueSelector<C, S> implements Selector<C, S, Boolean> { + private static final TrueSelector INSTANCE = new TrueSelector<>(); + + @Override + public Optional<Boolean> from(final Traverser<C, S> traverser) { + return Optional.of(Boolean.TRUE); + } + + public static <C, S> TrueSelector<C, S> instance() { + return INSTANCE; + } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java index f9e86d4..5f551c6 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/filter/FilterFilter.java @@ -52,8 +52,4 @@ public final class FilterFilter<C, S> extends AbstractFunction<C, S, S> implemen return StringFactory.makeFunctionString(this, this.internalFilter); } - @Override - public List<Compilation<C, ?, ?>> getInternals() { - return Collections.singletonList(this.internalFilter); - } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java index 2dd0ffe..b0d0fc5 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/map/MapMap.java @@ -51,9 +51,4 @@ public class MapMap<C, S, E> extends AbstractFunction<C, S, E> implements MapFun public String toString() { return StringFactory.makeFunctionString(this, this.internalMap); } - - @Override - public List<Compilation<C, ?, ?>> getInternals() { - return Collections.singletonList(this.internalMap); - } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java index 50285bf..d9f713f 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/functions/reduce/GroupCountReduce.java @@ -63,8 +63,4 @@ public class GroupCountReduce<C, S, E> extends AbstractFunction<C, S, Map<E, Lon return new HashMap<>(); } - @Override - public List<Compilation<C, ?, ?>> getInternals() { - return Collections.singletonList(this.byCompilation); - } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java index e8703c0..1a4ed56 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/Traverser.java @@ -59,7 +59,7 @@ public interface Traverser<C, S> extends Serializable, Cloneable { return IteratorUtils.map(function.apply(this), e -> this.split(function, e)); } - public default <E> Iterator<Traverser<C, E>> branch(final BranchFunction<C, S, E> function) { + public default <E,M> Iterator<Traverser<C, E>> branch(final BranchFunction<C, S, E,M> function) { return function.apply(this); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java b/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java index 8a2ae11..3e13af8 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java +++ b/java/core/src/main/java/org/apache/tinkerpop/util/NumberHelper.java @@ -311,7 +311,7 @@ public final class NumberHelper { } else /*if (clazz.equals(BigDecimal.class))*/ { bits = bits < 128 ? 128 : bits; fp = true; - break; // maxed out, no need to check remaining numbers + break; // maxed out, no need to from remaining numbers } } return determineNumberClass(bits, fp); diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java index 9888804..c24f26f 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -48,7 +48,9 @@ import org.apache.tinkerpop.machine.traversers.TraverserFactory; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -80,7 +82,6 @@ public class Beam<C, S, E> implements Processor<C, S, E> { final boolean branching) { DoFn<Traverser<C, S>, Traverser<C, E>> fn = null; if (function instanceof RepeatBranch) { - ; final List<PCollection> outputs = new ArrayList<>(); final TupleTag repeatDone = new TupleTag<>(); final TupleTag repeatLoop = new TupleTag<>(); @@ -98,17 +99,28 @@ public class Beam<C, S, E> implements Processor<C, S, E> { collection = (PCollection) PCollectionList.of((Iterable) outputs).apply(Flatten.pCollections()); collection.setCoder(new TraverserCoder()); } else if (function instanceof BranchFunction) { - final List<Compilation<C, ?, ?>> branches = ((BranchFunction<C, ?, ?>) function).getInternals(); - final List<PCollection<Traverser<C, S>>> collections = new ArrayList<>(branches.size()); - for (final Compilation<C, ?, ?> branch : branches) { - PCollection<Traverser<C, S>> branchCollection = collection; - for (final CFunction<C> branchFunction : branch.getFunctions()) { - branchCollection = this.processFunction(branchCollection, traverserFactory, branchFunction, true); + final BranchFunction<C, Object, Object, Object> branchFunction = (BranchFunction) function; + final Map<Object, TupleTag> branches = new LinkedHashMap<>(); + for (final Map.Entry<Object, List<Compilation<C, Object, Object>>> b : branchFunction.getBranches().entrySet()) { + branches.put(b.getKey(), new TupleTag()); + } + final BranchFn<C, Object, Object, Object> branchFn = new BranchFn<>(branchFunction, branches); + final List<TupleTag<?>> tags = new ArrayList(branches.values()); + PCollectionTuple collectionTuple = (PCollectionTuple) collection.apply(ParDo.of((DoFn) branchFn).withOutputTags(tags.get(0), TupleTagList.of(tags.subList(1, tags.size())))); + collectionTuple.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); + final List<PCollection<Traverser<C, S>>> collections = new ArrayList<>(); + for (final Map.Entry<Object, List<Compilation<C, Object, Object>>> b : branchFunction.getBranches().entrySet()) { + for (final Compilation<C, Object, Object> compilation : b.getValue()) { + PCollection<Traverser<C, S>> branchCollection = collectionTuple.get(branches.get(b.getKey())); + for (final CFunction<C> f : compilation.getFunctions()) { + branchCollection = this.processFunction(branchCollection, traverserFactory, f, true); + } + + collections.add(branchCollection); } - collections.add(branchCollection); } collection = PCollectionList.of(collections).apply(Flatten.pCollections()); - this.functions.add(new BranchFn<>((BranchFunction<C, S, E>) function)); + this.functions.add(branchFn); } else if (function instanceof InitialFunction) { fn = new InitialFn((InitialFunction<C, S>) function, traverserFactory); } else if (function instanceof FilterFunction) { diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java index 65e8cae..2f55b5e 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/BranchFn.java @@ -18,20 +18,33 @@ */ package org.apache.tinkerpop.machine.beam; +import org.apache.beam.sdk.values.TupleTag; import org.apache.tinkerpop.machine.functions.BranchFunction; +import org.apache.tinkerpop.machine.functions.branch.selector.Selector; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.util.Map; +import java.util.Optional; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class BranchFn<C, S, E> extends AbstractFn<C, S, E> { +public class BranchFn<C, S, E, M> extends AbstractFn<C, S, E> { + + private final Map<M, TupleTag> branches; + private final Selector<C, S, M> branchSelector; - public BranchFn(final BranchFunction<C, S, E> branchFunction) { + public BranchFn(final BranchFunction<C, S, E, M> branchFunction, final Map<M, TupleTag> branches) { super(branchFunction); + this.branches = branches; + this.branchSelector = branchFunction.getBranchSelector(); } @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - throw new IllegalStateException("Branching is implemented using split/merge streams in Beam"); + public void processElement(final @Element Traverser<C, S> traverser, final MultiOutputReceiver out) { + final Optional<M> selector = this.branchSelector.from(traverser); + if (selector.isPresent()) + out.get(this.branches.get(selector.get())).output(traverser.clone()); + } } \ No newline at end of file diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index 364b740..510b451 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -48,12 +48,12 @@ public class BeamTest { System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(incr()).identity().incr().count(); + traversal = g.inject(10L).choose(__.is(7L),__.incr(),__.<Long>incr().incr()); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L).union(__.<Long>incr().incr().union(__.<Long>incr().as("b").identity().as("a"), __.<Long>incr().identity()), __.incr()); + traversal = g.inject(7L).union(__.incr(),__.<Long>incr().incr().union(__.incr(),__.incr())); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java index 64c2387..fec154a 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java @@ -27,12 +27,12 @@ import java.util.Iterator; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class BranchStep<C, S, E> extends AbstractStep<C, S, E> { +public final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> { - private final BranchFunction<C, S, E> branchFunction; + private final BranchFunction<C, S, E, M> branchFunction; private Iterator<Traverser<C, E>> iterator = Collections.emptyIterator(); - public BranchStep(final AbstractStep<C, ?, S> previousStep, final BranchFunction<C, S, E> branchFunction) { + public BranchStep(final AbstractStep<C, ?, S> previousStep, final BranchFunction<C, S, E, M> branchFunction) { super(previousStep, branchFunction); this.branchFunction = branchFunction; } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index 5cda654..24e0cd4 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -47,7 +47,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> { for (final CFunction<?> function : compilation.getFunctions()) { final AbstractStep nextStep; if (function instanceof BranchFunction) - nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, ?>) function); + nextStep = new BranchStep(previousStep, (BranchFunction<C, ?, ?,?>) function); else if (function instanceof FilterFunction) nextStep = new FilterStep(previousStep, (FilterFunction<C, ?>) function); else if (function instanceof FlatMapFunction) diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index e12c439..d51b966 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -49,17 +49,12 @@ public class PipesTest { System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(14L).count().c(10L).incr().sum(); + traversal = g.inject(7L).union(__.incr(),__.<Long>incr().incr().union(__.incr(),__.incr())); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L).union(__.<Long>incr().incr().count(), __.<Long, Long>c(10L).incr().sum()); - System.out.println(TraversalUtil.getBytecode(traversal)); - System.out.println(traversal); - System.out.println(traversal.toList()); - System.out.println("\n----------\n"); - traversal = g.inject(7L).incr().filter(__.<Long>incr().incr().is(9L)); + traversal = g.inject(8L).choose(__.is(7L),__.incr(),__.<Long>incr().incr()); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList());