This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new b46ffb1  was able to use TopologyUtil.compile() instead of Flowable.publish() to chain flows in rxJava.
b46ffb1 is described below

commit b46ffb10a8b2a7c0f4107900c822023159826faf
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Wed Apr 3 15:07:56 2019 -0600

    was able to use TopologyUtil.compile() instead of Flowable.publish() to chain flows in rxJava.
---
 .../tinkerpop/machine/processor/rxjava/util/TopologyUtil.java    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
index a9bee79..71b95bd 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
@@ -77,11 +77,10 @@ public final class TopologyUtil {
                 for (int i = 0; i < branches.getValue().size(); i++) {
                     final Compilation<C, S, E> branch = 
branches.getValue().get(i);
                     final int id = i;
-                    branchFlows.add(
-                            selectorFlow.
+                    branchFlows.add(compile(selectorFlow.
                                     filter(list -> list.get(0).equals(null == 
branches.getKey() ? -1 : id)).
-                                    map(list -> (Traverser<C, S>) list.get(1)).
-                                    publish(f -> compile(f, branch)));
+                                    map(list -> (Traverser<C, S>) list.get(1)),
+                            branch));
                 }
             }
             Flowable<Traverser<C, E>> sink = (Flowable) flow.filter(t -> 
false); // branches are the only outputs
@@ -117,7 +116,7 @@ public final class TopologyUtil {
                     return list;
                 });
                 outputs.add(selectorFlow.filter(list -> 
list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
-                flow = selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)).publish(f -> 
compile(f, repeatBranch.getRepeat()));
+                flow = compile(selectorFlow.filter(list -> 
list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), 
repeatBranch.getRepeat());
                 selectorFlow = flow.flatMapIterable(t -> {
                     final List<List> list = new ArrayList<>();
                     if (repeatBranch.hasEndPredicates()) {

Reply via email to