This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 0fd1e42 cleaned up and organized Apache Beam machine. Need to add reducer support and then its at the same state as the Pipes machines. 0fd1e42 is described below commit 0fd1e42e32242401b34c95f699b8d684b4b623fd Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Mon Mar 11 06:39:35 2019 -0600 cleaned up and organized Apache Beam machine. Need to add reducer support and then its at the same state as the Pipes machines. --- .../tinkerpop/machine/traversers/TraverserSet.java | 3 +- java/machine/beam/pom.xml | 5 ++ .../machine/beam/{MapFn.java => AbstractFn.java} | 25 ++++++---- .../org/apache/tinkerpop/machine/beam/Beam.java | 57 +++++++++++++--------- .../apache/tinkerpop/machine/beam/FilterFn.java | 4 +- .../machine/beam/{FilterFn.java => Fn.java} | 17 ++----- .../machine/beam/{FilterFn.java => InitialFn.java} | 22 ++++++--- .../org/apache/tinkerpop/machine/beam/MapFn.java | 27 ++++++++-- .../machine/beam/{MapFn.java => ReduceFn.java} | 27 +++++----- .../tinkerpop/machine/beam/TraverserCoder.java | 6 +-- .../apache/tinkerpop/machine/beam/BeamTest.java | 5 +- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 2 +- 12 files changed, 119 insertions(+), 81 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java index ce52b63..66169ab 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.util.FastNoSuchElementException; +import java.io.Serializable; import java.util.AbstractSet; import java.util.Collections; import java.util.Iterator; @@ -33,7 +34,7 @@ import java.util.Spliterator; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> implements Set<Traverser<C, S>>, Queue<Traverser<C, S>> { +public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> implements Set<Traverser<C, S>>, Queue<Traverser<C, S>>, Serializable { private final Map<Traverser<C, S>, Traverser<C, S>> map = Collections.synchronizedMap(new LinkedHashMap<>()); diff --git a/java/machine/beam/pom.xml b/java/machine/beam/pom.xml index 2fb4c82..24a9773 100644 --- a/java/machine/beam/pom.xml +++ b/java/machine/beam/pom.xml @@ -79,6 +79,11 @@ limitations under the License. <version>2.11.0</version> <scope>runtime</scope> </dependency> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>pipes</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <directory>${basedir}/target</directory> diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java similarity index 61% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java index 2b83ab2..63caa5e 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java @@ -19,23 +19,30 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserSet; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { +public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> implements Fn<C, S, E> { - private final MapFunction<C, S, E> mapFunction; + protected final TraverserSet<C, S> traversers = new TraverserSet<>(); + protected final CFunction<C> function; - public MapFn(final MapFunction<C, S, E> mapFunction) { - this.mapFunction = mapFunction; + protected AbstractFn(final CFunction<C> function) { + this.function = function; } - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - output.output(traverser.map(this.mapFunction)); + @Override + public void addStart(final Traverser<C, S> traverser) { + this.traversers.add(traverser); } + + @Override + public String toString() { + return this.function.toString(); + } + } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java index a03ee71..b93174e 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.tinkerpop.machine.bytecode.Bytecode; @@ -27,8 +28,9 @@ import org.apache.tinkerpop.machine.bytecode.BytecodeUtil; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.functions.InitialFunction; import org.apache.tinkerpop.machine.functions.MapFunction; -import org.apache.tinkerpop.machine.functions.initial.InjectInitial; +import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; @@ -46,51 +48,51 @@ public class Beam<C, S, E> implements Processor<C, S, E> { PCollection collection; public static List<Traverser> OUTPUT = new ArrayList<>(); Iterator<Traverser> iterator = null; + private final List<DoFn> functions = new ArrayList<>(); - public Beam(final Bytecode<C> bytecode) { + public Beam(final List<CFunction<C>> functions) { this.pipeline = Pipeline.create(); this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new TraverserCoder<>()); + this.collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L))); + this.collection.setCoder(new TraverserCoder()); - for (final CFunction<?> function : BytecodeUtil.compile(bytecode)) { - if (function instanceof InjectInitial) { - final List<Traverser<C, S>> objects = new ArrayList<>(); - final Iterator<S> iterator = ((InjectInitial) function).get(); - while (iterator.hasNext()) - objects.add(new CompleteTraverser(LongCoefficient.create(), iterator.next())); - this.collection = this.pipeline.apply(Create.of(objects).withCoder(new TraverserCoder<>())); + DoFn fn = null; + for (final CFunction<?> function : functions) { + if (function instanceof InitialFunction) { + fn = new InitialFn<>((InitialFunction) function); } else if (function instanceof FilterFunction) { - collection = (PCollection) collection.apply(ParDo.of(new FilterFn<>((FilterFunction<C, S>) function))); - collection.setCoder(new TraverserCoder()); + fn = new FilterFn<>((FilterFunction) function); } else if (function instanceof MapFunction) { - collection = (PCollection) collection.apply(ParDo.of(new MapFn<>((MapFunction<C, S, E>) function))); - collection.setCoder(new TraverserCoder()); + fn = new MapFn<>((MapFunction) function); + } else if (function instanceof ReduceFunction) { + //fn = new ReduceFn<>((ReduceFunction)function) } else throw new RuntimeException("You need a new step type:" + function); + this.functions.add(fn); + this.collection = (PCollection) collection.apply(ParDo.of(fn)); + this.collection.setCoder(new TraverserCoder()); } collection = (PCollection) collection.apply(ParDo.of(new OutputStep())); + } + public Beam(final Bytecode<C> bytecode) { + this(BytecodeUtil.compile(bytecode)); } @Override public void addStart(Traverser<C, S> traverser) { - + ((Fn) this.functions.get(0)).addStart(traverser); } @Override public Traverser<C, E> next() { - if (null == this.iterator) { - pipeline.run().waitUntilFinish(); - this.iterator = OUTPUT.iterator(); - } + this.setupPipeline(); return this.iterator.next(); } @Override public boolean hasNext() { - if (null == this.iterator) { - pipeline.run().waitUntilFinish(); - this.iterator = OUTPUT.iterator(); - } + this.setupPipeline(); return this.iterator.hasNext(); } @@ -101,6 +103,15 @@ public class Beam<C, S, E> implements Processor<C, S, E> { @Override public String toString() { - return this.pipeline.toString(); + return this.functions.toString(); + } + + private final void setupPipeline() { + if (null == this.iterator) { + pipeline.run().waitUntilFinish(); + this.iterator = new ArrayList<>(OUTPUT).iterator(); + OUTPUT.clear(); + } } + } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java index a3d1f9d..e0df567 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java @@ -18,18 +18,18 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { +public class FilterFn<C, S> extends AbstractFn<C, S, S> { private FilterFunction<C, S> filterFunction; public FilterFn(final FilterFunction<C, S> filterFunction) { + super(filterFunction); this.filterFunction = filterFunction; } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java similarity index 62% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java index a3d1f9d..1967b96 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java @@ -18,24 +18,13 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { +public interface Fn<C, S,E> { - private FilterFunction<C, S> filterFunction; + public void addStart(final Traverser<C, S> traverser); - public FilterFn(final FilterFunction<C, S> filterFunction) { - this.filterFunction = filterFunction; - } - - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, S>> output) { - if (traverser.filter(this.filterFunction)) - output.output(traverser); - } -} \ No newline at end of file +} diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java similarity index 62% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java index a3d1f9d..33556e6 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java @@ -18,24 +18,30 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.coefficients.LongCoefficient; +import org.apache.tinkerpop.machine.functions.InitialFunction; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.util.Iterator; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { +public class InitialFn<C, S> extends AbstractFn<C, S, S> { - private FilterFunction<C, S> filterFunction; + private final InitialFunction<C, S> initialFunction; - public FilterFn(final FilterFunction<C, S> filterFunction) { - this.filterFunction = filterFunction; + public InitialFn(final InitialFunction<C, S> initialFunction) { + super(initialFunction); + this.initialFunction = initialFunction; } @ProcessElement public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, S>> output) { - if (traverser.filter(this.filterFunction)) - output.output(traverser); + final Iterator<S> iterator = this.initialFunction.get(); + while (iterator.hasNext()) { + output.output(new CompleteTraverser(LongCoefficient.create(), iterator.next())); + } } } \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java index 2b83ab2..18d916f 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java @@ -18,24 +18,43 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.NestedFunction; +import org.apache.tinkerpop.machine.pipes.Pipes; +import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.util.NoSuchElementException; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { +public class MapFn<C, S, E> extends AbstractFn<C, S, E> { private final MapFunction<C, S, E> mapFunction; + private boolean first = true; public MapFn(final MapFunction<C, S, E> mapFunction) { + super(mapFunction); this.mapFunction = mapFunction; } @ProcessElement public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - output.output(traverser.map(this.mapFunction)); + if (this.first) { + if (this.mapFunction instanceof NestedFunction) { + Pipes beam = new Pipes(((NestedFunction) this.mapFunction).getFunctions(), new CompleteTraverserFactory()); + ((NestedFunction) this.mapFunction).setProcessor(beam); + while (!this.traversers.isEmpty()) { + beam.addStart(this.traversers.remove()); + } + } + this.first = false; + } + try { + output.output(traverser.map(this.mapFunction)); + } catch(final NoSuchElementException e) { + // do nothing + } } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java similarity index 55% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java index 2b83ab2..37a46ee 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java @@ -18,24 +18,25 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.tinkerpop.machine.functions.MapFunction; -import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.functions.ReduceFunction; +import org.apache.tinkerpop.machine.functions.reduce.Reducer; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { +public class ReduceFn<C, S, E> extends AbstractFn<C, S, E> { - private final MapFunction<C, S, E> mapFunction; + private final ReduceFunction<C, S, E> reduceFunction; + private final Reducer<E> reducer; + private final TraverserFactory<C, E> traverserFactory; - public MapFn(final MapFunction<C, S, E> mapFunction) { - this.mapFunction = mapFunction; - } - - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - output.output(traverser.map(this.mapFunction)); + public ReduceFn(final ReduceFunction<C, S, E> reduceFunction, + final Reducer<E> reducer, + final TraverserFactory<C, E> traverserFactory) { + super(reduceFunction); + this.reduceFunction = reduceFunction; + this.reducer = reducer; + this.traverserFactory = traverserFactory; } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java index f35cdd1..c286034 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java @@ -20,8 +20,6 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.tinkerpop.machine.coefficients.LongCoefficient; -import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; import java.io.IOException; @@ -40,14 +38,14 @@ public class TraverserCoder<C, S> extends Coder<Traverser<C, S>> { @Override public void encode(final Traverser<C, S> value, final OutputStream outStream) throws CoderException, IOException { ObjectOutputStream outputStream = new ObjectOutputStream(outStream); - outputStream.writeObject(value.object()); + outputStream.writeObject(value); } @Override public Traverser<C, S> decode(InputStream inStream) throws CoderException, IOException { try { ObjectInputStream inputStream = new ObjectInputStream(inStream); - return new CompleteTraverser(LongCoefficient.create(), inputStream.readObject()); + return (Traverser<C, S>) inputStream.readObject(); } catch (final ClassNotFoundException e) { throw new IOException(e.getMessage(), e); } diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index 1a84253..0c30723 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.language.Gremlin; import org.apache.tinkerpop.language.Traversal; import org.apache.tinkerpop.language.TraversalSource; import org.apache.tinkerpop.language.TraversalUtil; +import org.apache.tinkerpop.language.__; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.junit.jupiter.api.Test; @@ -34,12 +35,12 @@ public class BeamTest { final TraversalSource<Long> g = Gremlin.<Long>traversal() .coefficient(LongCoefficient.create()) .processor(BeamProcessor.class); - Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).identity().incr(); + Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).identity().incr().is(44L);//.count(); + traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(9L);//.count(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index b38ecd4..29f64bf 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -43,7 +43,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> { private Step<C, ?, E> endStep; private Step<C, S, ?> startStep = EmptyStep.instance(); - private Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, S> traverserFactory) { + public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, S> traverserFactory) { AbstractStep<C, ?, ?> previousStep = EmptyStep.instance(); for (final CFunction<?> function : functions) { if (function instanceof NestedFunction)