This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new a948102  Got order() working in both Pipes and Beam. It was a bit 
crazy in beam as it requires a reduce and unfold. Fortunately, the 
BarrierFunction in machine-core was sufficiently spec'd for the job. This gives 
me confidence that all barriers will come naturally from here.
a948102 is described below

commit a9481022e162656716f03516fcdf56a77722c456
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Thu Mar 28 08:40:31 2019 -0600

    Got order() working in both Pipes and Beam. It was a bit crazy in beam as 
it requires a reduce and unfold. Fortunately, the BarrierFunction in 
machine-core was sufficiently spec'd for the job. This gives me confidence that 
all barriers will come naturally from here.
---
 .../tinkerpop/language/gremlin/Traversal.java      |   2 +
 .../language/gremlin/TraversalSource.java          |   4 +-
 .../language/gremlin/common/CommonTraversal.java   |   5 +
 .../language/gremlin/core/CoreTraversal.java       |   5 +
 .../machine/bytecode/compiler/CommonCompiler.java  |   5 +
 .../machine/function/BarrierFunction.java          |   2 +
 .../machine/function/barrier/JoinBarrier.java      |   6 ++
 .../{StallBarrier.java => OrderBarrier.java}       |  32 ++++---
 .../machine/function/barrier/StallBarrier.java     |   7 +-
 .../machine/function/flatmap/UnfoldFlatMap.java    |   2 +-
 .../machine/processor/beam/BarrierFn.java          | 105 +++++++++++++++++++++
 .../machine/processor/beam/io/BarrierCoder.java    |  63 +++++++++++++
 .../processor/beam/io/TraverserSetCoder.java       |  63 +++++++++++++
 .../processor/beam/sideeffect/InMemoryBarrier.java |  59 ++++++++++++
 .../processor/beam/sideeffect/InMemoryReducer.java |   1 -
 .../machine/processor/beam/util/TopologyUtil.java  |   7 ++
 .../tinkerpop/machine/processor/beam/BeamTest.java |  17 ++++
 .../machine/processor/pipes/BarrierStep.java       |   2 +-
 .../machine/processor/pipes/PipesTest.java         |  16 ++++
 19 files changed, 383 insertions(+), 20 deletions(-)

diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
index a28e626..b8d478c 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
@@ -86,6 +86,8 @@ public interface Traversal<C, S, E> extends Iterator<E> {
 
     public <R> Traversal<C, S, R> map(final Traversal<C, E, R> mapTraversal);
 
+    public Traversal<C, S, E> order();
+
     public Traversal<C, S, Path> path(final String... labels);
 
     public Traversal<C, S, E> repeat(final Traversal<C, E, E> repeatTraversal);
diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
index e8c0bd1..37a57f5 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalSource.java
@@ -56,7 +56,7 @@ public class TraversalSource<C> implements Cloneable {
     public TraversalSource<C> withCoefficient(final Class<? extends 
Coefficient<C>> coefficient) {
         final TraversalSource<C> clone = this.clone();
         clone.bytecode.addUniqueSourceInstruction(Symbols.WITH_COEFFICIENT, 
coefficient);
-        clone.coefficient = BytecodeUtil.getCoefficient(clone.bytecode).get();
+        clone.coefficient = BytecodeUtil.getCoefficient(clone.bytecode).get(); 
// previous line guarantees existence
         return clone;
     }
 
@@ -104,7 +104,7 @@ public class TraversalSource<C> implements Cloneable {
 
     //
 
-    private final void prepareSourceCode() {
+    private void prepareSourceCode() {
         if (!this.registered) {
             this.registered = true;
             this.bytecode = this.machine.register(this.bytecode);
diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
index 738d1be..d6adf91 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
@@ -192,6 +192,11 @@ public class CommonTraversal<C, S, E> extends 
AbstractTraversal<C, S, E> {
     }
 
     @Override
+    public Traversal<C, S, E> order() {
+        return this.addInstruction(Symbols.ORDER);
+    }
+
+    @Override
     public Traversal<C, S, Path> path(final String... labels) {
         return this.addInstruction(Symbols.PATH, 
TraversalUtil.addObjects(labels, "|"));
     }
diff --git 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
index b247a09..41f605a 100644
--- 
a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
+++ 
b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
@@ -221,6 +221,11 @@ public class CoreTraversal<C, S, E> extends 
AbstractTraversal<C, S, E> {
     }
 
     @Override
+    public Traversal<C, S, E> order() {
+        throw new IllegalStateException("Unimplemented");
+    }
+
+    @Override
     public Traversal<C, S, Path> path(final String... labels) {
         return this.addInstruction(Symbols.PATH, 
TraversalUtil.addObjects(labels, "|"));
     }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java
index 8fc29f2..bf86873 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/CommonCompiler.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.bytecode.compiler;
 import org.apache.tinkerpop.machine.bytecode.Instruction;
 import org.apache.tinkerpop.machine.function.CFunction;
 import org.apache.tinkerpop.machine.function.barrier.JoinBarrier;
+import org.apache.tinkerpop.machine.function.barrier.OrderBarrier;
 import org.apache.tinkerpop.machine.function.barrier.StallBarrier;
 import org.apache.tinkerpop.machine.function.branch.BranchBranch;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
@@ -78,6 +79,7 @@ public final class CommonCompiler implements BytecodeCompiler 
{
         put(Symbols.JOIN, FunctionType.BARRIER);
         put(Symbols.LOOPS, FunctionType.MAP);
         put(Symbols.MAP, FunctionType.MAP);
+        put(Symbols.ORDER, FunctionType.BARRIER);
         put(Symbols.PATH, FunctionType.MAP);
         put(Symbols.REDUCE, FunctionType.REDUCE);
         put(Symbols.REPEAT, FunctionType.BRANCH);
@@ -122,6 +124,8 @@ public final class CommonCompiler implements 
BytecodeCompiler {
                 return LoopsMap.compile(instruction);
             case Symbols.MAP:
                 return MapMap.compile(instruction);
+            case Symbols.ORDER:
+                return OrderBarrier.compile(instruction);
             case Symbols.PATH:
                 return PathMap.compile(instruction);
             case Symbols.REDUCE:
@@ -191,6 +195,7 @@ public final class CommonCompiler implements 
BytecodeCompiler {
         public static final String JOIN = "join";
         public static final String LOOPS = "loops";
         public static final String MAP = "map";
+        public static final String ORDER = "order";
         public static final String PATH = "path";
         public static final String REDUCE = "reduce";
         public static final String REPEAT = "repeat";
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java
index 3ec70c8..c41076e 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/BarrierFunction.java
@@ -30,6 +30,8 @@ public interface BarrierFunction<C, S, E, B> extends 
BiFunction<Traverser<C, S>,
 
     public B getInitialValue();
 
+    public B merge(final B barrierA, final B barrierB);
+
     public Iterator<E> createIterator(final B barrier);
 
     public boolean returnsTraversers();
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
index 7e5d939..85c8dcf 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/JoinBarrier.java
@@ -59,6 +59,12 @@ public final class JoinBarrier<C, K, V> extends 
AbstractFunction<C> implements B
     }
 
     @Override
+    public List<Map<K, V>> merge(final List<Map<K, V>> barrierA, final 
List<Map<K, V>> barrierB) {
+        barrierA.addAll(barrierB);
+        return barrierA; // TODO: unchecked in distributed Beam .. may be 
completely off
+    }
+
+    @Override
     public Iterator<Map<K, V>> createIterator(final List<Map<K, V>> barrier) {
         return barrier.iterator();
     }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java
similarity index 77%
copy from 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
copy to 
java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java
index a54bfed..fd3dc20 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java
@@ -25,35 +25,32 @@ import 
org.apache.tinkerpop.machine.function.BarrierFunction;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
 
+import java.util.Comparator;
 import java.util.Iterator;
-import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class StallBarrier<C, S> extends AbstractFunction<C> implements 
BarrierFunction<C, S, Traverser<C, S>, TraverserSet<C, S>> {
+public final class OrderBarrier<C, S> extends AbstractFunction<C> implements 
BarrierFunction<C, S, Traverser<C, S>, TraverserSet<C, S>> {
 
-    private final int drainThreshold;
-
-    private StallBarrier(final Coefficient<C> coefficient, final String label, 
final int drainThreshold) {
+    private OrderBarrier(final Coefficient<C> coefficient, final String label) 
{
         super(coefficient, label);
-        this.drainThreshold = drainThreshold;
     }
 
-
     @Override
-    public TraverserSet<C, S> apply(final Traverser<C, S> traverser, final 
TraverserSet<C, S> traverserSet) {
-        traverserSet.add(traverser);
-        return traverserSet;
+    public TraverserSet<C, S> getInitialValue() {
+        return new TraverserSet<>();
     }
 
     @Override
-    public TraverserSet<C, S> getInitialValue() {
-        return new TraverserSet<>();
+    public TraverserSet<C, S> merge(final TraverserSet<C, S> barrierA, final 
TraverserSet<C, S> barrierB) {
+        barrierA.addAll(barrierB);
+        return barrierA;
     }
 
     @Override
     public Iterator<Traverser<C, S>> createIterator(final TraverserSet<C, S> 
barrier) {
+        barrier.sort(Comparator.comparingLong(t -> (Long) t.object()));
         return barrier.iterator();
     }
 
@@ -62,7 +59,14 @@ public final class StallBarrier<C, S> extends 
AbstractFunction<C> implements Bar
         return true;
     }
 
-    public static <C, S> StallBarrier<C, S> compile(final Instruction<C> 
instruction) {
-        return new StallBarrier<>(instruction.coefficient(), 
instruction.label(), 1000); // TODO
+
+    @Override
+    public TraverserSet<C, S> apply(final Traverser<C, S> traverser, final 
TraverserSet<C, S> traverserSet) {
+        traverserSet.add(traverser);
+        return traverserSet;
+    }
+
+    public static <C, S> OrderBarrier<C, S> compile(final Instruction<C> 
instruction) {
+        return new OrderBarrier<>(instruction.coefficient(), 
instruction.label());
     }
 }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
index a54bfed..a4ec204 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/StallBarrier.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
 
 import java.util.Iterator;
-import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -48,6 +47,12 @@ public final class StallBarrier<C, S> extends 
AbstractFunction<C> implements Bar
     }
 
     @Override
+    public TraverserSet<C, S> merge(final TraverserSet<C, S> barrierA, final 
TraverserSet<C, S> barrierB) {
+        barrierA.addAll(barrierB);
+        return barrierA;
+    }
+
+    @Override
     public TraverserSet<C, S> getInitialValue() {
         return new TraverserSet<>();
     }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
index 2b57523..cb44d61 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
@@ -36,7 +36,7 @@ import java.util.Set;
  */
 public final class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> 
implements FlatMapFunction<C, S, E> {
 
-    private UnfoldFlatMap(final Coefficient<C> coefficient, final String 
label) {
+    public UnfoldFlatMap(final Coefficient<C> coefficient, final String label) 
{
         super(coefficient, label);
     }
 
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java
new file mode 100644
index 0000000..d736c60
--- /dev/null
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.beam;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.tinkerpop.machine.function.BarrierFunction;
+import org.apache.tinkerpop.machine.processor.beam.io.BarrierCoder;
+import org.apache.tinkerpop.machine.processor.beam.io.TraverserSetCoder;
+import org.apache.tinkerpop.machine.processor.beam.sideeffect.InMemoryBarrier;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
+
+import java.util.Iterator;
+
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class BarrierFn<C, S, E, B> extends Combine.CombineFn<Traverser<C, S>, 
InMemoryBarrier<C, S, E, B>, B> implements Fn {
+
+    private final BarrierFunction<C, S, E, B> barrierFunction;
+
+    public BarrierFn(final BarrierFunction<C, S, E, B> barrierFunction) {
+        this.barrierFunction = barrierFunction;
+    }
+
+    @Override
+    public InMemoryBarrier<C, S, E, B> createAccumulator() {
+        return new InMemoryBarrier<>(this.barrierFunction);
+    }
+
+    @Override
+    public InMemoryBarrier<C, S, E, B> addInput(final InMemoryBarrier<C, S, E, 
B> accumulator, final Traverser<C, S> input) {
+        accumulator.addInput(input);
+        return accumulator;
+    }
+
+    @Override
+    public InMemoryBarrier<C, S, E, B> mergeAccumulators(final 
Iterable<InMemoryBarrier<C, S, E, B>> accumulators) {
+        B barrier = this.barrierFunction.getInitialValue();
+        for (final InMemoryBarrier<C, S, E, B> accumulator : accumulators) {
+            barrier = this.barrierFunction.merge(barrier, 
accumulator.extractOutput());
+        }
+        return new InMemoryBarrier<>(barrier, this.barrierFunction);
+    }
+
+    @Override
+    public B extractOutput(final InMemoryBarrier<C, S, E, B> accumulator) {
+        return accumulator.extractOutput();
+    }
+
+    @Override
+    public Coder<InMemoryBarrier<C, S, E, B>> getAccumulatorCoder(final 
CoderRegistry registry, final Coder<Traverser<C, S>> inputCoder) throws 
CannotProvideCoderException {
+        return new BarrierCoder<>();
+    }
+
+    @Override
+    public Coder<B> getDefaultOutputCoder(final CoderRegistry registry, final 
Coder<Traverser<C, S>> inputCoder) throws CannotProvideCoderException {
+        return (Coder<B>) new TraverserSetCoder<C, S>();
+    }
+
+    public static class BarrierIterateFn<C, S, E, B> extends DoFn<B, 
Traverser<C, S>> {
+
+        private final BarrierFunction<C, S, E, B> barrierFunction;
+        private final TraverserFactory traverserFactory;
+
+
+        public BarrierIterateFn(final BarrierFunction<C, S, E, B> 
barrierFunction, final TraverserFactory traverserFactory) {
+            this.barrierFunction = barrierFunction;
+            this.traverserFactory = traverserFactory;
+        }
+
+        @ProcessElement
+        public void processElement(final @Element B barrier, final 
OutputReceiver<Traverser<C, S>> output) {
+            final B local = 
this.barrierFunction.merge(this.barrierFunction.getInitialValue(), barrier);
+            final Iterator<Traverser<C, S>> iterator = 
this.barrierFunction.returnsTraversers() ?
+                    (Iterator<Traverser<C, S>>) 
this.barrierFunction.createIterator(local) :
+                    
IteratorUtils.map(this.barrierFunction.createIterator(local),
+                            e -> 
this.traverserFactory.create(this.barrierFunction, e));
+            while (iterator.hasNext())
+                output.output(iterator.next());
+        }
+    }
+}
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/BarrierCoder.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/BarrierCoder.java
new file mode 100644
index 0000000..6d422f3
--- /dev/null
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/BarrierCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.beam.io;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.tinkerpop.machine.processor.beam.sideeffect.InMemoryBarrier;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class BarrierCoder<C, S, E, B> extends Coder<InMemoryBarrier<C, S, E, 
B>> {
+
+    @Override
+    public void encode(final InMemoryBarrier<C, S, E, B> value, final 
OutputStream outStream) throws CoderException, IOException {
+        ObjectOutputStream outputStream = new ObjectOutputStream(outStream);
+        outputStream.writeObject(value);
+    }
+
+    @Override
+    public InMemoryBarrier<C, S, E, B> decode(InputStream inStream) throws 
CoderException, IOException {
+        try {
+            ObjectInputStream inputStream = new ObjectInputStream(inStream);
+            return (InMemoryBarrier<C, S, E, B>) inputStream.readObject();
+        } catch (final ClassNotFoundException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+
+    }
+}
\ No newline at end of file
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/TraverserSetCoder.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/TraverserSetCoder.java
new file mode 100644
index 0000000..a79e68f
--- /dev/null
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/io/TraverserSetCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.beam.io;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.tinkerpop.machine.traverser.TraverserSet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TraverserSetCoder<C, S> extends Coder<TraverserSet<C, S>> {
+
+    @Override
+    public void encode(final TraverserSet<C, S> value, final OutputStream 
outStream) throws CoderException, IOException {
+        ObjectOutputStream outputStream = new ObjectOutputStream(outStream);
+        outputStream.writeObject(value);
+    }
+
+    @Override
+    public TraverserSet<C, S> decode(InputStream inStream) throws 
CoderException, IOException {
+        try {
+            ObjectInputStream inputStream = new ObjectInputStream(inStream);
+            return (TraverserSet<C, S>) inputStream.readObject();
+        } catch (final ClassNotFoundException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+
+    }
+}
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryBarrier.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryBarrier.java
new file mode 100644
index 0000000..d381a63
--- /dev/null
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryBarrier.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.beam.sideeffect;
+
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.tinkerpop.machine.function.BarrierFunction;
+import org.apache.tinkerpop.machine.traverser.Traverser;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InMemoryBarrier<C, S, E, B> implements 
Combine.AccumulatingCombineFn.Accumulator<Traverser<C, S>, InMemoryBarrier<C, 
S, E, B>, B>, Serializable {
+
+    private B barrier;
+    private BarrierFunction<C, S, E, B> barrierFunction;
+
+    public InMemoryBarrier(final B value, final BarrierFunction<C, S, E, B> 
barrierFunction) {
+        this.barrier = value;
+        this.barrierFunction = barrierFunction;
+    }
+
+    public InMemoryBarrier(final BarrierFunction<C, S, E, B> barrierFunction) {
+        this(barrierFunction.getInitialValue(), barrierFunction);
+    }
+
+    @Override
+    public void addInput(final Traverser<C, S> input) {
+        this.barrierFunction.apply(input, this.barrier);
+    }
+
+    @Override
+    public void mergeAccumulator(final InMemoryBarrier<C, S, E, B> other) {
+        this.barrierFunction.merge(this.barrier, other.barrier);
+    }
+
+    @Override
+    public B extractOutput() {
+        return this.barrier;
+    }
+
+}
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java
index bb01baf..0ead6f1 100644
--- 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/sideeffect/InMemoryReducer.java
@@ -38,7 +38,6 @@ public class InMemoryReducer<C, S, E> implements 
Combine.AccumulatingCombineFn.A
     private final TraverserFactory<C> traverserFactory;
 
     public InMemoryReducer(final ReduceFunction<C, S, E> reduceFunction, final 
TraverserFactory<C> traverserFactory) {
-        super();
         this.value = reduceFunction.getInitialValue();
         this.reduceFunction = reduceFunction;
         this.traverserFactory = traverserFactory;
diff --git 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
index f3ccf45..aa67878 100644
--- 
a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
+++ 
b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
+import org.apache.tinkerpop.machine.function.BarrierFunction;
 import org.apache.tinkerpop.machine.function.BranchFunction;
 import org.apache.tinkerpop.machine.function.CFunction;
 import org.apache.tinkerpop.machine.function.FilterFunction;
@@ -35,6 +36,7 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.processor.beam.BarrierFn;
 import org.apache.tinkerpop.machine.processor.beam.Beam;
 import org.apache.tinkerpop.machine.processor.beam.BranchFn;
 import org.apache.tinkerpop.machine.processor.beam.FilterFn;
@@ -46,6 +48,7 @@ import 
org.apache.tinkerpop.machine.processor.beam.RepeatDeadEndFn;
 import org.apache.tinkerpop.machine.processor.beam.RepeatEndFn;
 import org.apache.tinkerpop.machine.processor.beam.RepeatStartFn;
 import org.apache.tinkerpop.machine.processor.beam.io.TraverserCoder;
+import org.apache.tinkerpop.machine.processor.beam.io.TraverserSetCoder;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 
@@ -81,6 +84,10 @@ public final class TopologyUtil {
             sink = source.apply(ParDo.of(new InitialFn<>((InitialFunction<C, S>) function, traverserFactory)));
         } else if (function instanceof ReduceFunction) {
             sink = source.apply(Combine.globally(new ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory)));
+        } else if (function instanceof BarrierFunction) {
+            sink = source.apply(Combine.globally(new BarrierFn<>((BarrierFunction<C, S, E, M>) function)));
+            sink.setCoder(new TraverserSetCoder<>()); // TODO: generalize to any Barrier (this will be hard)
+            sink = (PCollection) sink.apply(ParDo.of(new BarrierFn.BarrierIterateFn<>((BarrierFunction<C, S, E, M>) function, traverserFactory)));
         } else if (function instanceof RepeatBranch) {
             final RepeatBranch<C, S> repeatFunction = (RepeatBranch<C, S>) function;
             final List<PCollection<Traverser<C, S>>> repeatOutputs = new ArrayList<>();
diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
index f3471c5..63b24cb 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/BeamTest.java
@@ -70,6 +70,23 @@ public class BeamTest {
     }
 
     @Test
+    public void testOrder() {
+        final Machine machine = LocalMachine.open();
+        final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
+                .withCoefficient(LongCoefficient.class)
+                .withProcessor(BeamProcessor.class)
+                .withStrategy(IdentityStrategy.class);
+
+        Traversal<Long, ?, ?> traversal = g.inject(7L, 3L, 5L, 20L, 1L, 2L).incr().order().incr().barrier().incr();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        g.close();
+    }
+
+    @Test
     public void shouldWork() {
         final MachineServer server = new MachineServer(7777);
         final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java
index be65f49..cbc9c06 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BarrierStep.java
@@ -31,7 +31,7 @@ import java.util.Iterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> {
+final class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> {
 
     private final Barrier<B> barrier;
     private final BarrierFunction<C, S, E, B> barrierFunction;
diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
index 5f3c20a..b3b2f93 100644
--- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
+++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
@@ -62,6 +62,22 @@ public class PipesTest {
     }
 
     @Test
+    public void testOrder() {
+        final Machine machine = LocalMachine.open();
+        final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
+                .withCoefficient(LongCoefficient.class)
+                .withProcessor(PipesProcessor.class)
+                .withStrategy(IdentityStrategy.class);
+
+        Traversal<Long, ?, ?> traversal = g.inject(7L,3L,5L,20L,1L,2L).incr().order();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+    }
+
+    @Test
     public void shouldWork() {
         final MachineServer server = new MachineServer(7777);
         final Machine machine = RemoteMachine.open(6666, "localhost", 7777);

Reply via email to