This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 0fd1e42e32242401b34c95f699b8d684b4b623fd
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 06:39:35 2019 -0600

    cleaned up and organized Apache Beam machine. Need to add reducer support 
and then it's at the same state as the Pipes machines.
---
 .../tinkerpop/machine/traversers/TraverserSet.java |  3 +-
 java/machine/beam/pom.xml                          |  5 ++
 .../machine/beam/{MapFn.java => AbstractFn.java}   | 25 ++++++----
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 57 +++++++++++++---------
 .../apache/tinkerpop/machine/beam/FilterFn.java    |  4 +-
 .../machine/beam/{FilterFn.java => Fn.java}        | 17 ++-----
 .../machine/beam/{FilterFn.java => InitialFn.java} | 22 ++++++---
 .../org/apache/tinkerpop/machine/beam/MapFn.java   | 27 ++++++++--
 .../machine/beam/{MapFn.java => ReduceFn.java}     | 27 +++++-----
 .../tinkerpop/machine/beam/TraverserCoder.java     |  6 +--
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  5 +-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  2 +-
 12 files changed, 119 insertions(+), 81 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
index ce52b63..66169ab 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.util.FastNoSuchElementException;
 
+import java.io.Serializable;
 import java.util.AbstractSet;
 import java.util.Collections;
 import java.util.Iterator;
