This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 0fd1e42  cleaned up and organized Apache Beam machine. Need to add 
reducer support and then it's at the same state as the Pipes machines.
0fd1e42 is described below

commit 0fd1e42e32242401b34c95f699b8d684b4b623fd
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Mar 11 06:39:35 2019 -0600

    cleaned up and organized Apache Beam machine. Need to add reducer support 
and then it's at the same state as the Pipes machines.
---
 .../tinkerpop/machine/traversers/TraverserSet.java |  3 +-
 java/machine/beam/pom.xml                          |  5 ++
 .../machine/beam/{MapFn.java => AbstractFn.java}   | 25 ++++++----
 .../org/apache/tinkerpop/machine/beam/Beam.java    | 57 +++++++++++++---------
 .../apache/tinkerpop/machine/beam/FilterFn.java    |  4 +-
 .../machine/beam/{FilterFn.java => Fn.java}        | 17 ++-----
 .../machine/beam/{FilterFn.java => InitialFn.java} | 22 ++++++---
 .../org/apache/tinkerpop/machine/beam/MapFn.java   | 27 ++++++++--
 .../machine/beam/{MapFn.java => ReduceFn.java}     | 27 +++++-----
 .../tinkerpop/machine/beam/TraverserCoder.java     |  6 +--
 .../apache/tinkerpop/machine/beam/BeamTest.java    |  5 +-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  |  2 +-
 12 files changed, 119 insertions(+), 81 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
index ce52b63..66169ab 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.traversers;
 
 import org.apache.tinkerpop.util.FastNoSuchElementException;
 
+import java.io.Serializable;
 import java.util.AbstractSet;
 import java.util.Collections;
 import java.util.Iterator;
