This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit fb4930bad2e8d816f217ef83d7fedb283d57f3b9
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Mar 13 13:56:14 2019 -0600

    playing around with Reducers and Barriers. Trying to merge them into one.
    Had it working for Pipes, but Beam requires that a Combiner return a single
    object -- not an iterator of objects. Still playing.
---
 .../tinkerpop/machine/pipes/BarrierStep.java       | 14 +++++-----
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  4 +--
 .../apache/tinkerpop/machine/pipes/ReduceStep.java | 12 +++------
 .../pipes/util/{Reducer.java => Barrier.java}      |  2 +-
 .../{BasicReducer.java => InMemoryBarrier.java}    |  4 +--
 .../{BasicReducer.java => InMemoryReducer.java}    | 31 ++++++++++++----------
 .../tinkerpop/machine/pipes/util/Reducer.java      | 10 ++++---
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  2 +-
 8 files changed, 41 insertions(+), 38 deletions(-)

diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
index a61a0b9..09c60e0 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
@@ -19,8 +19,8 @@
 package org.apache.tinkerpop.machine.pipes;
 
 import org.apache.tinkerpop.machine.functions.BarrierFunction;
-import org.apache.tinkerpop.machine.pipes.util.BasicReducer;
-import org.apache.tinkerpop.machine.pipes.util.Reducer;
+import org.apache.tinkerpop.machine.pipes.util.InMemoryBarrier;
+import org.apache.tinkerpop.machine.pipes.util.Barrier;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.util.Collections;
@@ -31,14 +31,14 @@ import java.util.Iterator;
  */
 public class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> {
 
-    private final Reducer<B> reducer;
+    private final Barrier<B> barrier;
     private final BarrierFunction<C, S, E, B> barrierFunction;
     private boolean done = false;
     private Iterator<E> output = Collections.emptyIterator();
 
     public BarrierStep(final AbstractStep<C, ?, S> previousStep, final 
BarrierFunction<C, S, E, B> barrierFunction) {
         super(previousStep, barrierFunction);
-        this.reducer = new BasicReducer<>(barrierFunction.getInitialValue());
+        this.barrier = new 
InMemoryBarrier<>(barrierFunction.getInitialValue()); // move to strategy 
determination
         this.barrierFunction = barrierFunction;
     }
 
@@ -46,10 +46,10 @@ public class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> {
     public Traverser<C, E> next() {
         if (!this.done) {
             while (super.hasNext()) {
-                
this.reducer.update(this.barrierFunction.apply(super.getPreviousTraverser(), 
this.reducer.get()));
+                
this.barrier.update(this.barrierFunction.apply(super.getPreviousTraverser(), 
this.barrier.get()));
             }
             this.done = true;
-            this.output = (Iterator<E>) 
this.barrierFunction.createIterator(this.reducer.get());
+            this.output = (Iterator<E>) 
this.barrierFunction.createIterator(this.barrier.get());
         }
         return (Traverser<C, E>) this.output.next();
     }
@@ -62,7 +62,7 @@ public class BarrierStep<C, S, E, B> extends AbstractStep<C, S, E> {
     @Override
     public void reset() {
         super.reset();
-        this.reducer.reset();
+        this.barrier.reset();
         this.done = false;
     }
 }
diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index a54d9f6..8dc493c 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -28,7 +28,7 @@ import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
 import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.pipes.util.BasicReducer;
+import org.apache.tinkerpop.machine.pipes.util.InMemoryReducer;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
@@ -64,7 +64,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
                 nextStep = new BarrierStep(previousStep, (BarrierFunction) 
function);
             else if (function instanceof ReduceFunction)
                 nextStep = new ReduceStep(previousStep, (ReduceFunction<C, ?, 
?>) function,
-                        new BasicReducer<>(((ReduceFunction<C, ?, ?>) 
function).getInitialValue()), compilation.getTraverserFactory());
+                        new InMemoryReducer((ReduceFunction<C, ?, ?>) 
function), compilation.getTraverserFactory());
             else
                 throw new RuntimeException("You need a new step type:" + 
function);
 
diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
index 0a9f0f0..6b03434 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
@@ -29,13 +29,13 @@ import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 public final class ReduceStep<C, S, E> extends AbstractStep<C, S, E> {
 
     private final ReduceFunction<C, S, E> reduceFunction;
-    private final Reducer<E> reducer;
+    private final Reducer<C, S, E> reducer;
     private final TraverserFactory<C> traverserFactory;
     private boolean done = false;
 
     public ReduceStep(final AbstractStep<C, ?, S> previousStep,
                       final ReduceFunction<C, S, E> reduceFunction,
-                      final Reducer<E> reducer,
+                      final Reducer<C, S, E> reducer,
                       final TraverserFactory<C> traverserFactory) {
         super(previousStep, reduceFunction);
         this.reduceFunction = reduceFunction;
@@ -45,15 +45,11 @@ public final class ReduceStep<C, S, E> extends AbstractStep<C, S, E> {
 
     @Override
     public Traverser<C, E> next() {
-        Traverser<C, S> traverser = null;
         while (this.hasNext()) {
-            traverser = super.getPreviousTraverser();
-            this.reducer.update(this.reduceFunction.apply(traverser, 
this.reducer.get()));
+            this.reducer.add(super.getPreviousTraverser());
         }
         this.done = true;
-        return null == traverser ?
-                this.traverserFactory.create(this.function.coefficient(), 
this.reduceFunction.getInitialValue()) :
-                traverser.reduce(this.reduceFunction, this.reducer.get());
+        return this.traverserFactory.create(this.reduceFunction.coefficient(), 
this.reducer.get());
     }
 
     @Override
diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Barrier.java
similarity index 95%
copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
copy to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Barrier.java
index 2bdaf44..9f2253d 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Barrier.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Reducer<S> extends Serializable {
+public interface Barrier<S> extends Serializable {
 
     public S get();
 
diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryBarrier.java
similarity index 91%
copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
copy to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryBarrier.java
index 489db46..cccb316 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryBarrier.java
@@ -21,12 +21,12 @@ package org.apache.tinkerpop.machine.pipes.util;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class BasicReducer<S> implements Reducer<S> {
+public final class InMemoryBarrier<S> implements Barrier<S> {
 
     private S value;
     private final S initialValue;
 
-    public BasicReducer(final S initialValue) {
+    public InMemoryBarrier(final S initialValue) {
         this.initialValue = initialValue;
         this.value = initialValue;
     }
diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryReducer.java
similarity index 59%
rename from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
rename to java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryReducer.java
index 489db46..d849df2 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/BasicReducer.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/InMemoryReducer.java
@@ -18,31 +18,34 @@
  */
 package org.apache.tinkerpop.machine.pipes.util;
 
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.traversers.Traverser;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class BasicReducer<S> implements Reducer<S> {
+public class InMemoryReducer<C, S, E> implements Reducer<C, S, E> {
 
-    private S value;
-    private final S initialValue;
+    private final ReduceFunction<C, S, E> reduceFunction;
+    private E value;
 
-    public BasicReducer(final S initialValue) {
-        this.initialValue = initialValue;
-        this.value = initialValue;
+    public InMemoryReducer(final ReduceFunction<C, S, E> reduceFunction) {
+        this.reduceFunction = reduceFunction;
+        this.value = this.reduceFunction.getInitialValue();
     }
 
     @Override
-    public void reset() {
-        this.value = this.initialValue;
-    }
-
-    public S get() {
+    public E get() {
         return this.value;
     }
 
-    public void update(final S newValue) {
-        this.value = newValue;
+    @Override
+    public void add(final Traverser<C, S> traverser) {
+        this.value = this.reduceFunction.apply(traverser, this.value);
     }
 
-
+    @Override
+    public void reset() {
+        this.value = this.reduceFunction.getInitialValue();
+    }
 }
diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
index 2bdaf44..f1eb42b 100644
--- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
+++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/util/Reducer.java
@@ -18,16 +18,20 @@
  */
 package org.apache.tinkerpop.machine.pipes.util;
 
+import org.apache.tinkerpop.machine.traversers.Traverser;
+
 import java.io.Serializable;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Reducer<S> extends Serializable {
+public interface Reducer<C, S, E> extends Serializable {
 
-    public S get();
+    public E get();
 
-    public void update(final S newValue);
+    public void add(final Traverser<C, S> traverser);
 
     public void reset();
+
+
 }
diff --git a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index 8a94843..b07f183 100644
--- a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -44,7 +44,7 @@ public class PipesTest {
                 .withProcessor(PipesProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 
1L)).<Long>unfold().repeat(incr()).until(is(10L));
+        Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 
1L)).<Long>unfold().repeat(incr()).until(is(10L)).sum();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());

Reply via email to