This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
     new c7ee5a0  got repeat() working.
c7ee5a0 is described below

commit c7ee5a0c93f8f54509a2c82971ae6c108f2caec8
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Apr 3 13:36:10 2019 -0600

    got repeat() working.
---
 .../processor/rxjava/util/TopologyUtil.java | 91 +++++++++++++++-------
 .../machine/processor/rxjava/RxJavaTest.java | 83 +++++++++++++++++++-
 2 files changed, 143 insertions(+), 31 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
index b6b0e4a..4146623 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.function.FlatMapFunction; import org.apache.tinkerpop.machine.function.InitialFunction; import org.apache.tinkerpop.machine.function.MapFunction; import org.apache.tinkerpop.machine.function.ReduceFunction; +import org.apache.tinkerpop.machine.function.branch.RepeatBranch; import org.apache.tinkerpop.machine.processor.rxjava.BranchFlow; import org.apache.tinkerpop.machine.processor.rxjava.FilterFlow; import org.apache.tinkerpop.machine.processor.rxjava.FlatMapFlow; @@ -46,6 +47,8 @@ import java.util.Map; */ public final class TopologyUtil { + private static final int MAX_ITERATIONS = 20; + public static <C, S, E> Flowable<Traverser<C, E>> compile(final Flowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) { final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory(); Flowable<Traverser<C, E>> sink = (Flowable) source; @@ -55,34 +58,6 @@ public final class TopologyUtil { return sink; } - /*
private final void stageInput() { - if (this.hasStartPredicates) { - final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove(); - if (1 == this.untilLocation) { - if (this.untilCompilation.filterTraverser(traverser)) { - this.outputTraversers.add(traverser); - } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) { - this.outputTraversers.add(traverser.repeatDone(this.repeatBranch)); - this.repeat.addTraverser(traverser); - } else - this.repeat.addTraverser(traverser); - } else if (1 == this.emitLocation) { - if (this.emitCompilation.filterTraverser(traverser)) - this.outputTraversers.add(traverser.repeatDone(this.repeatBranch)); - if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser)) - this.outputTraversers.add(traverser.repeatDone(this.repeatBranch)); - else - this.repeat.addTraverser(traverser); - } - } else { - this.repeat.addTraverser(this.inputTraversers.isEmpty() ? 
this.previousStep.next() : this.inputTraversers.remove()); - } - } - - - */ - private static <C, S, E, B> Flowable<Traverser<C, E>> extend(Flowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) { if (function instanceof MapFunction) return flow.map(new MapFlow<>((MapFunction<C, S, E>) function)); @@ -114,6 +89,66 @@ public final class TopologyUtil { sink = sink.mergeWith(branchFlow); } return sink; + } else if (function instanceof RepeatBranch) { + final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) function; + final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>(); + for (int i = 0; i < MAX_ITERATIONS; i++) { + Flowable<List> selectorFlow = flow.flatMapIterable(t -> { + final List<List> list = new ArrayList<>(); + if (repeatBranch.hasStartPredicates()) { + if (1 == repeatBranch.getUntilLocation()) { + if (repeatBranch.getUntil().filterTraverser(t)) { + list.add(List.of(0, t.repeatDone(repeatBranch))); + } else if (2 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) { + list.add(List.of(1, t)); + list.add(List.of(0, t.repeatDone(repeatBranch))); + } else + list.add(List.of(1, t)); + } else if (1 == repeatBranch.getEmitLocation()) { + if (repeatBranch.getEmit().filterTraverser(t)) + list.add(List.of(0, t.repeatDone(repeatBranch))); + if (2 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t)) { + list.add(List.of(0, t.repeatDone(repeatBranch))); + } else + list.add(List.of(1, t)); + } + } else + list.add(List.of(1, t)); + return list; + }); + outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1))); + flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)).publish(f -> compile(f, repeatBranch.getRepeat())); + selectorFlow = flow.flatMapIterable(t -> { + final List<List> list = new ArrayList<>(); + if (repeatBranch.hasEndPredicates()) { + if (3 == 
repeatBranch.getUntilLocation()) { + if (repeatBranch.getUntil().filterTraverser(t)) { + list.add(List.of(0, t.repeatDone(repeatBranch))); + } else if (4 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) { + list.add(List.of(0, t.repeatDone(repeatBranch))); + list.add(List.of(1, t)); + } else + list.add(List.of(1, t)); + } else if (3 == repeatBranch.getEmitLocation()) { + if (repeatBranch.getEmit().filterTraverser(t)) + list.add(List.of(0, t.repeatDone(repeatBranch))); + if (4 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t)) + list.add(List.of(0, t.repeatDone(repeatBranch))); + else + list.add(List.of(1, t)); + } + } else + list.add(List.of(1, t.repeatLoop(repeatBranch))); + return list; + }); + outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1))); + flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)); + } + Flowable<Traverser<C, S>> sink = flow.filter(t -> false); // branches are the only outputs + for (final Publisher<Traverser<C, S>> output : outputs) { + sink = sink.mergeWith(output); + } + return (Flowable) sink; } throw new RuntimeException("Need a new execution plan step: " + function); } diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java index 8085bbc..74f5b63 100644 --- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java +++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java @@ -27,9 +27,13 @@ import org.apache.tinkerpop.language.gremlin.common.__; import org.apache.tinkerpop.machine.Machine; import org.apache.tinkerpop.machine.coefficient.LongCoefficient; import 
org.apache.tinkerpop.machine.species.LocalMachine; +import org.apache.tinkerpop.machine.species.remote.MachineServer; +import org.apache.tinkerpop.machine.species.remote.RemoteMachine; import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy; import org.junit.jupiter.api.Test; +import java.util.List; + import static org.apache.tinkerpop.language.gremlin.common.__.incr; /** @@ -45,24 +49,97 @@ public class RxJavaTest { .withProcessor(RxJavaProcessor.class) .withStrategy(IdentityStrategy.class); - Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(),__.<Long>incr().incr()); + Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(), __.<Long>incr().incr()); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal.toList()); System.out.println("\n----------\n"); - traversal = g.inject(1L).choose(__.is(1L), incr(),__.<Long>incr().incr()); + traversal = g.inject(1L).choose(__.is(1L), incr(), __.<Long>incr().incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + + traversal = g.inject(1L).emit().repeat(incr()).until(__.is(3L)); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal.toList()); System.out.println("\n----------\n"); + } + + @Test + public void shouldWork() { + final MachineServer server = new MachineServer(7777); + final Machine machine = RemoteMachine.open(6666, "localhost", 7777); + final TraversalSource<Long> g = Gremlin.<Long>traversal(machine) + .withCoefficient(LongCoefficient.class) + .withProcessor(RxJavaProcessor.class) + .withStrategy(IdentityStrategy.class); - /*traversal 
= g.inject(1L).until(__.is(P.lt(3L))).emit().repeat(incr()); + Traversal<Long, ?, ?> traversal = g.inject(List.of(1L, 1L)).<Long>unfold().map(incr()).c(4L).repeat(incr()).until(__.is(__.constant(8L).incr().incr())).sum(); System.out.println(TraversalUtil.getBytecode(traversal)); System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(List.of(1L, 2L)).unfold().is(P.lt(__.constant(2L))).groupCount().by(__.incr()); System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(1L).times(10).repeat(__.incr()).emit(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + /*traversal = g.inject(1L).repeat(incr()).emit(__.constant(true)).until(__.<Long, Long>loops().is(P.gt(5))); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); System.out.println(traversal.toList()); System.out.println("\n----------\n");*/ + traversal = g.inject(1L).emit(__.constant(true)).until(__.is(5L)).repeat(incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(1L).until(__.is(5L)).repeat(incr()).emit(__.constant(true)); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(7L).union(__.incr(), __.<Long>incr().incr().union(__.incr(), __.incr())); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = 
g.inject(7L).choose(__.is(7L), __.incr()).sum(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(List.of(7L, 8L, 9L)).<Long>unfold().choose(__.is(7L), __.incr(), __.<Long>incr().incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(7L).as("a").union(__.<Long>incr().as("b"), __.<Long>incr().incr().as("b"), __.<Long>incr().incr().incr().as("b")).path("a", "b").by(__.incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + /*traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier(); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(traversal.hasNext()); + System.out.println(traversal.nextTraverser()); + System.out.println(traversal.hasNext()); + System.out.println(traversal.nextTraverser()); + System.out.println(traversal.hasNext());*/ + /// + g.close(); + machine.close(); + server.close(); } }