This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 0fd1e42e32242401b34c95f699b8d684b4b623fd Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Mon Mar 11 06:39:35 2019 -0600 cleaned up and organized Apache Beam machine. Need to add reducer support and then it's at the same state as the Pipes machines. --- .../tinkerpop/machine/traversers/TraverserSet.java | 3 +- java/machine/beam/pom.xml | 5 ++ .../machine/beam/{MapFn.java => AbstractFn.java} | 25 ++++++---- .../org/apache/tinkerpop/machine/beam/Beam.java | 57 +++++++++++++--------- .../apache/tinkerpop/machine/beam/FilterFn.java | 4 +- .../machine/beam/{FilterFn.java => Fn.java} | 17 ++----- .../machine/beam/{FilterFn.java => InitialFn.java} | 22 ++++++--- .../org/apache/tinkerpop/machine/beam/MapFn.java | 27 ++++++++-- .../machine/beam/{MapFn.java => ReduceFn.java} | 27 +++++----- .../tinkerpop/machine/beam/TraverserCoder.java | 6 +-- .../apache/tinkerpop/machine/beam/BeamTest.java | 5 +- .../org/apache/tinkerpop/machine/pipes/Pipes.java | 2 +- 12 files changed, 119 insertions(+), 81 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java index ce52b63..66169ab 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traversers/TraverserSet.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.machine.traversers; import org.apache.tinkerpop.util.FastNoSuchElementException; +import java.io.Serializable; import java.util.AbstractSet; import java.util.Collections; import java.util.Iterator; @@ -33,7 +34,7 @@ import java.util.Spliterator; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> implements Set<Traverser<C, S>>, Queue<Traverser<C, S>> { +public class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> implements Set<Traverser<C, S>>, Queue<Traverser<C, S>>, Serializable { private final Map<Traverser<C, S>, Traverser<C, S>> map = Collections.synchronizedMap(new LinkedHashMap<>()); diff --git a/java/machine/beam/pom.xml b/java/machine/beam/pom.xml index 2fb4c82..24a9773 100644 --- a/java/machine/beam/pom.xml +++ b/java/machine/beam/pom.xml @@ -79,6 +79,11 @@ limitations under the License. <version>2.11.0</version> <scope>runtime</scope> </dependency> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>pipes</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <directory>${basedir}/target</directory> diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java similarity index 61% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java index 2b83ab2..63caa5e 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/AbstractFn.java @@ -19,23 +19,30 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.traversers.TraverserSet; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { +public abstract class AbstractFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> implements Fn<C, S, E> { - private final MapFunction<C, S, E> mapFunction; + protected final TraverserSet<C, S> traversers = new TraverserSet<>(); + protected final CFunction<C> function; - public MapFn(final MapFunction<C, S, E> mapFunction) { - this.mapFunction = mapFunction; + protected AbstractFn(final CFunction<C> function) { + this.function = function; } - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - output.output(traverser.map(this.mapFunction)); + @Override + public void addStart(final Traverser<C, S> traverser) { + this.traversers.add(traverser); } + + @Override + public String toString() { + return this.function.toString(); + } + } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java index a03ee71..b93174e 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Beam.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.tinkerpop.machine.bytecode.Bytecode; @@ -27,8 +28,9 @@ import org.apache.tinkerpop.machine.bytecode.BytecodeUtil; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.apache.tinkerpop.machine.functions.CFunction; import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.functions.InitialFunction; import 
org.apache.tinkerpop.machine.functions.MapFunction; -import org.apache.tinkerpop.machine.functions.initial.InjectInitial; +import org.apache.tinkerpop.machine.functions.ReduceFunction; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; @@ -46,51 +48,51 @@ public class Beam<C, S, E> implements Processor<C, S, E> { PCollection collection; public static List<Traverser> OUTPUT = new ArrayList<>(); Iterator<Traverser> iterator = null; + private final List<DoFn> functions = new ArrayList<>(); - public Beam(final Bytecode<C> bytecode) { + public Beam(final List<CFunction<C>> functions) { this.pipeline = Pipeline.create(); this.pipeline.getCoderRegistry().registerCoderForClass(Traverser.class, new TraverserCoder<>()); + this.collection = this.pipeline.apply(Create.of(new CompleteTraverser(LongCoefficient.create(), 1L))); + this.collection.setCoder(new TraverserCoder()); - for (final CFunction<?> function : BytecodeUtil.compile(bytecode)) { - if (function instanceof InjectInitial) { - final List<Traverser<C, S>> objects = new ArrayList<>(); - final Iterator<S> iterator = ((InjectInitial) function).get(); - while (iterator.hasNext()) - objects.add(new CompleteTraverser(LongCoefficient.create(), iterator.next())); - this.collection = this.pipeline.apply(Create.of(objects).withCoder(new TraverserCoder<>())); + DoFn fn = null; + for (final CFunction<?> function : functions) { + if (function instanceof InitialFunction) { + fn = new InitialFn<>((InitialFunction) function); } else if (function instanceof FilterFunction) { - collection = (PCollection) collection.apply(ParDo.of(new FilterFn<>((FilterFunction<C, S>) function))); - collection.setCoder(new TraverserCoder()); + fn = new FilterFn<>((FilterFunction) function); } else if (function instanceof MapFunction) { - collection = (PCollection) collection.apply(ParDo.of(new MapFn<>((MapFunction<C, S, E>) 
function))); - collection.setCoder(new TraverserCoder()); + fn = new MapFn<>((MapFunction) function); + } else if (function instanceof ReduceFunction) { + //fn = new ReduceFn<>((ReduceFunction)function) } else throw new RuntimeException("You need a new step type:" + function); + this.functions.add(fn); + this.collection = (PCollection) collection.apply(ParDo.of(fn)); + this.collection.setCoder(new TraverserCoder()); } collection = (PCollection) collection.apply(ParDo.of(new OutputStep())); + } + public Beam(final Bytecode<C> bytecode) { + this(BytecodeUtil.compile(bytecode)); } @Override public void addStart(Traverser<C, S> traverser) { - + ((Fn) this.functions.get(0)).addStart(traverser); } @Override public Traverser<C, E> next() { - if (null == this.iterator) { - pipeline.run().waitUntilFinish(); - this.iterator = OUTPUT.iterator(); - } + this.setupPipeline(); return this.iterator.next(); } @Override public boolean hasNext() { - if (null == this.iterator) { - pipeline.run().waitUntilFinish(); - this.iterator = OUTPUT.iterator(); - } + this.setupPipeline(); return this.iterator.hasNext(); } @@ -101,6 +103,15 @@ public class Beam<C, S, E> implements Processor<C, S, E> { @Override public String toString() { - return this.pipeline.toString(); + return this.functions.toString(); + } + + private final void setupPipeline() { + if (null == this.iterator) { + pipeline.run().waitUntilFinish(); + this.iterator = new ArrayList<>(OUTPUT).iterator(); + OUTPUT.clear(); + } } + } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java index a3d1f9d..e0df567 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java @@ -18,18 +18,18 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; import 
org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { +public class FilterFn<C, S> extends AbstractFn<C, S, S> { private FilterFunction<C, S> filterFunction; public FilterFn(final FilterFunction<C, S> filterFunction) { + super(filterFunction); this.filterFunction = filterFunction; } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java similarity index 62% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java index a3d1f9d..1967b96 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/Fn.java @@ -18,24 +18,13 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.tinkerpop.machine.functions.FilterFunction; import org.apache.tinkerpop.machine.traversers.Traverser; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { +public interface Fn<C, S,E> { - private FilterFunction<C, S> filterFunction; + public void addStart(final Traverser<C, S> traverser); - public FilterFn(final FilterFunction<C, S> filterFunction) { - this.filterFunction = filterFunction; - } - - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, S>> output) { - if (traverser.filter(this.filterFunction)) - output.output(traverser); - } -} \ No newline at end of file +} diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java similarity index 62% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java index a3d1f9d..33556e6 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/FilterFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/InitialFn.java @@ -18,24 +18,30 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.tinkerpop.machine.functions.FilterFunction; +import org.apache.tinkerpop.machine.coefficients.LongCoefficient; +import org.apache.tinkerpop.machine.functions.InitialFunction; +import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.util.Iterator; + /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class FilterFn<C, S> extends DoFn<Traverser<C, S>, Traverser<C, S>> { +public class InitialFn<C, S> extends AbstractFn<C, S, S> { - private FilterFunction<C, S> filterFunction; + private final InitialFunction<C, S> initialFunction; - public FilterFn(final FilterFunction<C, S> filterFunction) { - this.filterFunction = filterFunction; + public InitialFn(final InitialFunction<C, S> initialFunction) { + super(initialFunction); + this.initialFunction = initialFunction; } @ProcessElement public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, S>> output) { - if (traverser.filter(this.filterFunction)) - output.output(traverser); + final Iterator<S> iterator = this.initialFunction.get(); + while (iterator.hasNext()) { + output.output(new CompleteTraverser(LongCoefficient.create(), iterator.next())); + } } } \ No newline at end of file diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java index 2b83ab2..18d916f 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java @@ -18,24 +18,43 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.tinkerpop.machine.functions.MapFunction; +import org.apache.tinkerpop.machine.functions.NestedFunction; +import org.apache.tinkerpop.machine.pipes.Pipes; +import org.apache.tinkerpop.machine.traversers.CompleteTraverserFactory; import org.apache.tinkerpop.machine.traversers.Traverser; +import java.util.NoSuchElementException; + /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { +public class MapFn<C, S, E> extends AbstractFn<C, S, E> { private final MapFunction<C, S, E> mapFunction; + private boolean first = true; public MapFn(final MapFunction<C, S, E> mapFunction) { + super(mapFunction); this.mapFunction = mapFunction; } @ProcessElement public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - output.output(traverser.map(this.mapFunction)); + if (this.first) { + if (this.mapFunction instanceof NestedFunction) { + Pipes beam = new Pipes(((NestedFunction) this.mapFunction).getFunctions(), new CompleteTraverserFactory()); + ((NestedFunction) this.mapFunction).setProcessor(beam); + while (!this.traversers.isEmpty()) { + beam.addStart(this.traversers.remove()); + } + } + this.first = false; + } + try { + output.output(traverser.map(this.mapFunction)); + } catch(final NoSuchElementException e) { + // do nothing + } } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java similarity index 55% copy from java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java copy to java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java index 2b83ab2..37a46ee 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/MapFn.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/ReduceFn.java @@ -18,24 +18,25 @@ */ package org.apache.tinkerpop.machine.beam; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.tinkerpop.machine.functions.MapFunction; -import org.apache.tinkerpop.machine.traversers.Traverser; +import org.apache.tinkerpop.machine.functions.ReduceFunction; +import 
org.apache.tinkerpop.machine.functions.reduce.Reducer; +import org.apache.tinkerpop.machine.traversers.TraverserFactory; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class MapFn<C, S, E> extends DoFn<Traverser<C, S>, Traverser<C, E>> { +public class ReduceFn<C, S, E> extends AbstractFn<C, S, E> { - private final MapFunction<C, S, E> mapFunction; + private final ReduceFunction<C, S, E> reduceFunction; + private final Reducer<E> reducer; + private final TraverserFactory<C, E> traverserFactory; - public MapFn(final MapFunction<C, S, E> mapFunction) { - this.mapFunction = mapFunction; - } - - @ProcessElement - public void processElement(final @Element Traverser<C, S> traverser, final OutputReceiver<Traverser<C, E>> output) { - output.output(traverser.map(this.mapFunction)); + public ReduceFn(final ReduceFunction<C, S, E> reduceFunction, + final Reducer<E> reducer, + final TraverserFactory<C, E> traverserFactory) { + super(reduceFunction); + this.reduceFunction = reduceFunction; + this.reducer = reducer; + this.traverserFactory = traverserFactory; } } diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java index f35cdd1..c286034 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/TraverserCoder.java @@ -20,8 +20,6 @@ package org.apache.tinkerpop.machine.beam; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.tinkerpop.machine.coefficients.LongCoefficient; -import org.apache.tinkerpop.machine.traversers.CompleteTraverser; import org.apache.tinkerpop.machine.traversers.Traverser; import java.io.IOException; @@ -40,14 +38,14 @@ public class TraverserCoder<C, S> extends Coder<Traverser<C, S>> { @Override public void encode(final Traverser<C, 
S> value, final OutputStream outStream) throws CoderException, IOException { ObjectOutputStream outputStream = new ObjectOutputStream(outStream); - outputStream.writeObject(value.object()); + outputStream.writeObject(value); } @Override public Traverser<C, S> decode(InputStream inStream) throws CoderException, IOException { try { ObjectInputStream inputStream = new ObjectInputStream(inStream); - return new CompleteTraverser(LongCoefficient.create(), inputStream.readObject()); + return (Traverser<C, S>) inputStream.readObject(); } catch (final ClassNotFoundException e) { throw new IOException(e.getMessage(), e); } diff --git a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java index 1a84253..0c30723 100644 --- a/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java +++ b/java/machine/beam/src/test/java/org/apache/tinkerpop/machine/beam/BeamTest.java @@ -22,6 +22,7 @@ import org.apache.tinkerpop.language.Gremlin; import org.apache.tinkerpop.language.Traversal; import org.apache.tinkerpop.language.TraversalSource; import org.apache.tinkerpop.language.TraversalUtil; +import org.apache.tinkerpop.language.__; import org.apache.tinkerpop.machine.coefficients.LongCoefficient; import org.junit.jupiter.api.Test; @@ -34,12 +35,12 @@ public class BeamTest { final TraversalSource<Long> g = Gremlin.<Long>traversal() .coefficient(LongCoefficient.create()) .processor(BeamProcessor.class); - Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).identity().incr(); + Traversal<Long, Long, Long> traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(7L, 10L, 12L).as("a").c(3L).identity().incr().is(44L);//.count(); 
+ traversal = g.inject(7L, 10L, 12L).as("a").c(3L).map(__.incr()).identity().incr().is(9L);//.count(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(traversal.toList()); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java index b38ecd4..29f64bf 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java @@ -43,7 +43,7 @@ public class Pipes<C, S, E> implements Processor<C, S, E> { private Step<C, ?, E> endStep; private Step<C, S, ?> startStep = EmptyStep.instance(); - private Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, S> traverserFactory) { + public Pipes(final List<CFunction<C>> functions, final TraverserFactory<C, S> traverserFactory) { AbstractStep<C, ?, ?> previousStep = EmptyStep.instance(); for (final CFunction<?> function : functions) { if (function instanceof NestedFunction)