This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new ec18e21 learned about Disposable. Got rid of the AtomicBoolean in AbstractRxJava. Both serial and parallel processors support real-time streaming of results. ec18e21 is described below commit ec18e214bee503fc3fe94a1557c14f63df395bd7 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Tue Apr 9 13:20:28 2019 -0600 learned about Disposable. Got rid of the AtomicBoolean in AbstractRxJava. Both serial and parallel processors support real-time streaming of results. --- .../tinkerpop/machine/processor/rxjava/AbstractRxJava.java | 5 +++-- .../tinkerpop/machine/processor/rxjava/ParallelRxJava.java | 9 +++++---- .../apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java | 8 +++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java index 4da0657..22a6e9e 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.machine.processor.rxjava; +import io.reactivex.disposables.Disposable; import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; import org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traverser.Traverser; @@ -33,7 +34,7 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> { static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration boolean executed = false; - final AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE); + Disposable disposable; final TraverserSet<C, S> starts = new TraverserSet<>(); final TraverserSet<C, E> ends = new TraverserSet<>(); final Compilation<C, S, E> compilation; @@ -63,8 +64,8 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> { public void reset() { this.starts.clear(); this.ends.clear(); + this.disposable = null; this.executed = false; - this.alive.set(Boolean.FALSE); } protected abstract void prepareFlow(); diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java index daa8bd4..8828e5e 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java @@ -63,22 +63,23 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> { protected void prepareFlow() { if (!this.executed) { this.executed = true; - this.alive.set(Boolean.TRUE); - this.compile( + this.disposable = this.compile( ParallelFlowable.from(Flowable.fromIterable(this.starts)). runOn(Schedulers.from(this.threadPool)), this.compilation). doOnNext(this.ends::add). sequential(). doFinally(() -> { - this.alive.set(Boolean.FALSE); if (null != this.bytecodeId) { // only the parent compilation should close the thread pool this.threadPool.shutdown(); RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId); } }). - blockingSubscribe(); // thread this so results can be received before computation completes + subscribe(); // don't block the execution so results can be streamed back in real-time } + while (!this.disposable.isDisposed() && this.ends.isEmpty()) { + // only return if there is a result ready from the flow (or the flow is dead) + } } // EXECUTION PLAN COMPILER diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java index 183f043..87d2c97 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java @@ -52,13 +52,11 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> { protected void prepareFlow() { if (!this.executed) { this.executed = true; - this.alive.set(Boolean.TRUE); - SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation). + this.disposable = SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation). doOnNext(this.ends::add). - doFinally(() -> this.alive.set(Boolean.FALSE)). - subscribe(); + subscribe(); // don't block the execution so results can be streamed back in real-time } - while (this.alive.get() && this.ends.isEmpty()) { + while (!this.disposable.isDisposed() && this.ends.isEmpty()) { // only return if there is a result ready from the flow (or the flow is dead) } }