This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new ec18e21  learned about Disposable. Got rid of the AtomicBoolean in AbstractRxJava. Both serial and parallel processors support real-time streaming of results.
ec18e21 is described below

commit ec18e214bee503fc3fe94a1557c14f63df395bd7
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Tue Apr 9 13:20:28 2019 -0600

    learned about Disposable. Got rid of the AtomicBoolean in AbstractRxJava. Both serial and parallel processors support real-time streaming of results.
---
 .../tinkerpop/machine/processor/rxjava/AbstractRxJava.java       | 5 +++--
 .../tinkerpop/machine/processor/rxjava/ParallelRxJava.java       | 9 +++++----
 .../apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java  | 8 +++-----
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 4da0657..22a6e9e 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.machine.processor.rxjava;
 
+import io.reactivex.disposables.Disposable;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
@@ -33,7 +34,7 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
     static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration
 
     boolean executed = false;
-    final AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE);
+    Disposable disposable;
     final TraverserSet<C, S> starts = new TraverserSet<>();
     final TraverserSet<C, E> ends = new TraverserSet<>();
     final Compilation<C, S, E> compilation;
@@ -63,8 +64,8 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
     public void reset() {
         this.starts.clear();
         this.ends.clear();
+        this.disposable = null;
         this.executed = false;
-        this.alive.set(Boolean.FALSE);
     }
 
     protected abstract void prepareFlow();
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index daa8bd4..8828e5e 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -63,22 +63,23 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.alive.set(Boolean.TRUE);
-            this.compile(
+            this.disposable = this.compile(
                     ParallelFlowable.from(Flowable.fromIterable(this.starts)).
                             runOn(Schedulers.from(this.threadPool)), this.compilation).
                     doOnNext(this.ends::add).
                     sequential().
                     doFinally(() -> {
-                        this.alive.set(Boolean.FALSE);
                         if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
                             this.threadPool.shutdown();
                             RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
                         }
                     }).
-                    blockingSubscribe(); // thread this so results can be received before computation completes
+                    subscribe(); // don't block the execution so results can be streamed back in real-time
 
         }
+        while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
+            // only return if there is a result ready from the flow (or the flow is dead)
+        }
     }
 
     // EXECUTION PLAN COMPILER
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 183f043..87d2c97 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -52,13 +52,11 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.alive.set(Boolean.TRUE);
-            SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
+            this.disposable = SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
                     doOnNext(this.ends::add).
-                    doFinally(() -> this.alive.set(Boolean.FALSE)).
-                    subscribe();
+                    subscribe(); // don't block the execution so results can be streamed back in real-time
         }
-        while (this.alive.get() && this.ends.isEmpty()) {
+        while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
             // only return if there is a result ready from the flow (or the flow is dead)
         }
     }

Reply via email to