This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
     new b90f19d  did some local testing to determine whether or not SerialRxJava was returning results in real-time. It wasn't. Learned about subscribeOn(thread) method to ensure that the subscriber of the flowable is not in the same thread as the executor. Works.
b90f19d is described below

commit b90f19dddf8f467e835219930e620910feda8a26
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Apr 10 17:48:04 2019 -0600

    did some local testing to determine whether or not SerialRxJava was returning results in real-time. It wasn't. Learned about subscribeOn(thread) method to ensure that the subscriber of the flowable is not in the same thread as the executor. Works.
---
 .../org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java | 2 +-
 .../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 5a6c26a..d278cbd 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -72,7 +72,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                             RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
                         }
                     }).
-                    subscribe(); // don't block the execution so results can be streamed back in real-time
+                    subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
         }
         this.waitForCompletionOrResult();
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index c6d2f41..e68b2ad 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.processor.rxjava;
 import io.reactivex.Flowable;
 import io.reactivex.processors.PublishProcessor;
+import io.reactivex.schedulers.Schedulers;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.function.BarrierFunction;
 import org.apache.tinkerpop.machine.function.BranchFunction;
@@ -54,7 +55,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
             this.executed = true;
             this.disposable = SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
                     doOnNext(this.ends::add).
-                    subscribe(); // don't block the execution so results can be streamed back in real-time
+                    subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
         }
         this.waitForCompletionOrResult();
     }