This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 22b7903  cleanup on Beam and order().by() instruction.
22b7903 is described below

commit 22b7903eb4f79e6bd172f33ef0de117c8f958566
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Thu Mar 28 11:06:43 2019 -0600

    cleanup on Beam and order().by() instruction.
---
 .../tinkerpop/machine/function/barrier/OrderBarrier.java     |  5 +++++
 .../org/apache/tinkerpop/machine/util/MultiComparator.java   | 12 ++++++++++++
 .../apache/tinkerpop/machine/processor/beam/BarrierFn.java   |  8 ++++----
 .../tinkerpop/machine/processor/beam/util/TopologyUtil.java  |  8 ++++----
 4 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java
index e7d8b66..ed81393 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java
@@ -84,6 +84,11 @@ public final class OrderBarrier<C, S, P> extends AbstractFunction<C> implements
         return traverserSet;
     }
 
+    @Override
+    public int hashCode() {
+        return super.hashCode() ^ this.comparator.hashCode() ^ this.compilationCircle.hashCode();
+    }
+
     public static <C, S, P> OrderBarrier<C, S, P> compile(final Instruction<C> instruction) {
         final List<Compilation<C, S, P>> compilations = new ArrayList<>();
         final List<Comparator<P>> comparators = new ArrayList<>();
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java
index 02f7a20..4135fae 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java
@@ -61,6 +61,18 @@ public final class MultiComparator<C> implements Comparator<C>, Serializable {
         }
     }
 
+    @Override
+    public int hashCode() {
+        return this.comparators.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object object) {
+        return object instanceof MultiComparator &&
+                this.comparators.equals(((MultiComparator) object).comparators) &&
+                this.isShuffle == ((MultiComparator) object).isShuffle;
+    }
+
     public boolean isShuffle() {
         return this.isShuffle;
     }
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java
index d736c60..742db4f 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java
@@ -80,7 +80,7 @@ public class BarrierFn<C, S, E, B> extends Combine.CombineFn<Traverser<C, S>, In
         return (Coder<B>) new TraverserSetCoder<C, S>();
     }
 
-    public static class BarrierIterateFn<C, S, E, B> extends DoFn<B, Traverser<C, S>> {
+    public static class BarrierIterateFn<C, S, E, B> extends DoFn<B, Traverser<C, E>> {
 
         private final BarrierFunction<C, S, E, B> barrierFunction;
         private final TraverserFactory traverserFactory;
@@ -92,10 +92,10 @@ public class BarrierFn<C, S, E, B> extends Combine.CombineFn<Traverser<C, S>, In
         }
 
         @ProcessElement
-        public void processElement(final @Element B barrier, final OutputReceiver<Traverser<C, S>> output) {
+        public void processElement(final @Element B barrier, final OutputReceiver<Traverser<C, E>> output) {
             final B local = this.barrierFunction.merge(this.barrierFunction.getInitialValue(), barrier);
-            final Iterator<Traverser<C, S>> iterator = this.barrierFunction.returnsTraversers() ?
-                    (Iterator<Traverser<C, S>>) this.barrierFunction.createIterator(local) :
+            final Iterator<Traverser<C, E>> iterator = this.barrierFunction.returnsTraversers() ?
+                    (Iterator<Traverser<C, E>>) this.barrierFunction.createIterator(local) :
                     IteratorUtils.map(this.barrierFunction.createIterator(local),
                             e -> this.traverserFactory.create(this.barrierFunction, e));
             while (iterator.hasNext())
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
index aa67878..4f27ae7 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java
@@ -72,7 +72,7 @@ public final class TopologyUtil {
         return sink;
     }
 
-    private static <C, S, E, M> PCollection<Traverser<C, E>> extend(final PCollection<Traverser<C, S>> source, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
+    private static <C, S, E, B> PCollection<Traverser<C, E>> extend(final PCollection<Traverser<C, S>> source, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
         PCollection sink;
         if (function instanceof MapFunction) {
             sink = source.apply(ParDo.of(new MapFn<>((MapFunction<C, S, E>) function)));
@@ -85,9 +85,9 @@ public final class TopologyUtil {
         } else if (function instanceof ReduceFunction) {
             sink = source.apply(Combine.globally(new ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory)));
         } else if (function instanceof BarrierFunction) {
-            sink = source.apply(Combine.globally(new BarrierFn<>((BarrierFunction<C, S, E, M>) function)));
-            sink.setCoder(new TraverserSetCoder<>()); // TODO: generalize to any Barrier (this will be hard)
-            sink = (PCollection) sink.apply(ParDo.of(new BarrierFn.BarrierIterateFn<>((BarrierFunction<C, S, E, M>) function, traverserFactory)));
+            sink = source.apply(Combine.globally(new BarrierFn<>((BarrierFunction<C, S, E, B>) function)));
+            sink.setCoder(new TraverserSetCoder<>()); // TODO: generalize to any Barrier (just wrap in some container)
+            sink = (PCollection) sink.apply(ParDo.of(new BarrierFn.BarrierIterateFn<>((BarrierFunction<C, S, E, B>) function, traverserFactory)));
         } else if (function instanceof RepeatBranch) {
             final RepeatBranch<C, S> repeatFunction = (RepeatBranch<C, S>) function;
             final List<PCollection<Traverser<C, S>>> repeatOutputs = new ArrayList<>();

Reply via email to