This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new b90f19d  did some local testing to determine whether or not SerialRxJava was returning results in real-time. It wasn't. Learned about subscribeOn(thread) method to ensure that the subscriber of the flowable is not in the same thread as the executor. Works.
b90f19d is described below

commit b90f19dddf8f467e835219930e620910feda8a26
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Apr 10 17:48:04 2019 -0600

    did some local testing to determine whether or not SerialRxJava was returning results in real-time. It wasn't. Learned about subscribeOn(thread) method to ensure that the subscriber of the flowable is not in the same thread as the executor. Works.
---
 .../org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java  | 2 +-
 .../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java    | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 5a6c26a..d278cbd 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -72,7 +72,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                             RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
                         }
                     }).
-                    subscribe(); // don't block the execution so results can be streamed back in real-time
+                    subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
 
         }
         this.waitForCompletionOrResult();
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index c6d2f41..e68b2ad 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.machine.processor.rxjava;
 
 import io.reactivex.Flowable;
 import io.reactivex.processors.PublishProcessor;
+import io.reactivex.schedulers.Schedulers;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.function.BarrierFunction;
 import org.apache.tinkerpop.machine.function.BranchFunction;
@@ -54,7 +55,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
             this.executed = true;
             this.disposable = SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
                     doOnNext(this.ends::add).
-                    subscribe(); // don't block the execution so results can be streamed back in real-time
+                    subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
         }
         this.waitForCompletionOrResult();
     }

Reply via email to