This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 414211a first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think. 414211a is described below commit 414211abde8d2f5f257b67f50ff4b192655db104 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Wed Apr 3 12:49:50 2019 -0600 first stub at RxJavaProcessor. everything works except for repeat(). I'm all confused and lost. Need to think. --- .../machine/function/branch/BranchBranch.java | 3 +- java/machine/processor/pom.xml | 1 + java/machine/processor/rxjava/pom.xml | 71 ++++++++++++ .../machine/processor/rxjava/BranchFlow.java | 51 +++++++++ .../machine/processor/rxjava/FilterFlow.java | 40 +++++++ .../machine/processor/rxjava/FlatMapFlow.java | 40 +++++++ .../machine/processor/rxjava/MapFlow.java | 40 +++++++ .../machine/processor/rxjava/ReduceFlow.java | 41 +++++++ .../tinkerpop/machine/processor/rxjava/RxJava.java | 84 +++++++++++++++ .../machine/processor/rxjava/RxJavaProcessor.java | 42 ++++++++ .../processor/rxjava/strategy/RxJavaStrategy.java | 38 +++++++ .../processor/rxjava/util/TopologyUtil.java | 120 +++++++++++++++++++++ .../machine/processor/rxjava/RxJavaTest.java | 68 ++++++++++++ 13 files changed, 638 insertions(+), 1 deletion(-) diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java index 13f182c..165a87d 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/BranchBranch.java @@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.function.BranchFunction; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -68,7 +69,7 @@ public final class BranchBranch<C, S, E> extends AbstractFunction<C> implements public static <C, S, E> BranchBranch<C, S, E> compile(final Instruction<C> instruction) { final Object[] args = instruction.args(); - final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = new HashMap<>(); + final Map<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches = new LinkedHashMap<>(); for (int i = 0; i < args.length; i = i + 2) { final Compilation<C, S, ?> predicate = Symbols.DEFAULT.equals(args[i]) ? null : Compilation.compile(args[i]); final Compilation<C, S, E> branch = Compilation.compile(args[i + 1]); diff --git a/java/machine/processor/pom.xml b/java/machine/processor/pom.xml index f23513c..6a34820 100644 --- a/java/machine/processor/pom.xml +++ b/java/machine/processor/pom.xml @@ -27,5 +27,6 @@ limitations under the License. <modules> <module>pipes</module> <module>beam</module> + <module>rxjava</module> </modules> </project> \ No newline at end of file diff --git a/java/machine/processor/rxjava/pom.xml b/java/machine/processor/rxjava/pom.xml new file mode 100644 index 0000000..d41d44f --- /dev/null +++ b/java/machine/processor/rxjava/pom.xml @@ -0,0 +1,71 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>processor</artifactId> + <groupId>org.apache.tinkerpop</groupId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <name>Apache TinkerPop :: Machine :: RxJava</name> + <artifactId>rxjava</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>machine-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.reactivex.rxjava2</groupId> + <artifactId>rxjava</artifactId> + <version>2.2.8</version> + </dependency> + <!-- TEST --> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>gremlin</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tinkerpop</groupId> + <artifactId>blueprints</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <directory>${basedir}/target</directory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources + </directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java new file mode 100644 index 0000000..5b0d7b7 --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/BranchFlow.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import io.reactivex.functions.Function; +import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; +import org.apache.tinkerpop.machine.function.BranchFunction; +import org.apache.tinkerpop.machine.traverser.Traverser; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class BranchFlow<C, S> implements Function<Traverser<C, S>, List> { + + private final List<Compilation<C, S, ?>> selectors; + + public BranchFlow(final BranchFunction<C, S, ?> function) { + this.selectors = new ArrayList<>(function.getBranches().keySet()); + } + + @Override + public List apply(final Traverser<C, S> traverser) { + for (int i = 0; i < this.selectors.size(); i++) { + final Compilation<C, S, ?> selector = this.selectors.get(i); + if (null != selector) { + if (selector.filterTraverser(traverser)) + return List.of(i, traverser); + } + } + return List.of(-1, traverser); + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java new file mode 100644 index 0000000..84eaf68 --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import io.reactivex.functions.Predicate; +import org.apache.tinkerpop.machine.function.FilterFunction; +import org.apache.tinkerpop.machine.traverser.Traverser; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class FilterFlow<C, S> implements Predicate<Traverser<C, S>> { + + private final FilterFunction<C, S> function; + + public FilterFlow(final FilterFunction<C, S> function) { + this.function = function; + } + + @Override + public boolean test(final Traverser<C, S> traverser) { + return this.function.test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java new file mode 100644 index 0000000..9000a84 --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import io.reactivex.functions.Function; +import org.apache.tinkerpop.machine.function.FlatMapFunction; +import org.apache.tinkerpop.machine.traverser.Traverser; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> { + + private FlatMapFunction<C, S, E> function; + + public FlatMapFlow(final FlatMapFunction<C, S, E> function) { + this.function = function; + } + + @Override + public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) { + return () -> traverser.flatMap(this.function); + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java new file mode 100644 index 0000000..e09eeae --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import io.reactivex.functions.Function; +import org.apache.tinkerpop.machine.function.MapFunction; +import org.apache.tinkerpop.machine.traverser.Traverser; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class MapFlow<C, S, E> implements Function<Traverser<C, S>, Traverser<C, E>> { + + private final MapFunction<C, S, E> function; + + public MapFlow(final MapFunction<C, S, E> function) { + this.function = function; + } + + @Override + public Traverser<C, E> apply(final Traverser<C, S> traverser) { + return traverser.map(this.function); + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java new file mode 100644 index 0000000..d26c3d5 --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ReduceFlow.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import io.reactivex.functions.BiFunction; +import org.apache.tinkerpop.machine.function.ReduceFunction; +import org.apache.tinkerpop.machine.traverser.Traverser; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ReduceFlow<C, S, E> implements BiFunction<Traverser<C, E>, Traverser<C, S>, Traverser<C, E>> { + + private final ReduceFunction<C, S, E> function; + + public ReduceFlow(final ReduceFunction<C, S, E> function) { + this.function = function; + } + + + @Override + public Traverser<C, E> apply(final Traverser<C, E> seed, final Traverser<C, S> traverser) { + return seed.split(this.function.apply(traverser, seed.object())); // todo: need to think about this re-wrap of the seed + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java new file mode 100644 index 0000000..cd81390 --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import io.reactivex.Flowable; +import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; +import org.apache.tinkerpop.machine.processor.Processor; +import org.apache.tinkerpop.machine.processor.rxjava.util.TopologyUtil; +import org.apache.tinkerpop.machine.traverser.Traverser; +import org.apache.tinkerpop.machine.traverser.TraverserSet; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class RxJava<C, S, E> implements Processor<C, S, E> { + + private final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE); + private boolean executed = false; + private final TraverserSet<C, S> starts = new TraverserSet<>(); + private final TraverserSet<C, E> ends = new TraverserSet<>(); + private final Compilation<C, S, E> compilation; + + public RxJava(final Compilation<C, S, E> compilation) { + this.compilation = compilation; + } + + @Override + public void addStart(final Traverser<C, S> traverser) { + this.starts.add(traverser); + } + + @Override + public Traverser<C, E> next() { + this.prepareFlow(); + return this.ends.remove(); + } + + @Override + public boolean hasNext() { + this.prepareFlow(); + return !this.ends.isEmpty(); + } + + @Override + public void reset() { + this.starts.clear(); + this.ends.clear(); + this.executed = false; + } + + private void prepareFlow() { + if (!this.executed) { + this.executed = true; + TopologyUtil.compile(Flowable.fromIterable(this.starts), compilation). + doOnNext(this.ends::add). + doOnComplete(() -> this.alive.set(Boolean.FALSE)). + subscribe(); + } + if (!this.ends.isEmpty()) + return; + while (this.alive.get()) { + if (!this.ends.isEmpty()) + return; + } + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java new file mode 100644 index 0000000..a57954e --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; +import org.apache.tinkerpop.machine.processor.Processor; +import org.apache.tinkerpop.machine.processor.ProcessorFactory; +import org.apache.tinkerpop.machine.processor.rxjava.strategy.RxJavaStrategy; +import org.apache.tinkerpop.machine.strategy.Strategy; + +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class RxJavaProcessor implements ProcessorFactory { + @Override + public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) { + return new RxJava<>(compilation); + } + + @Override + public Set<Strategy<?>> getStrategies() { + return Set.of(new RxJavaStrategy()); + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java new file mode 100644 index 0000000..85d3290 --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava.strategy; + +import org.apache.tinkerpop.machine.bytecode.Bytecode; +import org.apache.tinkerpop.machine.bytecode.BytecodeUtil; +import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler; +import org.apache.tinkerpop.machine.processor.rxjava.RxJavaProcessor; +import org.apache.tinkerpop.machine.strategy.AbstractStrategy; +import org.apache.tinkerpop.machine.strategy.Strategy; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy> implements Strategy.ProviderStrategy { + @Override + public <C> void apply(final Bytecode<C> bytecode) { + if (!BytecodeUtil.hasSourceInstruction(bytecode, CoreCompiler.Symbols.WITH_PROCESSOR)) { + bytecode.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class); + } + } +} diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java new file mode 100644 index 0000000..b6b0e4a --- /dev/null +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava.util; + +import io.reactivex.Flowable; +import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; +import org.apache.tinkerpop.machine.function.BranchFunction; +import org.apache.tinkerpop.machine.function.CFunction; +import org.apache.tinkerpop.machine.function.FilterFunction; +import org.apache.tinkerpop.machine.function.FlatMapFunction; +import org.apache.tinkerpop.machine.function.InitialFunction; +import org.apache.tinkerpop.machine.function.MapFunction; +import org.apache.tinkerpop.machine.function.ReduceFunction; +import org.apache.tinkerpop.machine.processor.rxjava.BranchFlow; +import org.apache.tinkerpop.machine.processor.rxjava.FilterFlow; +import org.apache.tinkerpop.machine.processor.rxjava.FlatMapFlow; +import org.apache.tinkerpop.machine.processor.rxjava.MapFlow; +import org.apache.tinkerpop.machine.processor.rxjava.ReduceFlow; +import org.apache.tinkerpop.machine.traverser.Traverser; +import org.apache.tinkerpop.machine.traverser.TraverserFactory; +import org.apache.tinkerpop.machine.util.IteratorUtils; +import org.reactivestreams.Publisher; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TopologyUtil { + + public static <C, S, E> Flowable<Traverser<C, E>> compile(final Flowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) { + final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory(); + Flowable<Traverser<C, E>> sink = (Flowable) source; + for (final CFunction<C> function : compilation.getFunctions()) { + sink = TopologyUtil.extend(sink, function, traverserFactory); + } + return sink; + } + + /* + private final void stageInput() { + if (this.hasStartPredicates) { + final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove(); + if (1 == this.untilLocation) { + if (this.untilCompilation.filterTraverser(traverser)) { + this.outputTraversers.add(traverser); + } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) { + this.outputTraversers.add(traverser.repeatDone(this.repeatBranch)); + this.repeat.addTraverser(traverser); + } else + this.repeat.addTraverser(traverser); + } else if (1 == this.emitLocation) { + if (this.emitCompilation.filterTraverser(traverser)) + this.outputTraversers.add(traverser.repeatDone(this.repeatBranch)); + if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser)) + this.outputTraversers.add(traverser.repeatDone(this.repeatBranch)); + else + this.repeat.addTraverser(traverser); + } + } else { + this.repeat.addTraverser(this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove()); + } + } + + + */ + + private static <C, S, E, B> Flowable<Traverser<C, E>> extend(Flowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) { + if (function instanceof MapFunction) + return flow.map(new MapFlow<>((MapFunction<C, S, E>) function)); + else if (function instanceof FilterFunction) { + return (Flowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function)); + } else if (function instanceof FlatMapFunction) { + return flow.flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)); + } else if (function instanceof InitialFunction) { + return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))); + } else if (function instanceof ReduceFunction) { + final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, S, E>) function; + return flow.reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new ReduceFlow<>(reduceFunction)).toFlowable(); + } else if (function instanceof BranchFunction) { + final Flowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function)); + final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>(); + for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches : ((BranchFunction<C, S, E>) function).getBranches().entrySet()) { + for (int i = 0; i < branches.getValue().size(); i++) { + final Compilation<C, S, E> branch = branches.getValue().get(i); + final int id = i; + branchFlows.add( + selectorFlow. + filter(list -> list.get(0).equals(null == branches.getKey() ? -1 : id)). + map(list -> (Traverser<C, S>) list.get(1)). + publish(f -> compile(f, branch))); + } + } + Flowable<Traverser<C, E>> sink = (Flowable) flow.filter(t -> false); // branches are the only outputs + for (final Publisher<Traverser<C, E>> branchFlow : branchFlows) { + sink = sink.mergeWith(branchFlow); + } + return sink; + } + throw new RuntimeException("Need a new execution plan step: " + function); + } +} \ No newline at end of file diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java new file mode 100644 index 0000000..8085bbc --- /dev/null +++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.machine.processor.rxjava; + +import org.apache.tinkerpop.language.gremlin.Gremlin; +import org.apache.tinkerpop.language.gremlin.P; +import org.apache.tinkerpop.language.gremlin.Traversal; +import org.apache.tinkerpop.language.gremlin.TraversalSource; +import org.apache.tinkerpop.language.gremlin.TraversalUtil; +import org.apache.tinkerpop.language.gremlin.common.__; +import org.apache.tinkerpop.machine.Machine; +import org.apache.tinkerpop.machine.coefficient.LongCoefficient; +import org.apache.tinkerpop.machine.species.LocalMachine; +import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy; +import org.junit.jupiter.api.Test; + +import static org.apache.tinkerpop.language.gremlin.common.__.incr; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class RxJavaTest { + + @Test + public void doStuff() { + final Machine machine = LocalMachine.open(); + final TraversalSource<Long> g = Gremlin.<Long>traversal(machine) + .withCoefficient(LongCoefficient.class) + .withProcessor(RxJavaProcessor.class) + .withStrategy(IdentityStrategy.class); + + Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(),__.<Long>incr().incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + traversal = g.inject(1L).choose(__.is(1L), incr(),__.<Long>incr().incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal.toList()); + System.out.println("\n----------\n"); + + /*traversal = g.inject(1L).until(__.is(P.lt(3L))).emit().repeat(incr()); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal); + System.out.println(TraversalUtil.getBytecode(traversal)); + System.out.println(traversal.toList()); + System.out.println("\n----------\n");*/ + } +}