This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new 51f8eb9 got ParallelFlowable really nice in rxJava. Had to do some thread safe work on TraverserSet. Having a weird socket issue where tests are closing and starting too fast before socket can be fully closed. Don't know why this creeped up. 51f8eb9 is described below commit 51f8eb9fdd906eba9336b7eb4eb4389dcd69e44a Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Mon Apr 8 09:54:06 2019 -0600 got ParallelFlowable really nice in rxJava. Had to do some thread safe work on TraverserSet. Having a weird socket issue where tests are closing and starting too fast before socket can be fully closed. Don't know why this creeped up. --- .../machine/function/branch/RepeatBranch.java | 4 +- .../machine/species/remote/MachineServer.java | 8 ++-- .../machine/species/remote/TraverserServer.java | 16 ++++++-- .../tinkerpop/machine/traverser/TraverserSet.java | 32 +++++++++------- .../apache/tinkerpop/machine/SimpleTestSuite.java | 2 +- .../machine/processor/rxjava/AbstractRxJava.java | 3 +- .../machine/processor/rxjava/FlatMapFlow.java | 6 +-- .../machine/processor/rxjava/ParallelRxJava.java | 44 +++++++++++----------- .../machine/processor/rxjava/RepeatEnd.java | 32 +++++++++------- .../machine/processor/rxjava/RepeatStart.java | 30 ++++++++------- .../machine/processor/rxjava/SerialRxJava.java | 11 ++++-- 11 files changed, 109 insertions(+), 79 deletions(-) diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java index eddb43b..378eb6d 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java @@ -112,8 +112,8 @@ public final class RepeatBranch<C, S> extends AbstractFunction<C> { public RepeatBranch<C, S> clone() { final RepeatBranch<C, S> clone = (RepeatBranch<C, S>) super.clone(); clone.repeatCompilation = this.repeatCompilation.clone(); - clone.emitCompilation = this.emitCompilation.clone(); - clone.untilCompilation = this.untilCompilation.clone(); + clone.emitCompilation = null == this.emitCompilation ? null : this.emitCompilation.clone(); + clone.untilCompilation = null == this.untilCompilation ? null : this.untilCompilation.clone(); return clone; } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java index 8931f40..e041d83 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java @@ -41,7 +41,7 @@ public final class MachineServer implements AutoCloseable { private final int machineServerPort; private ServerSocket machineServerSocket; - private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE); + private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE); private final Machine machine = LocalMachine.open(); public MachineServer(final int machineServerPort) { @@ -51,6 +51,7 @@ public final class MachineServer implements AutoCloseable { private void run() { try { + this.serverAlive.set(Boolean.TRUE); this.machineServerSocket = new ServerSocket(this.machineServerPort); while (this.serverAlive.get()) { final Socket clientSocket = this.machineServerSocket.accept(); @@ -65,9 +66,10 @@ public final class MachineServer implements AutoCloseable { public void close() { if (this.serverAlive.get()) { try { - this.serverAlive.set(Boolean.FALSE); - this.machineServerSocket.close(); + if (null != this.machineServerSocket) + this.machineServerSocket.close(); this.machine.close(); + this.serverAlive.set(Boolean.FALSE); } catch (final IOException e) { throw new RuntimeException(e.getMessage(), e); } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java index d647f96..4e61fff 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java @@ -38,7 +38,7 @@ public final class TraverserServer<C, S> implements AutoCloseable, Iterator<Trav private final TraverserSet<C, S> traverserSet = new TraverserSet<>(); private final int serverPort; private ServerSocket serverSocket; - private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE); + private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE); public TraverserServer(final int serverPort) { this.serverPort = serverPort; @@ -47,6 +47,7 @@ public final class TraverserServer<C, S> implements AutoCloseable, Iterator<Trav private void run() { try { + this.serverAlive.set(Boolean.TRUE); this.serverSocket = new ServerSocket(this.serverPort); while (this.serverAlive.get()) { final Socket clientSocket = this.serverSocket.accept(); @@ -73,15 +74,24 @@ public final class TraverserServer<C, S> implements AutoCloseable, Iterator<Trav @Override public Traverser<C, S> next() { - return this.traverserSet.remove(); + if (!this.traverserSet.isEmpty()) + return this.traverserSet.remove(); + else { + while (this.serverAlive.get()) { + if (!this.traverserSet.isEmpty()) + return this.traverserSet.remove(); + } + return this.traverserSet.remove(); + } } @Override public synchronized void close() { if (this.serverAlive.get()) { try { + if (null != this.serverSocket) + this.serverSocket.close(); this.serverAlive.set(Boolean.FALSE); - this.serverSocket.close(); } catch (final IOException e) { throw new RuntimeException(e.getMessage(), e); } diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java index 8c40dbb..7fdbf86 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java @@ -76,13 +76,15 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple @Override public boolean add(final Traverser<C, S> traverser) { - final Traverser<C, S> existing = this.map.get(traverser); - if (null == existing) { - this.map.put(traverser, traverser); - return true; - } else { - existing.coefficient().sum(traverser.coefficient()); - return false; + synchronized (this.map) { + final Traverser<C, S> existing = this.map.get(traverser); + if (null == existing) { + this.map.put(traverser, traverser); + return true; + } else { + existing.coefficient().sum(traverser.coefficient()); + return false; + } } } @@ -93,12 +95,14 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple @Override public Traverser<C, S> remove() { // pop, exception if empty - final Iterator<Traverser<C, S>> iterator = this.map.values().iterator(); - if (!iterator.hasNext()) - throw FastNoSuchElementException.instance(); - final Traverser<C, S> next = iterator.next(); - iterator.remove(); - return next; + synchronized (this.map) { + final Iterator<Traverser<C, S>> iterator = this.map.values().iterator(); + if (!iterator.hasNext()) + throw FastNoSuchElementException.instance(); + final Traverser<C, S> next = iterator.next(); + iterator.remove(); + return next; + } } @Override @@ -136,7 +140,7 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple return this.map.values().toString(); } - public void sort(final Comparator<Traverser<C,S>> comparator) { + public void sort(final Comparator<Traverser<C, S>> comparator) { final List<Traverser<C, S>> list = new ArrayList<>(this.map.size()); IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add); Collections.sort(list, comparator); diff --git a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java index b53577b..31177e4 100644 --- a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java +++ b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java @@ -235,7 +235,7 @@ public class SimpleTestSuite extends AbstractTestSuite<Long> { @Test void g_injectXlistX1_2_3XX_unfold_incr() { - verifyOrder(List.of(2L, 3L, 4L), + verify(List.of(2L, 3L, 4L), g.inject(List.of(1L, 2L, 3L)).unfold().incr()); } diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java index 9d05fd2..4da0657 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java @@ -32,8 +32,8 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> { static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration - final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE); boolean executed = false; + final AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE); final TraverserSet<C, S> starts = new TraverserSet<>(); final TraverserSet<C, E> ends = new TraverserSet<>(); final Compilation<C, S, E> compilation; @@ -64,6 +64,7 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> { this.starts.clear(); this.ends.clear(); this.executed = false; + this.alive.set(Boolean.FALSE); } protected abstract void prepareFlow(); diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java index 0a57f0d..2d1cd9b 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java @@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser; */ final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> { - private FlatMapFunction<C, S, E> function; + private ThreadLocal<FlatMapFunction<C, S, E>> function; FlatMapFlow(final FlatMapFunction<C, S, E> function) { - this.function = function; + this.function = ThreadLocal.withInitial(() -> (FlatMapFunction) function.clone()); } @Override public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) { - return () -> traverser.flatMap(this.function); + return () -> traverser.flatMap(this.function.get()); } } diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java index a55ae4c..75bb398 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java @@ -49,6 +49,7 @@ import java.util.concurrent.Executors; public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> { private final int threads; + private ExecutorService threadPool; ParallelRxJava(final Compilation<C, S, E> compilation, final int threads) { super(compilation); @@ -58,43 +59,46 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> { @Override protected void prepareFlow() { if (!this.executed) { - ExecutorService threadPool = Executors.newFixedThreadPool(this.threads); this.executed = true; - ParallelRxJava.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(threadPool)), this.compilation). + this.alive.set(Boolean.TRUE); + this.threadPool = Executors.newFixedThreadPool(this.threads); + this.compile( + ParallelFlowable.from(Flowable.fromIterable(this.starts)). + runOn(Schedulers.from(this.threadPool)), this.compilation). doOnNext(this.ends::add). sequential(). doOnComplete(() -> this.alive.set(Boolean.FALSE)). - doFinally(threadPool::shutdown). - blockingSubscribe(); + doFinally(this.threadPool::shutdown). + blockingSubscribe(); // thread this so results can be received before computation completes } } // EXECUTION PLAN COMPILER - private static <C, S, E> ParallelFlowable<Traverser<C, E>> compile(final ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) { + private ParallelFlowable<Traverser<C, E>> compile(final ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) { final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory(); ParallelFlowable<Traverser<C, E>> sink = (ParallelFlowable) source; for (final CFunction<C> function : compilation.getFunctions()) { - sink = ParallelRxJava.extend(sink, function, traverserFactory); + sink = this.extend((ParallelFlowable) sink, function, traverserFactory); } return sink; } - private static <C, S, E, B> ParallelFlowable<Traverser<C, E>> extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) { + private <B> ParallelFlowable<Traverser<C, E>> extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) { if (function instanceof MapFunction) return flow.map(new MapFlow<>((MapFunction<C, S, E>) function)); else if (function instanceof FilterFunction) { return (ParallelFlowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function)); } else if (function instanceof FlatMapFunction) { - return flow.sequential().flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel(); + return flow.sequential().flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel().runOn(Schedulers.from(this.threadPool)); } else if (function instanceof InitialFunction) { - return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))).parallel(); + return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))).parallel().runOn(Schedulers.from(this.threadPool)); } else if (function instanceof ReduceFunction) { final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, S, E>) function; - return flow.sequential().reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new Reducer<>(reduceFunction)).toFlowable().parallel(); + return flow.sequential().reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new Reducer<>(reduceFunction)).toFlowable().parallel().runOn(Schedulers.from(this.threadPool)); } else if (function instanceof BarrierFunction) { final BarrierFunction<C, S, E, B> barrierFunction = (BarrierFunction<C, S, E, B>) function; - return flow.sequential().reduce(barrierFunction.getInitialValue(), new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new BarrierFlow<>(barrierFunction, traverserFactory)).parallel(); + return flow.sequential().reduce(barrierFunction.getInitialValue(), new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new BarrierFlow<>(barrierFunction, traverserFactory)).parallel(1); // order requires serial } else if (function instanceof BranchFunction) { final ParallelFlowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function)); final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>(); @@ -103,31 +107,29 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> { final int branchId = null == branches.getKey() ? -1 : branchCounter; branchCounter++; for (final Compilation<C, S, E> branch : branches.getValue()) { - branchFlows.add(compile(selectorFlow. + branchFlows.add(this.compile(selectorFlow. filter(list -> list.get(0).equals(branchId)). map(list -> (Traverser<C, S>) list.get(1)), branch).sequential()); } } - return PublishProcessor.merge(branchFlows).parallel(); + return PublishProcessor.merge(branchFlows).parallel().runOn(Schedulers.from(this.threadPool)); } else if (function instanceof RepeatBranch) { final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) function; final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>(); ParallelFlowable<List> selectorFlow; for (int i = 0; i < MAX_REPETITIONS; i++) { if (repeatBranch.hasStartPredicates()) { - selectorFlow = flow.sequential().flatMapIterable(new RepeatStart<>(repeatBranch)).parallel(); + selectorFlow = flow.sequential().flatMapIterable(new RepeatStart<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool)); outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential()); - flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat()); - } else { - flow = compile(flow, repeatBranch.getRepeat()); - } - selectorFlow = flow.sequential().flatMapIterable(new RepeatEnd<>(repeatBranch)).parallel(); + flow = this.compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), (Compilation) repeatBranch.getRepeat()); + } else + flow = this.compile(flow, (Compilation) repeatBranch.getRepeat()); + selectorFlow = flow.sequential().flatMapIterable(new RepeatEnd<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool)); outputs.add(selectorFlow.sequential().filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1))); flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)); } - - return (ParallelFlowable) PublishProcessor.merge(outputs).parallel(); + return (ParallelFlowable) PublishProcessor.merge(outputs).parallel().runOn(Schedulers.from(this.threadPool)); } throw new RuntimeException("Need a new execution plan step: " + function); } diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java index 9086f32..0b19111 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java @@ -30,30 +30,30 @@ import java.util.List; */ public final class RepeatEnd<C, S> implements Function<Traverser<C, S>, List<List>> { - private final RepeatBranch<C, S> repeatBranch; + private final ThreadLocal<RepeatBranch<C, S>> repeatBranch; RepeatEnd(final RepeatBranch<C, S> repeatBranch) { - this.repeatBranch = repeatBranch; + this.repeatBranch = ThreadLocal.withInitial(repeatBranch::clone); } @Override public List<List> apply(final Traverser<C, S> traverser) { - final Traverser<C,S> t = traverser.repeatLoop(this.repeatBranch); + final Traverser<C, S> t = traverser.repeatLoop(this.getRepeatBranch()); final List<List> list = new ArrayList<>(); - if (this.repeatBranch.hasEndPredicates()) { - if (3 == this.repeatBranch.getUntilLocation()) { - if (this.repeatBranch.getUntil().filterTraverser(t)) { - list.add(List.of(0, t.repeatDone(this.repeatBranch))); - } else if (4 == this.repeatBranch.getEmitLocation() && this.repeatBranch.getEmit().filterTraverser(t)) { - list.add(List.of(0, t.repeatDone(this.repeatBranch))); + if (this.repeatBranch.get().hasEndPredicates()) { + if (3 == this.getRepeatBranch().getUntilLocation()) { + if (this.getRepeatBranch().getUntil().filterTraverser(t)) { + list.add(List.of(0, t.repeatDone(this.getRepeatBranch()))); + } else if (4 == this.getRepeatBranch().getEmitLocation() && this.getRepeatBranch().getEmit().filterTraverser(t)) { + list.add(List.of(0, t.repeatDone(this.getRepeatBranch()))); list.add(List.of(1, t)); } else list.add(List.of(1, t)); - } else if (3 == this.repeatBranch.getEmitLocation()) { - if (this.repeatBranch.getEmit().filterTraverser(t)) - list.add(List.of(0, t.repeatDone(this.repeatBranch))); - if (4 == this.repeatBranch.getUntilLocation() && this.repeatBranch.getUntil().filterTraverser(t)) - list.add(List.of(0, t.repeatDone(this.repeatBranch))); + } else if (3 == this.getRepeatBranch().getEmitLocation()) { + if (this.getRepeatBranch().getEmit().filterTraverser(t)) + list.add(List.of(0, t.repeatDone(this.getRepeatBranch()))); + if (4 == this.getRepeatBranch().getUntilLocation() && this.getRepeatBranch().getUntil().filterTraverser(t)) + list.add(List.of(0, t.repeatDone(this.getRepeatBranch()))); else list.add(List.of(1, t)); } @@ -61,4 +61,8 @@ public final class RepeatEnd<C, S> implements Function<Traverser<C, S>, List<Lis list.add(List.of(1, t)); return list; } + + private RepeatBranch<C, S> getRepeatBranch() { + return this.repeatBranch.get(); + } } diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java index 5dea785..0620efa 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java @@ -30,29 +30,29 @@ import java.util.List; */ public final class RepeatStart<C, S> implements Function<Traverser<C, S>, List<List>> { - private final RepeatBranch<C, S> repeatBranch; + private final ThreadLocal<RepeatBranch<C, S>> repeatBranch; RepeatStart(final RepeatBranch<C, S> repeatBranch) { - this.repeatBranch = repeatBranch; + this.repeatBranch = ThreadLocal.withInitial(repeatBranch::clone); } @Override public List<List> apply(final Traverser<C, S> traverser) { final List<List> list = new ArrayList<>(); - if (this.repeatBranch.hasStartPredicates()) { - if (1 == this.repeatBranch.getUntilLocation()) { - if (this.repeatBranch.getUntil().filterTraverser(traverser)) { - list.add(List.of(0, traverser.repeatDone(this.repeatBranch))); - } else if (2 == this.repeatBranch.getEmitLocation() && this.repeatBranch.getEmit().filterTraverser(traverser)) { + if (this.getRepeatBranch().hasStartPredicates()) { + if (1 == this.getRepeatBranch().getUntilLocation()) { + if (this.getRepeatBranch().getUntil().filterTraverser(traverser)) { + list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch()))); + } else if (2 == this.getRepeatBranch().getEmitLocation() && this.getRepeatBranch().getEmit().filterTraverser(traverser)) { list.add(List.of(1, traverser)); - list.add(List.of(0, traverser.repeatDone(this.repeatBranch))); + list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch()))); } else list.add(List.of(1, traverser)); - } else if (1 == this.repeatBranch.getEmitLocation()) { - if (this.repeatBranch.getEmit().filterTraverser(traverser)) - list.add(List.of(0, traverser.repeatDone(this.repeatBranch))); - if (2 == this.repeatBranch.getUntilLocation() && this.repeatBranch.getUntil().filterTraverser(traverser)) { - list.add(List.of(0, traverser.repeatDone(this.repeatBranch))); + } else if (1 == this.getRepeatBranch().getEmitLocation()) { + if (this.getRepeatBranch().getEmit().filterTraverser(traverser)) + list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch()))); + if (2 == this.getRepeatBranch().getUntilLocation() && this.getRepeatBranch().getUntil().filterTraverser(traverser)) { + list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch()))); } else list.add(List.of(1, traverser)); } @@ -60,4 +60,8 @@ public final class RepeatStart<C, S> implements Function<Traverser<C, S>, List<L list.add(List.of(1, traverser)); return list; } + + private RepeatBranch<C, S> getRepeatBranch() { + return this.repeatBranch.get(); + } } diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java index 40fc11e..5331ee3 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java @@ -44,7 +44,7 @@ import java.util.Map; */ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> { - public SerialRxJava(final Compilation<C, S, E> compilation) { + SerialRxJava(final Compilation<C, S, E> compilation) { super(compilation); } @@ -52,10 +52,14 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> { protected void prepareFlow() { if (!this.executed) { this.executed = true; + this.alive.set(Boolean.TRUE); SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation). doOnNext(this.ends::add). doOnComplete(() -> this.alive.set(Boolean.FALSE)). - blockingSubscribe(); + subscribe(); + } + while (this.alive.get() && this.ends.isEmpty()) { + // only return if there is a result ready from the flow (or the flow is dead) } } @@ -109,9 +113,8 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> { selectorFlow = flow.flatMapIterable(new RepeatStart<>(repeatBranch)); outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1))); flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat()); - } else { + } else flow = compile(flow, repeatBranch.getRepeat()); - } selectorFlow = flow.flatMapIterable(new RepeatEnd<>(repeatBranch)); outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1))); flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));