This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 11ceb5b Figured out how to compile a Flow once and reuse it over and
over again. This is great for nested traversals, where a single traverser is
inserted and a result is returned, and this happens over and over again for each
incoming traverser. By 'caching' the Flow, we save on compilation costs.
11ceb5b is described below
commit 11ceb5baa725384893aa05d5c71bbaadc9ee604d
Author: Marko A. Rodriguez
AuthorDate: Thu Apr 11 08:09:16 2019 -0600
Figured out how to compile a Flow once and reuse it over and over again.
This is great for nested traversals, where a single traverser is inserted and
a result is returned, and this happens over and over again for each incoming
traverser. By 'caching' the Flow, we save on compilation costs.
---
.../apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java | 3 ++-
.../apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java | 5 ++++-
.../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java | 6 +++++-
3 files changed, 11 insertions(+), 3 deletions(-)
diff --git
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index c9b35c3..6c478f4 100644
---
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -60,9 +60,10 @@ public abstract class AbstractRxJava implements
Processor {
@Override
public void reset() {
+if (null != this.disposable)
+this.disposable.dispose();
this.starts.clear();
this.ends.clear();
-this.disposable = null;
this.executed = false;
}
diff --git
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index d278cbd..c1cda6f 100644
---
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -50,6 +50,7 @@ public final class ParallelRxJava extends
AbstractRxJava {
private final ExecutorService threadPool;
private final String bytecodeId;
+private final ParallelFlowable<Traverser<C, E>> flowable;
ParallelRxJava(final Compilation compilation, final
ExecutorService threadPool) {
super(compilation);
@@ -57,13 +58,15 @@ public final class ParallelRxJava extends
AbstractRxJava {
this.bytecodeId = compilation.getBytecode().getParent().isEmpty() ?
(String)
BytecodeUtil.getSourceInstructions(compilation.getBytecode(),
RxJavaProcessor.RX_ROOT_BYTECODE_ID).get(0).args()[0] :
null;
+// compile once and use many times
+this.flowable =
this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)),
this.compilation);
}
@Override
protected void prepareFlow() {
if (!this.executed) {
this.executed = true;
-this.disposable =
this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)),
this.compilation).
+this.disposable = this.flowable.
doOnNext(this.ends::add).
sequential().
doFinally(() -> {
diff --git
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index e68b2ad..5da1091 100644
---
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -45,15 +45,19 @@ import java.util.Map;
*/
public final class SerialRxJava extends AbstractRxJava {
+private final Flowable<Traverser<C, E>> flowable;
+
SerialRxJava(final Compilation compilation) {
super(compilation);
+// compile once and reuse many times
+this.flowable =
SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation);
}
@Override
protected void prepareFlow() {
if (!this.executed) {
this.executed = true;
-this.disposable =