This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 51f8eb9  got ParallelFlowable really nice in rxJava. Had to do some 
thread-safety work on TraverserSet. Having a weird socket issue where tests are 
closing and starting too fast before the socket can be fully closed. Don't know 
why this crept up.
51f8eb9 is described below

commit 51f8eb9fdd906eba9336b7eb4eb4389dcd69e44a
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Mon Apr 8 09:54:06 2019 -0600

    got ParallelFlowable really nice in rxJava. Had to do some thread-safety 
work on TraverserSet. Having a weird socket issue where tests are closing and 
starting too fast before the socket can be fully closed. Don't know why this 
crept up.
---
 .../machine/function/branch/RepeatBranch.java      |  4 +-
 .../machine/species/remote/MachineServer.java      |  8 ++--
 .../machine/species/remote/TraverserServer.java    | 16 ++++++--
 .../tinkerpop/machine/traverser/TraverserSet.java  | 32 +++++++++-------
 .../apache/tinkerpop/machine/SimpleTestSuite.java  |  2 +-
 .../machine/processor/rxjava/AbstractRxJava.java   |  3 +-
 .../machine/processor/rxjava/FlatMapFlow.java      |  6 +--
 .../machine/processor/rxjava/ParallelRxJava.java   | 44 +++++++++++-----------
 .../machine/processor/rxjava/RepeatEnd.java        | 32 +++++++++-------
 .../machine/processor/rxjava/RepeatStart.java      | 30 ++++++++-------
 .../machine/processor/rxjava/SerialRxJava.java     | 11 ++++--
 11 files changed, 109 insertions(+), 79 deletions(-)

diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
index eddb43b..378eb6d 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
@@ -112,8 +112,8 @@ public final class RepeatBranch<C, S> extends 
AbstractFunction<C> {
     public RepeatBranch<C, S> clone() {
         final RepeatBranch<C, S> clone = (RepeatBranch<C, S>) super.clone();
         clone.repeatCompilation = this.repeatCompilation.clone();
-        clone.emitCompilation = this.emitCompilation.clone();
-        clone.untilCompilation = this.untilCompilation.clone();
+        clone.emitCompilation = null == this.emitCompilation ? null : 
this.emitCompilation.clone();
+        clone.untilCompilation = null == this.untilCompilation ? null : 
this.untilCompilation.clone();
         return clone;
     }
 
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
index 8931f40..e041d83 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
@@ -41,7 +41,7 @@ public final class MachineServer implements AutoCloseable {
 
     private final int machineServerPort;
     private ServerSocket machineServerSocket;
-    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
+    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
     private final Machine machine = LocalMachine.open();
 
     public MachineServer(final int machineServerPort) {
@@ -51,6 +51,7 @@ public final class MachineServer implements AutoCloseable {
 
     private void run() {
         try {
+            this.serverAlive.set(Boolean.TRUE);
             this.machineServerSocket = new 
ServerSocket(this.machineServerPort);
             while (this.serverAlive.get()) {
                 final Socket clientSocket = this.machineServerSocket.accept();
@@ -65,9 +66,10 @@ public final class MachineServer implements AutoCloseable {
     public void close() {
         if (this.serverAlive.get()) {
             try {
-                this.serverAlive.set(Boolean.FALSE);
-                this.machineServerSocket.close();
+                if (null != this.machineServerSocket)
+                    this.machineServerSocket.close();
                 this.machine.close();
+                this.serverAlive.set(Boolean.FALSE);
             } catch (final IOException e) {
                 throw new RuntimeException(e.getMessage(), e);
             }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
index d647f96..4e61fff 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
@@ -38,7 +38,7 @@ public final class TraverserServer<C, S> implements 
AutoCloseable, Iterator<Trav
     private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
     private final int serverPort;
     private ServerSocket serverSocket;
-    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
+    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
 
     public TraverserServer(final int serverPort) {
         this.serverPort = serverPort;
@@ -47,6 +47,7 @@ public final class TraverserServer<C, S> implements 
AutoCloseable, Iterator<Trav
 
     private void run() {
         try {
+            this.serverAlive.set(Boolean.TRUE);
             this.serverSocket = new ServerSocket(this.serverPort);
             while (this.serverAlive.get()) {
                 final Socket clientSocket = this.serverSocket.accept();
@@ -73,15 +74,24 @@ public final class TraverserServer<C, S> implements 
AutoCloseable, Iterator<Trav
 
     @Override
     public Traverser<C, S> next() {
-        return this.traverserSet.remove();
+        if (!this.traverserSet.isEmpty())
+            return this.traverserSet.remove();
+        else {
+            while (this.serverAlive.get()) {
+                if (!this.traverserSet.isEmpty())
+                    return this.traverserSet.remove();
+            }
+            return this.traverserSet.remove();
+        }
     }
 
     @Override
     public synchronized void close() {
         if (this.serverAlive.get()) {
             try {
+                if (null != this.serverSocket)
+                    this.serverSocket.close();
                 this.serverAlive.set(Boolean.FALSE);
-                this.serverSocket.close();
             } catch (final IOException e) {
                 throw new RuntimeException(e.getMessage(), e);
             }
diff --git 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
index 8c40dbb..7fdbf86 100644
--- 
a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
+++ 
b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
@@ -76,13 +76,15 @@ public final class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> imple
 
     @Override
     public boolean add(final Traverser<C, S> traverser) {
-        final Traverser<C, S> existing = this.map.get(traverser);
-        if (null == existing) {
-            this.map.put(traverser, traverser);
-            return true;
-        } else {
-            existing.coefficient().sum(traverser.coefficient());
-            return false;
+        synchronized (this.map) {
+            final Traverser<C, S> existing = this.map.get(traverser);
+            if (null == existing) {
+                this.map.put(traverser, traverser);
+                return true;
+            } else {
+                existing.coefficient().sum(traverser.coefficient());
+                return false;
+            }
         }
     }
 
@@ -93,12 +95,14 @@ public final class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> imple
 
     @Override
     public Traverser<C, S> remove() {  // pop, exception if empty
-        final Iterator<Traverser<C, S>> iterator = 
this.map.values().iterator();
-        if (!iterator.hasNext())
-            throw FastNoSuchElementException.instance();
-        final Traverser<C, S> next = iterator.next();
-        iterator.remove();
-        return next;
+        synchronized (this.map) {
+            final Iterator<Traverser<C, S>> iterator = 
this.map.values().iterator();
+            if (!iterator.hasNext())
+                throw FastNoSuchElementException.instance();
+            final Traverser<C, S> next = iterator.next();
+            iterator.remove();
+            return next;
+        }
     }
 
     @Override
@@ -136,7 +140,7 @@ public final class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> imple
         return this.map.values().toString();
     }
 
-    public void sort(final Comparator<Traverser<C,S>> comparator) {
+    public void sort(final Comparator<Traverser<C, S>> comparator) {
         final List<Traverser<C, S>> list = new ArrayList<>(this.map.size());
         
IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add);
         Collections.sort(list, comparator);
diff --git 
a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
 
b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
index b53577b..31177e4 100644
--- 
a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
+++ 
b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
@@ -235,7 +235,7 @@ public class SimpleTestSuite extends 
AbstractTestSuite<Long> {
 
     @Test
     void g_injectXlistX1_2_3XX_unfold_incr() {
-        verifyOrder(List.of(2L, 3L, 4L),
+        verify(List.of(2L, 3L, 4L),
                 g.inject(List.of(1L, 2L, 3L)).unfold().incr());
     }
 
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 9d05fd2..4da0657 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -32,8 +32,8 @@ public abstract class AbstractRxJava<C, S, E> implements 
Processor<C, S, E> {
 
     static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic 
configuration
 
-    final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
     boolean executed = false;
+    final AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE);
     final TraverserSet<C, S> starts = new TraverserSet<>();
     final TraverserSet<C, E> ends = new TraverserSet<>();
     final Compilation<C, S, E> compilation;
@@ -64,6 +64,7 @@ public abstract class AbstractRxJava<C, S, E> implements 
Processor<C, S, E> {
         this.starts.clear();
         this.ends.clear();
         this.executed = false;
+        this.alive.set(Boolean.FALSE);
     }
 
     protected abstract void prepareFlow();
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
index 0a57f0d..2d1cd9b 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, 
Iterable<Traverser<C, E>>> {
 
-    private FlatMapFunction<C, S, E> function;
+    private ThreadLocal<FlatMapFunction<C, S, E>> function;
 
     FlatMapFlow(final FlatMapFunction<C, S, E> function) {
-        this.function = function;
+        this.function = ThreadLocal.withInitial(() -> (FlatMapFunction) 
function.clone());
     }
 
     @Override
     public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
-        return () -> traverser.flatMap(this.function);
+        return () -> traverser.flatMap(this.function.get());
     }
 }
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index a55ae4c..75bb398 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -49,6 +49,7 @@ import java.util.concurrent.Executors;
 public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
     private final int threads;
+    private ExecutorService threadPool;
 
     ParallelRxJava(final Compilation<C, S, E> compilation, final int threads) {
         super(compilation);
@@ -58,43 +59,46 @@ public final class ParallelRxJava<C, S, E> extends 
AbstractRxJava<C, S, E> {
     @Override
     protected void prepareFlow() {
         if (!this.executed) {
-            ExecutorService threadPool = 
Executors.newFixedThreadPool(this.threads);
             this.executed = true;
-            
ParallelRxJava.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(threadPool)),
 this.compilation).
+            this.alive.set(Boolean.TRUE);
+            this.threadPool = Executors.newFixedThreadPool(this.threads);
+            this.compile(
+                    ParallelFlowable.from(Flowable.fromIterable(this.starts)).
+                            runOn(Schedulers.from(this.threadPool)), 
this.compilation).
                     doOnNext(this.ends::add).
                     sequential().
                     doOnComplete(() -> this.alive.set(Boolean.FALSE)).
-                    doFinally(threadPool::shutdown).
-                    blockingSubscribe();
+                    doFinally(this.threadPool::shutdown).
+                    blockingSubscribe(); // thread this so results can be 
received before computation completes
         }
     }
 
     // EXECUTION PLAN COMPILER
 
-    private static <C, S, E> ParallelFlowable<Traverser<C, E>> compile(final 
ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> 
compilation) {
+    private ParallelFlowable<Traverser<C, E>> compile(final 
ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> 
compilation) {
         final TraverserFactory<C> traverserFactory = 
compilation.getTraverserFactory();
         ParallelFlowable<Traverser<C, E>> sink = (ParallelFlowable) source;
         for (final CFunction<C> function : compilation.getFunctions()) {
-            sink = ParallelRxJava.extend(sink, function, traverserFactory);
+            sink = this.extend((ParallelFlowable) sink, function, 
traverserFactory);
         }
         return sink;
     }
 
-    private static <C, S, E, B> ParallelFlowable<Traverser<C, E>> 
extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, 
final TraverserFactory<C> traverserFactory) {
+    private <B> ParallelFlowable<Traverser<C, E>> 
extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, 
final TraverserFactory<C> traverserFactory) {
         if (function instanceof MapFunction)
             return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
         else if (function instanceof FilterFunction) {
             return (ParallelFlowable) flow.filter(new 
FilterFlow<>((FilterFunction<C, S>) function));
         } else if (function instanceof FlatMapFunction) {
-            return flow.sequential().flatMapIterable(new 
FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel();
+            return flow.sequential().flatMapIterable(new 
FlatMapFlow<>((FlatMapFunction<C, S, E>) 
function)).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof InitialFunction) {
-            return Flowable.fromIterable(() -> 
IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> 
traverserFactory.create(function, s))).parallel();
+            return Flowable.fromIterable(() -> 
IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> 
traverserFactory.create(function, 
s))).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof ReduceFunction) {
             final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, 
S, E>) function;
-            return 
flow.sequential().reduce(traverserFactory.create(reduceFunction, 
reduceFunction.getInitialValue()), new 
Reducer<>(reduceFunction)).toFlowable().parallel();
+            return 
flow.sequential().reduce(traverserFactory.create(reduceFunction, 
reduceFunction.getInitialValue()), new 
Reducer<>(reduceFunction)).toFlowable().parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof BarrierFunction) {
             final BarrierFunction<C, S, E, B> barrierFunction = 
(BarrierFunction<C, S, E, B>) function;
-            return flow.sequential().reduce(barrierFunction.getInitialValue(), 
new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new 
BarrierFlow<>(barrierFunction, traverserFactory)).parallel();
+            return flow.sequential().reduce(barrierFunction.getInitialValue(), 
new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new 
BarrierFlow<>(barrierFunction, traverserFactory)).parallel(1); // order 
requires serial
         } else if (function instanceof BranchFunction) {
             final ParallelFlowable<List> selectorFlow = flow.map(new 
BranchFlow<>((BranchFunction<C, S, B>) function));
             final List<Publisher<Traverser<C, E>>> branchFlows = new 
ArrayList<>();
@@ -103,31 +107,29 @@ public final class ParallelRxJava<C, S, E> extends 
AbstractRxJava<C, S, E> {
                 final int branchId = null == branches.getKey() ? -1 : 
branchCounter;
                 branchCounter++;
                 for (final Compilation<C, S, E> branch : branches.getValue()) {
-                    branchFlows.add(compile(selectorFlow.
+                    branchFlows.add(this.compile(selectorFlow.
                                     filter(list -> 
list.get(0).equals(branchId)).
                                     map(list -> (Traverser<C, S>) list.get(1)),
                             branch).sequential());
                 }
             }
-            return PublishProcessor.merge(branchFlows).parallel();
+            return 
PublishProcessor.merge(branchFlows).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof RepeatBranch) {
             final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) 
function;
             final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>();
             ParallelFlowable<List> selectorFlow;
             for (int i = 0; i < MAX_REPETITIONS; i++) {
                 if (repeatBranch.hasStartPredicates()) {
-                    selectorFlow = flow.sequential().flatMapIterable(new 
RepeatStart<>(repeatBranch)).parallel();
+                    selectorFlow = flow.sequential().flatMapIterable(new 
RepeatStart<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
                     outputs.add(selectorFlow.filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
-                    flow = compile(selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), 
repeatBranch.getRepeat());
-                } else {
-                    flow = compile(flow, repeatBranch.getRepeat());
-                }
-                selectorFlow = flow.sequential().flatMapIterable(new 
RepeatEnd<>(repeatBranch)).parallel();
+                    flow = this.compile(selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), 
(Compilation) repeatBranch.getRepeat());
+                } else
+                    flow = this.compile(flow, (Compilation) 
repeatBranch.getRepeat());
+                selectorFlow = flow.sequential().flatMapIterable(new 
RepeatEnd<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
                 outputs.add(selectorFlow.sequential().filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                 flow = selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
             }
-
-            return (ParallelFlowable) 
PublishProcessor.merge(outputs).parallel();
+            return (ParallelFlowable) 
PublishProcessor.merge(outputs).parallel().runOn(Schedulers.from(this.threadPool));
         }
         throw new RuntimeException("Need a new execution plan step: " + 
function);
     }
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
index 9086f32..0b19111 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
@@ -30,30 +30,30 @@ import java.util.List;
  */
 public final class RepeatEnd<C, S> implements Function<Traverser<C, S>, 
List<List>> {
 
-    private final RepeatBranch<C, S> repeatBranch;
+    private final ThreadLocal<RepeatBranch<C, S>> repeatBranch;
 
     RepeatEnd(final RepeatBranch<C, S> repeatBranch) {
-        this.repeatBranch = repeatBranch;
+        this.repeatBranch = ThreadLocal.withInitial(repeatBranch::clone);
     }
 
     @Override
     public List<List> apply(final Traverser<C, S> traverser) {
-        final Traverser<C,S> t = traverser.repeatLoop(this.repeatBranch);
+        final Traverser<C, S> t = traverser.repeatLoop(this.getRepeatBranch());
         final List<List> list = new ArrayList<>();
-        if (this.repeatBranch.hasEndPredicates()) {
-            if (3 == this.repeatBranch.getUntilLocation()) {
-                if (this.repeatBranch.getUntil().filterTraverser(t)) {
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
-                } else if (4 == this.repeatBranch.getEmitLocation() && 
this.repeatBranch.getEmit().filterTraverser(t)) {
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+        if (this.repeatBranch.get().hasEndPredicates()) {
+            if (3 == this.getRepeatBranch().getUntilLocation()) {
+                if (this.getRepeatBranch().getUntil().filterTraverser(t)) {
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
+                } else if (4 == this.getRepeatBranch().getEmitLocation() && 
this.getRepeatBranch().getEmit().filterTraverser(t)) {
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
                     list.add(List.of(1, t));
                 } else
                     list.add(List.of(1, t));
-            } else if (3 == this.repeatBranch.getEmitLocation()) {
-                if (this.repeatBranch.getEmit().filterTraverser(t))
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
-                if (4 == this.repeatBranch.getUntilLocation() && 
this.repeatBranch.getUntil().filterTraverser(t))
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+            } else if (3 == this.getRepeatBranch().getEmitLocation()) {
+                if (this.getRepeatBranch().getEmit().filterTraverser(t))
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
+                if (4 == this.getRepeatBranch().getUntilLocation() && 
this.getRepeatBranch().getUntil().filterTraverser(t))
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
                 else
                     list.add(List.of(1, t));
             }
@@ -61,4 +61,8 @@ public final class RepeatEnd<C, S> implements 
Function<Traverser<C, S>, List<Lis
             list.add(List.of(1, t));
         return list;
     }
+
+    private RepeatBranch<C, S> getRepeatBranch() {
+        return this.repeatBranch.get();
+    }
 }
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
index 5dea785..0620efa 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
@@ -30,29 +30,29 @@ import java.util.List;
  */
 public final class RepeatStart<C, S> implements Function<Traverser<C, S>, 
List<List>> {
 
-    private final RepeatBranch<C, S> repeatBranch;
+    private final ThreadLocal<RepeatBranch<C, S>> repeatBranch;
 
     RepeatStart(final RepeatBranch<C, S> repeatBranch) {
-        this.repeatBranch = repeatBranch;
+        this.repeatBranch = ThreadLocal.withInitial(repeatBranch::clone);
     }
 
     @Override
     public List<List> apply(final Traverser<C, S> traverser) {
         final List<List> list = new ArrayList<>();
-        if (this.repeatBranch.hasStartPredicates()) {
-            if (1 == this.repeatBranch.getUntilLocation()) {
-                if (this.repeatBranch.getUntil().filterTraverser(traverser)) {
-                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
-                } else if (2 == this.repeatBranch.getEmitLocation() && 
this.repeatBranch.getEmit().filterTraverser(traverser)) {
+        if (this.getRepeatBranch().hasStartPredicates()) {
+            if (1 == this.getRepeatBranch().getUntilLocation()) {
+                if 
(this.getRepeatBranch().getUntil().filterTraverser(traverser)) {
+                    list.add(List.of(0, 
traverser.repeatDone(this.getRepeatBranch())));
+                } else if (2 == this.getRepeatBranch().getEmitLocation() && 
this.getRepeatBranch().getEmit().filterTraverser(traverser)) {
                     list.add(List.of(1, traverser));
-                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
+                    list.add(List.of(0, 
traverser.repeatDone(this.getRepeatBranch())));
                 } else
                     list.add(List.of(1, traverser));
-            } else if (1 == this.repeatBranch.getEmitLocation()) {
-                if (this.repeatBranch.getEmit().filterTraverser(traverser))
-                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
-                if (2 == this.repeatBranch.getUntilLocation() && 
this.repeatBranch.getUntil().filterTraverser(traverser)) {
-                    list.add(List.of(0, 
traverser.repeatDone(this.repeatBranch)));
+            } else if (1 == this.getRepeatBranch().getEmitLocation()) {
+                if 
(this.getRepeatBranch().getEmit().filterTraverser(traverser))
+                    list.add(List.of(0, 
traverser.repeatDone(this.getRepeatBranch())));
+                if (2 == this.getRepeatBranch().getUntilLocation() && 
this.getRepeatBranch().getUntil().filterTraverser(traverser)) {
+                    list.add(List.of(0, 
traverser.repeatDone(this.getRepeatBranch())));
                 } else
                     list.add(List.of(1, traverser));
             }
@@ -60,4 +60,8 @@ public final class RepeatStart<C, S> implements 
Function<Traverser<C, S>, List<L
             list.add(List.of(1, traverser));
         return list;
     }
+
+    private RepeatBranch<C, S> getRepeatBranch() {
+        return this.repeatBranch.get();
+    }
 }
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 40fc11e..5331ee3 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -44,7 +44,7 @@ import java.util.Map;
  */
 public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
-    public SerialRxJava(final Compilation<C, S, E> compilation) {
+    SerialRxJava(final Compilation<C, S, E> compilation) {
         super(compilation);
     }
 
@@ -52,10 +52,14 @@ public final class SerialRxJava<C, S, E> extends 
AbstractRxJava<C, S, E> {
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
+            this.alive.set(Boolean.TRUE);
             SerialRxJava.compile(Flowable.fromIterable(this.starts), 
this.compilation).
                     doOnNext(this.ends::add).
                     doOnComplete(() -> this.alive.set(Boolean.FALSE)).
-                    blockingSubscribe();
+                    subscribe();
+        }
+        while (this.alive.get() && this.ends.isEmpty()) {
+            // only return if there is a result ready from the flow (or the 
flow is dead)
         }
     }
 
@@ -109,9 +113,8 @@ public final class SerialRxJava<C, S, E> extends 
AbstractRxJava<C, S, E> {
                     selectorFlow = flow.flatMapIterable(new 
RepeatStart<>(repeatBranch));
                     outputs.add(selectorFlow.filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                     flow = compile(selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), 
repeatBranch.getRepeat());
-                } else {
+                } else
                     flow = compile(flow, repeatBranch.getRepeat());
-                }
                 selectorFlow = flow.flatMapIterable(new 
RepeatEnd<>(repeatBranch));
                 outputs.add(selectorFlow.filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                 flow = selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));

Reply via email to