This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new a948102 Got order() working in both Pipes and Beam. It was a bit crazy in beam as it requires a reduce and unfold. Fortunately, the BarrierFunction in machine-core was sufficiently spec'd for the job. This gives me confidence that all barriers will come naturally from here. a948102 is described below commit a9481022e162656716f03516fcdf56a77722c456 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Thu Mar 28 08:40:31 2019 -0600 Got order() working in both Pipes and Beam. It was a bit crazy in beam as it requires a reduce and unfold. Fortunately, the BarrierFunction in machine-core was sufficiently spec'd for the job. This gives me confidence that all barriers will come naturally from here. --- .../tinkerpop/language/gremlin/Traversal.java | 2 + .../language/gremlin/TraversalSource.java | 4 +- .../language/gremlin/common/CommonTraversal.java | 5 + .../language/gremlin/core/CoreTraversal.java | 5 + .../machine/bytecode/compiler/CommonCompiler.java | 5 + .../machine/function/BarrierFunction.java | 2 + .../machine/function/barrier/JoinBarrier.java | 6 ++ .../{StallBarrier.java => OrderBarrier.java} | 32 ++++--- .../machine/function/barrier/StallBarrier.java | 7 +- .../machine/function/flatmap/UnfoldFlatMap.java | 2 +- .../machine/processor/beam/BarrierFn.java | 105 +++++++++++++++++++++ .../machine/processor/beam/io/BarrierCoder.java | 63 +++++++++++++ .../processor/beam/io/TraverserSetCoder.java | 63 +++++++++++++ .../processor/beam/sideeffect/InMemoryBarrier.java | 59 ++++++++++++ .../processor/beam/sideeffect/InMemoryReducer.java | 1 - .../machine/processor/beam/util/TopologyUtil.java | 7 ++ .../tinkerpop/machine/processor/beam/BeamTest.java | 17 ++++ .../machine/processor/pipes/BarrierStep.java | 2 +- .../machine/processor/pipes/PipesTest.java | 16 ++++ 19 files changed, 383 insertions(+), 20 deletions(-) diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java index a28e626..b8d478c 100644 --- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java +++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java @@ -86,6 +86,8 @@ public interface Traversal<C, S, E> extends Iterator<E> { public <R> Traversal<C, S, R> map(final Traversal<C, E, R> mapTraversal); + public Traversal<C, S, E> order(); + public Traversal<C, S, Path> path(final String... labels); public Traversal<C, S, E> repeat(final Traversal<C, E, E> repeatTraversal); diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java index e8c0bd1..37a57f5 100644 --- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java +++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java @@ -56,7 +56,7 @@ public class TraversalSource<C> implements Cloneable { public TraversalSource<C> withCoefficient(final Class<? extends Coefficient<C>> coefficient) { final TraversalSource<C> clone = this.clone(); clone.bytecode.addUniqueSourceInstruction(Symbols.WITH_COEFFICIENT, coefficient); - clone.coefficient = BytecodeUtil.getCoefficient(clone.bytecode).get(); + clone.coefficient = BytecodeUtil.getCoefficient(clone.bytecode).get(); // previously line guarantees existence return clone; } @@ -104,7 +104,7 @@ public class TraversalSource<C> implements Cloneable { // - private final void prepareSourceCode() { + private void prepareSourceCode() { if (!this.registered) { this.registered = true; this.bytecode = this.machine.register(this.bytecode); diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java index 738d1be..d6adf91 100644 --- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java +++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java @@ -192,6 +192,11 @@ public class CommonTraversal<C, S, E> extends AbstractTraversal<C, S, E> { } @Override + public Traversal<C, S, E> order() { + return this.addInstruction(Symbols.ORDER); + } + + @Override public Traversal<C, S, Path> path(final String... labels) { return this.addInstruction(Symbols.PATH, TraversalUtil.addObjects(labels, "|")); } diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java index b247a09..41f605a 100644 --- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java +++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java @@ -221,6 +221,11 @@ public class CoreTraversal<C, S, E> extends AbstractTraversal<C, S, E> { } @Override + public Traversal<C, S, E> order() { + throw new IllegalStateException("Unimplemented"); + } + + @Override public Traversal<C, S, Path> path(final String... labels) { return this.addInstruction(Symbols.PATH, TraversalUtil.addObjects(labels, "|")); } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java index 8fc29f2..bf86873 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.bytecode.compiler; import org.apache.tinkerpop.machine.bytecode.Instruction; import org.apache.tinkerpop.machine.function.CFunction; import org.apache.tinkerpop.machine.function.barrier.JoinBarrier; +import org.apache.tinkerpop.machine.function.barrier.OrderBarrier; import org.apache.tinkerpop.machine.function.barrier.StallBarrier; import org.apache.tinkerpop.machine.function.branch.BranchBranch; import org.apache.tinkerpop.machine.function.branch.RepeatBranch; @@ -78,6 +79,7 @@ public final class CommonCompiler implements BytecodeCompiler { put(Symbols.JOIN, FunctionType.BARRIER); put(Symbols.LOOPS, FunctionType.MAP); put(Symbols.MAP, FunctionType.MAP); + put(Symbols.ORDER, FunctionType.BARRIER); put(Symbols.PATH, FunctionType.MAP); put(Symbols.REDUCE, FunctionType.REDUCE); put(Symbols.REPEAT, FunctionType.BRANCH); @@ -122,6 +124,8 @@ public final class CommonCompiler implements BytecodeCompiler { return LoopsMap.compile(instruction); case Symbols.MAP: return MapMap.compile(instruction); + case Symbols.ORDER: + return OrderBarrier.compile(instruction); case Symbols.PATH: return PathMap.compile(instruction); case Symbols.REDUCE: @@ -191,6 +195,7 @@ public final class CommonCompiler implements BytecodeCompiler { public static final String JOIN = "join"; public static final String LOOPS = "loops"; public static final String MAP = "map"; + public static final String ORDER = "order"; public static final String PATH = "path"; public static final String REDUCE = "reduce"; public static final String REPEAT = "repeat"; diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java index 3ec70c8..c41076e 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java @@ -30,6 +30,8 @@ public interface BarrierFunction<C, S, E, B> extends BiFunction<Traverser<C, S>, public B getInitialValue(); + public B merge(final B barrierA, final B barrierB); + public Iterator<E> createIterator(final B barrier); public boolean returnsTraversers(); diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java index 7e5d939..85c8dcf 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java @@ -59,6 +59,12 @@ public final class JoinBarrier<C, K, V> extends AbstractFunction<C> implements B } @Override + public List<Map<K, V>> merge(final List<Map<K, V>> barrierA, final List<Map<K, V>> barrierB) { + barrierA.addAll(barrierB); + return barrierA; // TODO: unchecked in distributed Beam .. may be completely off + } + + @Override public Iterator<Map<K, V>> createIterator(final List<Map<K, V>> barrier) { return barrier.iterator(); } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java similarity index 77% copy from java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java copy to java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java index a54bfed..fd3dc20 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java @@ -25,35 +25,32 @@ import org.apache.tinkerpop.machine.function.BarrierFunction; import org.apache.tinkerpop.machine.traverser.Traverser; import org.apache.tinkerpop.machine.traverser.TraverserSet; +import java.util.Comparator; import java.util.Iterator; -import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class StallBarrier<C, S> extends AbstractFunction<C> implements BarrierFunction<C, S, Traverser<C, S>, TraverserSet<C, S>> { +public final class OrderBarrier<C, S> extends AbstractFunction<C> implements BarrierFunction<C, S, Traverser<C, S>, TraverserSet<C, S>> { - private final int drainThreshold; - - private StallBarrier(final Coefficient<C> coefficient, final String label, final int drainThreshold) { + private OrderBarrier(final Coefficient<C> coefficient, final String label) { super(coefficient, label); - this.drainThreshold = drainThreshold; } - @Override - public TraverserSet<C, S> apply(final Traverser<C, S> traverser, final TraverserSet<C, S> traverserSet) { - traverserSet.add(traverser); - return traverserSet; + public TraverserSet<C, S> getInitialValue() { + return new TraverserSet<>(); } @Override - public TraverserSet<C, S> getInitialValue() { - return new TraverserSet<>(); + public TraverserSet<C, S> merge(final TraverserSet<C, S> barrierA, final TraverserSet<C, S> barrierB) { + barrierA.addAll(barrierB); + return barrierA; } @Override public Iterator<Traverser<C, S>> createIterator(final TraverserSet<C, S> barrier) { + barrier.sort(Comparator.comparingLong(t -> (Long) t.object())); return barrier.iterator(); } @@ -62,7 +59,14 @@ public final class StallBarrier<C, S> extends AbstractFunction<C> implements Bar return true; } - public static <C, S> StallBarrier<C, S> compile(final Instruction<C> instruction) { - return new StallBarrier<>(instruction.coefficient(), instruction.label(), 1000); // TODO + + @Override + public TraverserSet<C, S> apply(final Traverser<C, S> traverser, final TraverserSet<C, S> traverserSet) { + traverserSet.add(traverser); + return traverserSet; + } + + public static <C, S> OrderBarrier<C, S> compile(final Instruction<C> instruction) { + return new OrderBarrier<>(instruction.coefficient(), instruction.label()); } } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java index a54bfed..a4ec204 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java @@ -26,7 +26,6 @@ import org.apache.tinkerpop.machine.traverser.Traverser; import org.apache.tinkerpop.machine.traverser.TraverserSet; import java.util.Iterator; -import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -48,6 +47,12 @@ public final class StallBarrier<C, S> extends AbstractFunction<C> implements Bar } @Override + public TraverserSet<C, S> merge(final TraverserSet<C, S> barrierA, final TraverserSet<C, S> barrierB) { + barrierA.addAll(barrierB); + return barrierA; + } + + @Override public TraverserSet<C, S> getInitialValue() { return new TraverserSet<>(); } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java index 2b57523..cb44d61 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java @@ -36,7 +36,7 @@ import java.util.Set; */ public final class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements FlatMapFunction<C, S, E> { - private UnfoldFlatMap(final Coefficient<C> coefficient, final String label) { + public UnfoldFlatMap(final Coefficient<C> coefficient, final String label) { super(coefficient, label); } diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java new file mode 100644 index 0000000..d736c60 --- /dev/null +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.beam; + +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.tinkerpop.machine.function.BarrierFunction; +import org.apache.tinkerpop.machine.processor.beam.io.BarrierCoder; +import org.apache.tinkerpop.machine.processor.beam.io.TraverserSetCoder; +import org.apache.tinkerpop.machine.processor.beam.sideeffect.InMemoryBarrier; +import org.apache.tinkerpop.machine.traverser.Traverser; +import org.apache.tinkerpop.machine.traverser.TraverserFactory; +import org.apache.tinkerpop.machine.util.IteratorUtils; + +import java.util.Iterator; + + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class BarrierFn<C, S, E, B> extends Combine.CombineFn<Traverser<C, S>, InMemoryBarrier<C, S, E, B>, B> implements Fn { + + private final BarrierFunction<C, S, E, B> barrierFunction; + + public BarrierFn(final BarrierFunction<C, S, E, B> barrierFunction) { + this.barrierFunction = barrierFunction; + } + + @Override + public InMemoryBarrier<C, S, E, B> createAccumulator() { + return new InMemoryBarrier<>(this.barrierFunction); + } + + @Override + public InMemoryBarrier<C, S, E, B> addInput(final InMemoryBarrier<C, S, E, B> accumulator, final Traverser<C, S> input) { + accumulator.addInput(input); + return accumulator; + } + + @Override + public InMemoryBarrier<C, S, E, B> mergeAccumulators(final Iterable<InMemoryBarrier<C, S, E, B>> accumulators) { + B barrier = this.barrierFunction.getInitialValue(); + for (final InMemoryBarrier<C, S, E, B> accumulator : accumulators) { + barrier = this.barrierFunction.merge(barrier, accumulator.extractOutput()); + } + return new InMemoryBarrier<>(barrier, this.barrierFunction); + } + + @Override + public B extractOutput(final InMemoryBarrier<C, S, E, B> accumulator) { + return accumulator.extractOutput(); + } + + @Override + public Coder<InMemoryBarrier<C, S, E, B>> getAccumulatorCoder(final CoderRegistry registry, final Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException { + return new BarrierCoder<>(); + } + + @Override + public Coder<B> getDefaultOutputCoder(final CoderRegistry registry, final Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException { + return (Coder<B>) new TraverserSetCoder<C, S>(); + } + + public static class BarrierIterateFn<C, S, E, B> extends DoFn<B, Traverser<C, S>> { + + private final BarrierFunction<C, S, E, B> barrierFunction; + private final TraverserFactory traverserFactory; + + + public BarrierIterateFn(final BarrierFunction<C, S, E, B> barrierFunction, final TraverserFactory traverserFactory) { + this.barrierFunction = barrierFunction; + this.traverserFactory = traverserFactory; + } + + @ProcessElement + public void processElement(final @Element B barrier, final OutputReceiver<Traverser<C, S>> output) { + final B local = this.barrierFunction.merge(this.barrierFunction.getInitialValue(), barrier); + final Iterator<Traverser<C, S>> iterator = this.barrierFunction.returnsTraversers() ? + (Iterator<Traverser<C, S>>) this.barrierFunction.createIterator(local) : + IteratorUtils.map(this.barrierFunction.createIterator(local), + e -> this.traverserFactory.create(this.barrierFunction, e)); + while (iterator.hasNext()) + output.output(iterator.next()); + } + } +} diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/BarrierCoder.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/BarrierCoder.java new file mode 100644 index 0000000..6d422f3 --- /dev/null +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/BarrierCoder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.beam.io; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.tinkerpop.machine.processor.beam.sideeffect.InMemoryBarrier; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class BarrierCoder<C, S, E, B> extends Coder<InMemoryBarrier<C, S, E, B>> { + + @Override + public void encode(final InMemoryBarrier<C, S, E, B> value, final OutputStream outStream) throws CoderException, IOException { + ObjectOutputStream outputStream = new ObjectOutputStream(outStream); + outputStream.writeObject(value); + } + + @Override + public InMemoryBarrier<C, S, E, B> decode(InputStream inStream) throws CoderException, IOException { + try { + ObjectInputStream inputStream = new ObjectInputStream(inStream); + return (InMemoryBarrier<C, S, E, B>) inputStream.readObject(); + } catch (final ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + + } +} \ No newline at end of file diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/TraverserSetCoder.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/TraverserSetCoder.java new file mode 100644 index 0000000..a79e68f --- /dev/null +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/TraverserSetCoder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.beam.io; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.tinkerpop.machine.traverser.TraverserSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class TraverserSetCoder<C, S> extends Coder<TraverserSet<C, S>> { + + @Override + public void encode(final TraverserSet<C, S> value, final OutputStream outStream) throws CoderException, IOException { + ObjectOutputStream outputStream = new ObjectOutputStream(outStream); + outputStream.writeObject(value); + } + + @Override + public TraverserSet<C, S> decode(InputStream inStream) throws CoderException, IOException { + try { + ObjectInputStream inputStream = new ObjectInputStream(inStream); + return (TraverserSet<C, S>) inputStream.readObject(); + } catch (final ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + + } +} diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryBarrier.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryBarrier.java new file mode 100644 index 0000000..d381a63 --- /dev/null +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryBarrier.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.beam.sideeffect; + +import org.apache.beam.sdk.transforms.Combine; +import org.apache.tinkerpop.machine.function.BarrierFunction; +import org.apache.tinkerpop.machine.traverser.Traverser; + +import java.io.Serializable; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class InMemoryBarrier<C, S, E, B> implements Combine.AccumulatingCombineFn.Accumulator<Traverser<C, S>, InMemoryBarrier<C, S, E, B>, B>, Serializable { + + private B barrier; + private BarrierFunction<C, S, E, B> barrierFunction; + + public InMemoryBarrier(final B value, final BarrierFunction<C, S, E, B> barrierFunction) { + this.barrier = value; + this.barrierFunction = barrierFunction; + } + + public InMemoryBarrier(final BarrierFunction<C, S, E, B> barrierFunction) { + this(barrierFunction.getInitialValue(), barrierFunction); + } + + @Override + public void addInput(final Traverser<C, S> input) { + this.barrierFunction.apply(input, this.barrier); + } + + @Override + public void mergeAccumulator(final InMemoryBarrier<C, S, E, B> other) { + this.barrierFunction.merge(this.barrier, other.barrier); + } + + @Override + public B extractOutput() { + return this.barrier; + } + +} diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java index bb01baf..0ead6f1 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java @@ -38,7 +38,6 @@ public class InMemoryReducer<C, S, E> implements Combine.AccumulatingCombineFn.A private final TraverserFactory<C> traverserFactory; public InMemoryReducer(final ReduceFunction<C, S, E> reduceFunction, final TraverserFactory<C> traverserFactory) { - super(); this.value = reduceFunction.getInitialValue(); this.reduceFunction = reduceFunction; this.traverserFactory = traverserFactory; diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java index f3ccf45..aa67878 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; +import org.apache.tinkerpop.machine.function.BarrierFunction; import org.apache.tinkerpop.machine.function.BranchFunction; import org.apache.tinkerpop.machine.function.CFunction; import org.apache.tinkerpop.machine.function.FilterFunction; @@ -35,6 +36,7 @@ import org.apache.tinkerpop.machine.function.InitialFunction; import org.apache.tinkerpop.machine.function.MapFunction; import org.apache.tinkerpop.machine.function.ReduceFunction; import org.apache.tinkerpop.machine.function.branch.RepeatBranch; +import org.apache.tinkerpop.machine.processor.beam.BarrierFn; import org.apache.tinkerpop.machine.processor.beam.Beam; import org.apache.tinkerpop.machine.processor.beam.BranchFn; import org.apache.tinkerpop.machine.processor.beam.FilterFn; @@ -46,6 +48,7 @@ import org.apache.tinkerpop.machine.processor.beam.RepeatDeadEndFn; import org.apache.tinkerpop.machine.processor.beam.RepeatEndFn; import org.apache.tinkerpop.machine.processor.beam.RepeatStartFn; import org.apache.tinkerpop.machine.processor.beam.io.TraverserCoder; +import org.apache.tinkerpop.machine.processor.beam.io.TraverserSetCoder; import org.apache.tinkerpop.machine.traverser.Traverser; import org.apache.tinkerpop.machine.traverser.TraverserFactory; @@ -81,6 +84,10 @@ public final class TopologyUtil { sink = source.apply(ParDo.of(new InitialFn<>((InitialFunction<C, S>) function, traverserFactory))); } else if (function instanceof ReduceFunction) { sink = source.apply(Combine.globally(new ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory))); + } else if (function instanceof BarrierFunction) { + sink = source.apply(Combine.globally(new BarrierFn<>((BarrierFunction<C, S, E, M>) function))); + sink.setCoder(new TraverserSetCoder<>()); // TODO: generalize to any Barrier (this will be hard) + sink = (PCollection) sink.apply(ParDo.of(new BarrierFn.BarrierIterateFn<>((BarrierFunction<C, S, E, M>) function, traverserFactory))); } else if (function instanceof RepeatBranch) { final RepeatBranch<C, S> repeatFunction = (RepeatBranch<C, S>) function; final List<PCollection<Traverser<C, S>>> repeatOutputs = new ArrayList<>(); diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java index f3471c5..63b24cb 100644 --- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java +++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java @@ -70,6 +70,23 @@ public class BeamTest { } @Test + public void testOrder() { + final Machine machine = LocalMachine.open(); + final TraversalSource<Long> g = Gremlin.<Long>traversal(machine) + .withCoefficient(LongCoefficient.class) + .withProcessor(BeamProcessor.class) + .withStrategy(IdentityStrategy.class); + + Traversal<Long, ?, ?> traversal = g.inject(7L, 3L, 5L, 20L, 1L, 2L).incr().order().incr().barrier().incr(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + g.close(); + } + + @Test public void shouldWork() { final MachineServer server = new MachineServer(7777); final Machine machine = RemoteMachine.open(6666, "localhost", 7777); diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java index be65f49..cbc9c06 100644 --- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java +++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java @@ -31,7 +31,7 @@ import java.util.Iterator; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> { +final class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> { private final Barrier<B> barrier; private final BarrierFunction<C, S, E, B> barrierFunction; diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java index 5f3c20a..b3b2f93 100644 --- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java +++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java @@ -62,6 +62,22 @@ public class PipesTest { } @Test + public void testOrder() { + final Machine machine = LocalMachine.open(); + final TraversalSource<Long> g = Gremlin.<Long>traversal(machine) + .withCoefficient(LongCoefficient.class) + .withProcessor(PipesProcessor.class) + .withStrategy(IdentityStrategy.class); + + Traversal<Long, ?, ?> traversal = g.inject(7L,3L,5L,20L,1L,2L).incr().order(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + } + + @Test public void shouldWork() { final MachineServer server = new MachineServer(7777); final Machine machine = RemoteMachine.open(6666, "localhost", 7777);