@@ -33,7 +34,7 @@ import java.util.Spliterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> 
implements Set<Traverser<C, S>>, Queue<Traverser<C, S>> {
+public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> 
implements Set<Traverser<C, S>>, Queue<Traverser<C, S>>, Serializable {
 
     private final Map<Traverser<C, S>, Traverser<C, S>> map = 
Collections.synchronizedMap(new LinkedHashMap<>());
 
diff --git a/java/machine/beam/pom.xml b/java/machine/beam/pom.xml
index 2fb4c82..24a9773 100644
--- a/java/machine/beam/pom.xml
+++ b/java/machine/beam/pom.xml
@@ -79,6 +79,11 @@ limitations under the License.
             <version>2.11.0</version>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>pipes</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <directory>${basedir}/target</directory>
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java
similarity index 61%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java
index 2b83ab2..63caa5e 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java
@@ -19,23 +19,30 @@
 package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.traversers.TraverserSet;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
+public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, 
Traverser<C, E>> implements Fn<C, S, E> {
 
-    private final MapFunction<C, S, E> mapFunction;
+    protected final TraverserSet<C, S> traversers = new TraverserSet<>();
+    protected final CFunction<C> function;
 
-    public MapFn(final MapFunction<C, S, E> mapFunction) {
-        this.mapFunction = mapFunction;
+    protected AbstractFn(final CFunction<C> function) {
+        this.function = function;
     }
 
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        output.output(traverser.map(this.mapFunction));
+    @Override
+    public void addStart(final Traverser<C, S> traverser) {
+        this.traversers.add(traverser);
     }
+
+    @Override
+    public String toString() {
+        return this.function.toString();
+    }
+
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
index a03ee71..b93174e 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
@@ -27,8 +28,9 @@ import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.functions.InitialFunction;
 import org.apache.tinkerpop.machine.functions.MapFunction;
-import org.apache.tinkerpop.machine.functions.initial.InjectInitial;
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
@@ -46,51 +48,51 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     PCollection collection;
     public static List<Traverser> OUTPUT = new ArrayList<>();
     Iterator<Traverser> iterator = null;
+    private final List<DoFn> functions = new ArrayList<>();
 
-    public Beam(final Bytecode<C> bytecode) {
+    public Beam(final List<CFunction<C>> functions) {
         this.pipeline = Pipeline.create();
         
this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new 
TraverserCoder<>());
+        this.collection = this.pipeline.apply(Create.of(new 
CompleteTraverser(LongCoefficient.create(), 1L)));
+        this.collection.setCoder(new TraverserCoder());
 
-        for (final CFunction<?> function : BytecodeUtil.compile(bytecode)) {
-            if (function instanceof InjectInitial) {
-                final List<Traverser<C, S>> objects = new ArrayList<>();
-                final Iterator<S> iterator = ((InjectInitial) function).get();
-                while (iterator.hasNext())
-                    objects.add(new 
CompleteTraverser(LongCoefficient.create(), iterator.next()));
-                this.collection = 
this.pipeline.apply(Create.of(objects).withCoder(new TraverserCoder<>()));
+        DoFn fn = null;
+        for (final CFunction<?> function : functions) {
+            if (function instanceof InitialFunction) {
+                fn = new InitialFn<>((InitialFunction) function);
             } else if (function instanceof FilterFunction) {
-                collection = (PCollection) collection.apply(ParDo.of(new 
FilterFn<>((FilterFunction<C, S>) function)));
-                collection.setCoder(new TraverserCoder());
+                fn = new FilterFn<>((FilterFunction) function);
             } else if (function instanceof MapFunction) {
-                collection = (PCollection) collection.apply(ParDo.of(new 
MapFn<>((MapFunction<C, S, E>) function)));
-                collection.setCoder(new TraverserCoder());
+                fn = new MapFn<>((MapFunction) function);
+            } else if (function instanceof ReduceFunction) {
+                //fn = new ReduceFn<>((ReduceFunction)function)
             } else
                 throw new RuntimeException("You need a new step type:" + 
function);
+            this.functions.add(fn);
+            this.collection = (PCollection) collection.apply(ParDo.of(fn));
+            this.collection.setCoder(new TraverserCoder());
         }
         collection = (PCollection) collection.apply(ParDo.of(new 
OutputStep()));
+    }
 
+    public Beam(final Bytecode<C> bytecode) {
+        this(BytecodeUtil.compile(bytecode));
     }
 
     @Override
     public void addStart(Traverser<C, S> traverser) {
-
+        ((Fn) this.functions.get(0)).addStart(traverser);
     }
 
     @Override
     public Traverser<C, E> next() {
-        if (null == this.iterator) {
-            pipeline.run().waitUntilFinish();
-            this.iterator = OUTPUT.iterator();
-        }
+        this.setupPipeline();
         return this.iterator.next();
     }
 
     @Override
     public boolean hasNext() {
-        if (null == this.iterator) {
-            pipeline.run().waitUntilFinish();
-            this.iterator = OUTPUT.iterator();
-        }
+        this.setupPipeline();
         return this.iterator.hasNext();
     }
 
@@ -101,6 +103,15 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
 
     @Override
     public String toString() {
-        return this.pipeline.toString();
+        return this.functions.toString();
+    }
+
+    private final void setupPipeline() {
+        if (null == this.iterator) {
+            pipeline.run().waitUntilFinish();
+            this.iterator = new ArrayList<>(OUTPUT).iterator();
+            OUTPUT.clear();
+        }
     }
+
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
index a3d1f9d..e0df567 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
@@ -18,18 +18,18 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
+public class FilterFn<C, S> extends AbstractFn<C, S, S> {
 
     private FilterFunction<C, S> filterFunction;
 
     public FilterFn(final FilterFunction<C, S> filterFunction) {
+        super(filterFunction);
         this.filterFunction = filterFunction;
     }
 
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
 b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
similarity index 62%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
index a3d1f9d..1967b96 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
+++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java
@@ -18,24 +18,13 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.tinkerpop.machine.functions.FilterFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
+public interface Fn<C, S,E> {
 
-    private FilterFunction<C, S> filterFunction;
+    public void addStart(final Traverser<C, S> traverser);
 
-    public FilterFn(final FilterFunction<C, S> filterFunction) {
-        this.filterFunction = filterFunction;
-    }
-
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, S>> output) {
-        if (traverser.filter(this.filterFunction))
-            output.output(traverser);
-    }
-}
\ No newline at end of file
+}
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java
similarity index 62%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java
index a3d1f9d..33556e6 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java
@@ -18,24 +18,30 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.tinkerpop.machine.functions.FilterFunction;
+import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
+import org.apache.tinkerpop.machine.functions.InitialFunction;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.Iterator;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> {
+public class InitialFn<C, S> extends AbstractFn<C, S, S> {
 
-    private FilterFunction<C, S> filterFunction;
+    private final InitialFunction<C, S> initialFunction;
 
-    public FilterFn(final FilterFunction<C, S> filterFunction) {
-        this.filterFunction = filterFunction;
+    public InitialFn(final InitialFunction<C, S> initialFunction) {
+        super(initialFunction);
+        this.initialFunction = initialFunction;
     }
 
     @ProcessElement
     public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, S>> output) {
-        if (traverser.filter(this.filterFunction))
-            output.output(traverser);
+        final Iterator<S> iterator = this.initialFunction.get();
+        while (iterator.hasNext()) {
+            output.output(new CompleteTraverser(LongCoefficient.create(), 
iterator.next()));
+        }
     }
 }
\ No newline at end of file
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
index 2b83ab2..18d916f 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
@@ -18,24 +18,43 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.tinkerpop.machine.functions.MapFunction;
+import org.apache.tinkerpop.machine.functions.NestedFunction;
+import org.apache.tinkerpop.machine.pipes.Pipes;
+import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
+import java.util.NoSuchElementException;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
+public class MapFn<C, S, E> extends AbstractFn<C, S, E> {
 
     private final MapFunction<C, S, E> mapFunction;
+    private boolean first = true;
 
     public MapFn(final MapFunction<C, S, E> mapFunction) {
+       super(mapFunction);
         this.mapFunction = mapFunction;
     }
 
     @ProcessElement
     public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        output.output(traverser.map(this.mapFunction));
+        if (this.first) {
+            if (this.mapFunction instanceof NestedFunction) {
+                Pipes beam  = new Pipes(((NestedFunction) 
this.mapFunction).getFunctions(), new CompleteTraverserFactory());
+                ((NestedFunction) this.mapFunction).setProcessor(beam);
+                while (!this.traversers.isEmpty()) {
+                    beam.addStart(this.traversers.remove());
+                }
+            }
+            this.first = false;
+        }
+        try {
+            output.output(traverser.map(this.mapFunction));
+        } catch(final NoSuchElementException e) {
+            // do nothing
+        }
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
similarity index 55%
copy from 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
copy to 
java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
index 2b83ab2..37a46ee 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java
@@ -18,24 +18,25 @@
  */
 package org.apache.tinkerpop.machine.beam;
 
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.tinkerpop.machine.functions.MapFunction;
-import org.apache.tinkerpop.machine.traversers.Traverser;
+import org.apache.tinkerpop.machine.functions.ReduceFunction;
+import org.apache.tinkerpop.machine.functions.reduce.Reducer;
+import org.apache.tinkerpop.machine.traversers.TraverserFactory;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> {
+public class ReduceFn<C, S, E> extends AbstractFn<C, S, E> {
 
-    private final MapFunction<C, S, E> mapFunction;
+    private final ReduceFunction<C, S, E> reduceFunction;
+    private final Reducer<E> reducer;
+    private final TraverserFactory<C, E> traverserFactory;
 
-    public MapFn(final MapFunction<C, S, E> mapFunction) {
-        this.mapFunction = mapFunction;
-    }
-
-    @ProcessElement
-    public void processElement(final @Element Traverser<C, S> traverser, final 
OutputReceiver<Traverser<C, E>> output) {
-        output.output(traverser.map(this.mapFunction));
+    public ReduceFn(final ReduceFunction<C, S, E> reduceFunction,
+                    final Reducer<E> reducer,
+                    final TraverserFactory<C, E> traverserFactory) {
+        super(reduceFunction);
+        this.reduceFunction = reduceFunction;
+        this.reducer = reducer;
+        this.traverserFactory = traverserFactory;
     }
 }
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
index f35cdd1..c286034 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java
@@ -20,8 +20,6 @@ package org.apache.tinkerpop.machine.beam;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
-import org.apache.tinkerpop.machine.traversers.CompleteTraverser;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.io.IOException;
@@ -40,14 +38,14 @@ public class TraverserCoder<C, S> extends 
Coder<Traverser<C, S>> {
     @Override
     public void encode(final Traverser<C, S> value, final OutputStream 
outStream) throws CoderException, IOException {
         ObjectOutputStream outputStream = new ObjectOutputStream(outStream);
-        outputStream.writeObject(value.object());
+        outputStream.writeObject(value);
     }
 
     @Override
     public Traverser<C, S> decode(InputStream inStream) throws CoderException, 
IOException {
         try {
             ObjectInputStream inputStream = new ObjectInputStream(inStream);
-            return new CompleteTraverser(LongCoefficient.create(), 
inputStream.readObject());
+            return (Traverser<C, S>) inputStream.readObject();
         } catch (final ClassNotFoundException e) {
             throw new IOException(e.getMessage(), e);
         }
diff --git 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
index 1a84253..0c30723 100644
--- 
a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
+++ 
b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.language.Gremlin;
 import org.apache.tinkerpop.language.Traversal;
 import org.apache.tinkerpop.language.TraversalSource;
 import org.apache.tinkerpop.language.TraversalUtil;
+import org.apache.tinkerpop.language.__;
 import org.apache.tinkerpop.machine.coefficients.LongCoefficient;
 import org.junit.jupiter.api.Test;
 
@@ -34,12 +35,12 @@ public class BeamTest {
         final TraversalSource<Long> g = Gremlin.<Long>traversal()
                 .coefficient(LongCoefficient.create())
                 .processor(BeamProcessor.class);
-        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).identity().incr();
+        Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).identity().incr().is(44L);//.count();
+        traversal = g.inject(7L, 10L, 
12L).as("a").c(3L).map(__.incr()).identity().incr().is(9L);//.count();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index b38ecd4..29f64bf 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -43,7 +43,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> {
     private Step<C, ?, E> endStep;
     private Step<C, S, ?> startStep = EmptyStep.instance();
 
-    private Pipes(final List<CFunction<C>> functions, final 
TraverserFactory<C, S> traverserFactory) {
+    public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, 
S> traverserFactory) {
         AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : functions) {
             if (function instanceof NestedFunction)

Reply via email to