@@ -33,7 +34,7 @@ import java.util.Spliterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> 
implements Set<Traverser<C, S>>, Queue<Traverser<C, S>> {
+public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> 
implements Set<Traverser<C, S>>, Queue<Traverser<C, S>>, Serializable {
 
     private final Map<Traverser<C, S>, Traverser<C, S>> map = 
Collections.synchronizedMap(new LinkedHashMap<>());
 
diff --git a/java/machine/beam/pom.xml b/java/machine/beam/pom.xml
index 2fb4c82..24a9773 100644
--- a/java/machine/beam/pom.xml
+++ b/java/machine/beam/pom.xml
@@ -79,6 +79,11 @@ limitations under the License.
             <version>2.11.0</version>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>pipes</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <directory>${basedir}/target</directory>
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java
similarity index 61%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java
index 2b83ab2..63caa5e 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java
@@ -19,23 +19,30 @@
 package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.traversers.TraverserSet;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
+public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, 
Traverser<C, E>> implements Fn<C, S, E> {
 
-    private final MapFunction<C, S, E> mapFunction;
+    protected final TraverserSet<C, S> traversers = new TraverserSet<>();
+    protected final CFunction<C> function;
 
-    public MapFn(final MapFunction<C, S, E> mapFunction) {
-        this.mapFunction = mapFunction;
+    protected AbstractFn(final CFunction<C> function) {
+        this.function = function;
     }
 
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        output.output(traverser.map(this.mapFunction));
+    @Override
+    public void addStart(final Traverser<C, S> traverser) {
+        this.traversers.add(traverser);
     }
+
+    @Override
+    public String toString() {
+        return this.function.toString();
+    }
+
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index a03ee71..b93174e 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
@@ -27,8 +28,9 @@ import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
-import org.apache.tinkerpop.machine.functions.initial.InjectInitial;
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
@@ -46,51 +48,51 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     PCollection collection;
     public static List<Traverser> OUTPUT = new ArrayList<>();
     Iterator<Traverser> iterator = null;
+    private final List<DoFn> functions = new ArrayList<>();
 
-    public Beam(final Bytecode<C> bytecode) {
+    public Beam(final List<CFunction<C>> functions) {
         this.pipeline = Pipeline.create();
         
this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new 
TraverserCoder<>());
+        this.collection = this.pipeline.apply(Create.of(new 
CompleteTraverser(LongCoefficient.create(), 1L)));
+        this.collection.setCoder(new TraverserCoder());
 
-        for (final CFunction<?> function : BytecodeUtil.compile(bytecode)) {
-            if (function instanceof InjectInitial) {
-                final List<Traverser<C, S>> objects = new ArrayList<>();
-                final Iterator<S> iterator = ((InjectInitial) function).get();
-                while (iterator.hasNext())
-                    objects.add(new 
CompleteTraverser(LongCoefficient.create(), iterator.next()));
-                this.collection = 
this.pipeline.apply(Create.of(objects).withCoder(new TraverserCoder<>()));
+        DoFn fn = null;
+        for (final CFunction<?> function : functions) {
+            if (function instanceof InitialFunction) {
+                fn = new InitialFn<>((InitialFunction) function);
             } else if (function instanceof FilterFunction) {
-                collection = (PCollection) collection.apply(ParDo.of(new 
FilterFn<>((FilterFunction<C, S>) function)));
-                collection.setCoder(new TraverserCoder());
+                fn = new FilterFn<>((FilterFunction) function);
             } else if (function instanceof MapFunction) {
-                collection = (PCollection) collection.apply(ParDo.of(new 
MapFn<>((MapFunction<C, S, E>) function)));
-                collection.setCoder(new TraverserCoder());
+                fn = new MapFn<>((MapFunction) function);
+            } else if (function instanceof ReduceFunction) {
+                //fn = new ReduceFn<>((ReduceFunction)function)
             } else
                 throw new RuntimeException("You need a new step type:" + 
function);
+            this.functions.add(fn);
+            this.collection = (PCollection) collection.apply(ParDo.of(fn));
+            this.collection.setCoder(new TraverserCoder());
         }
         collection = (PCollection) collection.apply(ParDo.of(new 
OutputStep()));
+    }
 
+    public Beam(final Bytecode<C> bytecode) {
+        this(BytecodeUtil.compile(bytecode));
     }
 
     @Override
     public void addStart(Traverser<C, S> traverser) {
-
+        ((Fn) this.functions.get(0)).addStart(traverser);
     }
 
     @Override
     public Traverser<C, E> next() {
-        if (null == this.iterator) {
-            pipeline.run().waitUntilFinish();
-            this.iterator = OUTPUT.iterator();
-        }
+        this.setupPipeline();
         return this.iterator.next();
     }
 
     @Override
     public boolean hasNext() {
-        if (null == this.iterator) {
-            pipeline.run().waitUntilFinish();
-            this.iterator = OUTPUT.iterator();
-        }
+        this.setupPipeline();
         return this.iterator.hasNext();
     }
 
@@ -101,6 +103,15 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     @Override
     public String toString() {
-        return this.pipeline.toString();
+        return this.functions.toString();
+    }
+
+    private final void setupPipeline() {
+        if (null == this.iterator) {
+            pipeline.run().waitUntilFinish();
+            this.iterator = new ArrayList<>(OUTPUT).iterator();
+            OUTPUT.clear();
+        }
     }
+
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
index a3d1f9d..e0df567 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
@@ -18,18 +18,18 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
+public class FilterFn<C, S> extends AbstractFn<C, S, S> {
 
     private FilterFunction<C, S> filterFunction;
 
     public FilterFn(final FilterFunction<C, S> filterFunction) {
+        super(filterFunction);
         this.filterFunction = filterFunction;
     }
 
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
 b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
similarity index 62%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
index a3d1f9d..1967b96 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
@@ -18,24 +18,13 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
+public interface Fn<C, S,E> {
 
-    private FilterFunction<C, S> filterFunction;
+    public void addStart(final Traverser<C, S> traverser);
 
-    public FilterFn(final FilterFunction<C, S> filterFunction) {
-        this.filterFunction = filterFunction;
-    }
-
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, S>> output) {
-        if (traverser.filter(this.filterFunction))
-            output.output(traverser);
-    }
-}
\ No newline at end of file
+}
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java
similarity index 62%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java
index a3d1f9d..33556e6 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java
@@ -18,24 +18,30 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
+import org.apache.tinkerpop.machine.functions.InitialFunction;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.Iterator;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
+public class InitialFn<C, S> extends AbstractFn<C, S, S> {
 
-    private FilterFunction<C, S> filterFunction;
+    private final InitialFunction<C, S> initialFunction;
 
-    public FilterFn(final FilterFunction<C, S> filterFunction) {
-        this.filterFunction = filterFunction;
+    public InitialFn(final InitialFunction<C, S> initialFunction) {
+        super(initialFunction);
+        this.initialFunction = initialFunction;
     }
 
     @ProcessElement
     public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, S>> output) {
-        if (traverser.filter(this.filterFunction))
-            output.output(traverser);
+        final Iterator<S> iterator = this.initialFunction.get();
+        while (iterator.hasNext()) {
+            output.output(new CompleteTraverser(LongCoefficient.create(), 
iterator.next()));
+        }
     }
 }
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
index 2b83ab2..18d916f 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
@@ -18,24 +18,43 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.NestedFunction;
+import org.apache.tinkerpop.machine.pipes.Pipes;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.NoSuchElementException;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
+public class MapFn<C, S, E> extends AbstractFn<C, S, E> {
 
     private final MapFunction<C, S, E> mapFunction;
+    private boolean first = true;
 
     public MapFn(final MapFunction<C, S, E> mapFunction) {
+       super(mapFunction);
         this.mapFunction = mapFunction;
     }
 
     @ProcessElement
     public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        output.output(traverser.map(this.mapFunction));
+        if (this.first) {
+            if (this.mapFunction instanceof NestedFunction) {
+                Pipes beam  = new Pipes(((NestedFunction) 
this.mapFunction).getFunctions(), new CompleteTraverserFactory());
+                ((NestedFunction) this.mapFunction).setProcessor(beam);
+                while (!this.traversers.isEmpty()) {
+                    beam.addStart(this.traversers.remove());
+                }
+            }
+            this.first = false;
+        }
+        try {
+            output.output(traverser.map(this.mapFunction));
+        } catch(final NoSuchElementException e) {
+            // do nothing
+        }
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
similarity index 55%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
index 2b83ab2..37a46ee 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
@@ -18,24 +18,25 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.tinkerpop.machine.functions.MapFunction;
-import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.functions.reduce.Reducer;
+import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
+public class ReduceFn<C, S, E> extends AbstractFn<C, S, E> {
 
-    private final MapFunction<C, S, E> mapFunction;
+    private final ReduceFunction<C, S, E> reduceFunction;
+    private final Reducer<E> reducer;
+    private final TraverserFactory<C, E> traverserFactory;
 
-    public MapFn(final MapFunction<C, S, E> mapFunction) {
-        this.mapFunction = mapFunction;
-    }
-
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        output.output(traverser.map(this.mapFunction));
+    public ReduceFn(final ReduceFunction<C, S, E> reduceFunction,
+                    final Reducer<E> reducer,
+                    final TraverserFactory<C, E> traverserFactory) {
+        super(reduceFunction);
+        this.reduceFunction = reduceFunction;
+        this.reducer = reducer;
+        this.traverserFactory = traverserFactory;
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
index f35cdd1..c286034 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
@@ -20,8 +20,6 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
-import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.io.IOException;
@@ -40,14 +38,14 @@ public class TraverserCoder<C, S> extends 
Coder<Traverser<C, S>> {
     @Override
     public void encode(final Traverser<C, S> value, final OutputStream 
outStream) throws CoderException, IOException {
         ObjectOutputStream outputStream = new ObjectOutputStream(outStream);
-        outputStream.writeObject(value.object());
+        outputStream.writeObject(value);
     }
 
     @Override
     public Traverser<C, S> decode(InputStream inStream) throws CoderException, 
IOException {
         try {
             ObjectInputStream inputStream = new ObjectInputStream(inStream);
-            return new CompleteTraverser(LongCoefficient.create(), 
inputStream.readObject());
+            return (Traverser<C, S>) inputStream.readObject();
         } catch (final ClassNotFoundException e) {
             throw new IOException(e.getMessage(), e);
         }
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 1a84253..0c30723 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.language.Gremlin;
 import org.apache.tinkerpop.language.Traversal;
 import org.apache.tinkerpop.language.TraversalSource;
 import org.apache.tinkerpop.language.TraversalUtil;
+import org.apache.tinkerpop.language.__;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.junit.jupiter.api.Test;
 
@@ -34,12 +35,12 @@ public class BeamTest {
         final TraversalSource<Long> g = Gremlin.<Long>traversal()
                 .coefficient(LongCoefficient.create())
                 .processor(BeamProcessor.class);
-        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).identity().incr();
+        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).identity().incr().is(44L);//.count();
+        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().is(9L);//.count();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index b38ecd4..29f64bf 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -43,7 +43,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     private Step<C, ?, E> endStep;
     private Step<C, S, ?> startStep = EmptyStep.instance();
 
-    private Pipes(final List<CFunction<C>> functions, final 
TraverserFactory<C, S> traverserFactory) {
+    public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, 
S> traverserFactory) {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : functions) {
             if (function instanceof NestedFunction)

Reply via email to