This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit fb4930bad2e8d816f217ef83d7fedb283d57f3b9 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Wed Mar 13 13:56:14 2019 -0600 playing around with Reducers and Barriers. Trying to merge them into one. Had it working for Pipes, but Beam requires that a Combiner return a single object -- not an iterator of objects. Still playing. --- .../tinkerpop/machine/pipes/BarrierStep.java | 14 +++++----- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 4 +-- .../apache/tinkerpop/machine/pipes/ReduceStep.java | 12 +++------ .../pipes/util/{Reducer.java => Barrier.java} | 2 +- .../{BasicReducer.java => InMemoryBarrier.java} | 4 +-- .../{BasicReducer.java => InMemoryReducer.java} | 31 ++++++++++++---------- .../tinkerpop/machine/pipes/util/Reducer.java | 10 ++++--- .../apache/tinkerpop/machine/pipes/PipesTest.java | 2 +- 8 files changed, 41 insertions(+), 38 deletions(-) diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java index a61a0b9..09c60e0 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java @@ -19,8 +19,8 @@ package org.apache.tinkerpop.machine.pipes; import org.apache.tinkerpop.machine.functions.BarrierFunction; -import org.apache.tinkerpop.machine.pipes.util.BasicReducer; -import org.apache.tinkerpop.machine.pipes.util.Reducer; +import org.apache.tinkerpop.machine.pipes.util.InMemoryBarrier; +import org.apache.tinkerpop.machine.pipes.util.Barrier; import org.apache.tinkerpop.machine.traversers.Traverser; import java.util.Collections; @@ -31,14 +31,14 @@ import java.util.Iterator; */ public class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> { - private final Reducer<B> reducer; + private final Barrier<B> barrier; private final BarrierFunction<C, S, E, B> barrierFunction; private boolean done = false; private Iterator<E> output = Collections.emptyIterator(); public BarrierStep(final AbstractStep<C, ?, S> previousStep, final BarrierFunction<C, S, E, B> barrierFunction) { super(previousStep, barrierFunction); - this.reducer = new BasicReducer<>(barrierFunction.getInitialValue()); + this.barrier = new InMemoryBarrier<>(barrierFunction.getInitialValue()); // move to strategy determination this.barrierFunction = barrierFunction; } @@ -46,10 +46,10 @@ public class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> { public Traverser<C, E> next() { if (!this.done) { while (super.hasNext()) { - this.reducer.update(this.barrierFunction.apply(super.getPreviousTraverser(), this.reducer.get())); + this.barrier.update(this.barrierFunction.apply(super.getPreviousTraverser(), this.barrier.get())); } this.done = true; - this.output = (Iterator<E>) this.barrierFunction.createIterator(this.reducer.get()); + this.output = (Iterator<E>) this.barrierFunction.createIterator(this.barrier.get()); } return (Traverser<C, E>) this.output.next(); } @@ -62,7 +62,7 @@ public class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> { @Override public void reset() { super.reset(); - this.reducer.reset(); + this.barrier.reset(); this.done = false; } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index a54d9f6..8dc493c 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -28,7 +28,7 @@ import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.functions.branch.RepeatBranch; -import org.apache.tinkerpop.machine.pipes.util.BasicReducer; +import org.apache.tinkerpop.machine.pipes.util.InMemoryReducer; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.Traverser; @@ -64,7 +64,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> { nextStep = new BarrierStep(previousStep, (BarrierFunction) function); else if (function instanceof ReduceFunction) nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, ?>) function, - new BasicReducer<>(((ReduceFunction<C, ?, ?>) function).getInitialValue()), compilation.getTraverserFactory()); + new InMemoryReducer((ReduceFunction<C, ?, ?>) function), compilation.getTraverserFactory()); else throw new RuntimeException("You need a new step type:" + function); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java index 0a9f0f0..6b03434 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java @@ -29,13 +29,13 @@ import org.apache.tinkerpop.machine.traversers.TraverserFactory; public final class ReduceStep<C, S, E> extends AbstractStep<C, S, E> { private final ReduceFunction<C, S, E> reduceFunction; - private final Reducer<E> reducer; + private final Reducer<C, S, E> reducer; private final TraverserFactory<C> traverserFactory; private boolean done = false; public ReduceStep(final AbstractStep<C, ?, S> previousStep, final ReduceFunction<C, S, E> reduceFunction, - final Reducer<E> reducer, + final Reducer<C, S, E> reducer, final TraverserFactory<C> traverserFactory) { super(previousStep, reduceFunction); this.reduceFunction = reduceFunction; @@ -45,15 +45,11 @@ public final class ReduceStep<C, S, E> extends AbstractStep<C, S, E> { @Override public Traverser<C, E> next() { - Traverser<C, S> traverser = null; while (this.hasNext()) { - traverser = super.getPreviousTraverser(); - this.reducer.update(this.reduceFunction.apply(traverser, this.reducer.get())); + this.reducer.add(super.getPreviousTraverser()); } this.done = true; - return null == traverser ? - this.traverserFactory.create(this.function.coefficient(), this.reduceFunction.getInitialValue()) : - traverser.reduce(this.reduceFunction, this.reducer.get()); + return this.traverserFactory.create(this.reduceFunction.coefficient(), this.reducer.get()); } @Override diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Barrier.java similarity index 95% copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java copy to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Barrier.java index 2bdaf44..9f2253d 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Barrier.java @@ -23,7 +23,7 @@ import java.io.Serializable; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface Reducer<S> extends Serializable { +public interface Barrier<S> extends Serializable { public S get(); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryBarrier.java similarity index 91% copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java copy to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryBarrier.java index 489db46..cccb316 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryBarrier.java @@ -21,12 +21,12 @@ package org.apache.tinkerpop.machine.pipes.util; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class BasicReducer<S> implements Reducer<S> { +public final class InMemoryBarrier<S> implements Barrier<S> { private S value; private final S initialValue; - public BasicReducer(final S initialValue) { + public InMemoryBarrier(final S initialValue) { this.initialValue = initialValue; this.value = initialValue; } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryReducer.java similarity index 59% rename from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java rename to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryReducer.java index 489db46..d849df2 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryReducer.java @@ -18,31 +18,34 @@ */ package org.apache.tinkerpop.machine.pipes.util; +import org.apache.tinkerpop.machine.functions.ReduceFunction; +import org.apache.tinkerpop.machine.traversers.Traverser; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class BasicReducer<S> implements Reducer<S> { +public class InMemoryReducer<C, S, E> implements Reducer<C, S, E> { - private S value; - private final S initialValue; + private final ReduceFunction<C, S, E> reduceFunction; + private E value; - public BasicReducer(final S initialValue) { - this.initialValue = initialValue; - this.value = initialValue; + public InMemoryReducer(final ReduceFunction<C, S, E> reduceFunction) { + this.reduceFunction = reduceFunction; + this.value = this.reduceFunction.getInitialValue(); } @Override - public void reset() { - this.value = this.initialValue; - } - - public S get() { + public E get() { return this.value; } - public void update(final S newValue) { - this.value = newValue; + @Override + public void add(final Traverser<C, S> traverser) { + this.value = this.reduceFunction.apply(traverser, this.value); } - + @Override + public void reset() { + this.value = this.reduceFunction.getInitialValue(); + } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java index 2bdaf44..f1eb42b 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java @@ -18,16 +18,20 @@ */ package org.apache.tinkerpop.machine.pipes.util; +import org.apache.tinkerpop.machine.traversers.Traverser; + import java.io.Serializable; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface Reducer<S> extends Serializable { +public interface Reducer<C, S, E> extends Serializable { - public S get(); + public E get(); - public void update(final S newValue); + public void add(final Traverser<C, S> traverser); public void reset(); + + } diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java index 8a94843..b07f183 100644 --- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java +++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java @@ -44,7 +44,7 @@ public class PipesTest { .withProcessor(PipesProcessor.class) .withStrategy(IdentityStrategy.class); - Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 1L)).<Long>unfold().repeat(incr()).until(is(10L)); + Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 1L)).<Long>unfold().repeat(incr()).until(is(10L)).sum(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList());