This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 22b7903 cleanup on Beam and order().by() instruction. 22b7903 is described below commit 22b7903eb4f79e6bd172f33ef0de117c8f958566 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Thu Mar 28 11:06:43 2019 -0600 cleanup on Beam and order().by() instruction. --- .../tinkerpop/machine/function/barrier/OrderBarrier.java | 5 +++++ .../org/apache/tinkerpop/machine/util/MultiComparator.java | 12 ++++++++++++ .../apache/tinkerpop/machine/processor/beam/BarrierFn.java | 8 ++++---- .../tinkerpop/machine/processor/beam/util/TopologyUtil.java | 8 ++++---- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java index e7d8b66..ed81393 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/barrier/OrderBarrier.java @@ -84,6 +84,11 @@ public final class OrderBarrier<C, S, P> extends AbstractFunction<C> implements return traverserSet; } + @Override + public int hashCode() { + return super.hashCode() ^ this.comparator.hashCode() ^ this.compilationCircle.hashCode(); + } + public static <C, S, P> OrderBarrier<C, S, P> compile(final Instruction<C> instruction) { final List<Compilation<C, S, P>> compilations = new ArrayList<>(); final List<Comparator<P>> comparators = new ArrayList<>(); diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java index 02f7a20..4135fae 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/MultiComparator.java @@ -61,6 +61,18 @@ public final class MultiComparator<C> implements Comparator<C>, Serializable { } } + @Override + public int hashCode() { + return this.comparators.hashCode(); + } + + @Override + public boolean equals(final Object object) { + return object instanceof MultiComparator && + this.comparators.equals(((MultiComparator) object).comparators) && + this.isShuffle == ((MultiComparator) object).isShuffle; + } + public boolean isShuffle() { return this.isShuffle; } diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java index d736c60..742db4f 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/BarrierFn.java @@ -80,7 +80,7 @@ public class BarrierFn<C, S, E, B> extends Combine.CombineFn<Traverser<C, S>, In return (Coder<B>) new TraverserSetCoder<C, S>(); } - public static class BarrierIterateFn<C, S, E, B> extends DoFn<B, Traverser<C, S>> { + public static class BarrierIterateFn<C, S, E, B> extends DoFn<B, Traverser<C, E>> { private final BarrierFunction<C, S, E, B> barrierFunction; private final TraverserFactory traverserFactory; @@ -92,10 +92,10 @@ public class BarrierFn<C, S, E, B> extends Combine.CombineFn<Traverser<C, S>, In } @ProcessElement - public void processElement(final @Element B barrier, final OutputReceiver<Traverser<C, S>> output) { + public void processElement(final @Element B barrier, final OutputReceiver<Traverser<C, E>> output) { final B local = this.barrierFunction.merge(this.barrierFunction.getInitialValue(), barrier); - final Iterator<Traverser<C, S>> iterator = this.barrierFunction.returnsTraversers() ? - (Iterator<Traverser<C, S>>) this.barrierFunction.createIterator(local) : + final Iterator<Traverser<C, E>> iterator = this.barrierFunction.returnsTraversers() ? + (Iterator<Traverser<C, E>>) this.barrierFunction.createIterator(local) : IteratorUtils.map(this.barrierFunction.createIterator(local), e -> this.traverserFactory.create(this.barrierFunction, e)); while (iterator.hasNext()) diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java index aa67878..4f27ae7 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/util/TopologyUtil.java @@ -72,7 +72,7 @@ public final class TopologyUtil { return sink; } - private static <C, S, E, M> PCollection<Traverser<C, E>> extend(final PCollection<Traverser<C, S>> source, final CFunction<C> function, final TraverserFactory<C> traverserFactory) { + private static <C, S, E, B> PCollection<Traverser<C, E>> extend(final PCollection<Traverser<C, S>> source, final CFunction<C> function, final TraverserFactory<C> traverserFactory) { PCollection sink; if (function instanceof MapFunction) { sink = source.apply(ParDo.of(new MapFn<>((MapFunction<C, S, E>) function))); @@ -85,9 +85,9 @@ public final class TopologyUtil { } else if (function instanceof ReduceFunction) { sink = source.apply(Combine.globally(new ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory))); } else if (function instanceof BarrierFunction) { - sink = source.apply(Combine.globally(new BarrierFn<>((BarrierFunction<C, S, E, M>) function))); - sink.setCoder(new TraverserSetCoder<>()); // TODO: generalize to any Barrier (this will be hard) - sink = (PCollection) sink.apply(ParDo.of(new BarrierFn.BarrierIterateFn<>((BarrierFunction<C, S, E, M>) function, traverserFactory))); + sink = source.apply(Combine.globally(new BarrierFn<>((BarrierFunction<C, S, E, B>) function))); + sink.setCoder(new TraverserSetCoder<>()); // TODO: generalize to any Barrier (just wrap in some container) + sink = (PCollection) sink.apply(ParDo.of(new BarrierFn.BarrierIterateFn<>((BarrierFunction<C, S, E, B>) function, traverserFactory))); } else if (function instanceof RepeatBranch) { final RepeatBranch<C, S> repeatFunction = (RepeatBranch<C, S>) function; final List<PCollection<Traverser<C, S>>> repeatOutputs = new ArrayList<>();