[tinkerpop] branch tp4 updated: fixed a bug in Pipes.subscribe().
This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git The following commit(s) were added to refs/heads/tp4 by this push: new 576f880 fixed a bug in Pipes.subscribe(). 576f880 is described below commit 576f880e5918b0912fa8b9bdf531d24e116be3ba Author: Marko A. Rodriguez AuthorDate: Tue Apr 23 06:54:50 2019 -0600 fixed a bug in Pipes.subscribe(). --- .../src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java | 2 +- .../main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java index 20007c4..0bb39fe 100644 --- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java +++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java @@ -89,7 +89,7 @@ public class Beam implements Processor { @Override public void stop() { try { -if (null != this.pipelineResult) +if (this.isRunning()) this.pipelineResult.cancel(); } catch (final IOException e) { throw new RuntimeException(e.getMessage(), e); diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java index 317659f..cc5a4cf 100644 --- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java +++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java @@ -115,7 +115,6 @@ public final class Pipes implements Processor { if (this.isRunning()) throw Processor.Exceptions.processorIsCurrentlyRunning(this); 
-this.alive.set(Boolean.TRUE); new Thread(() -> { final Iterator> iterator = this.iterator(starts); while (iterator.hasNext()) {
[tinkerpop] branch tp4 updated: some reorg on RxJava.
This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git The following commit(s) were added to refs/heads/tp4 by this push: new d9a9863 some reorg on RxJava. d9a9863 is described below commit d9a986302a487359ebb42387bfb7a3a851ffcbca Author: Marko A. Rodriguez AuthorDate: Tue Apr 23 05:02:31 2019 -0600 some reorg on RxJava. --- .../tinkerpop/machine/processor/pipes/Pipes.java | 1 - .../machine/processor/rxjava/AbstractRxJava.java | 23 +- .../machine/processor/rxjava/ParallelRxJava.java | 5 +++-- .../machine/processor/rxjava/SerialRxJava.java | 5 +++-- 4 files changed, 15 insertions(+), 19 deletions(-) diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java index 9f3c7c8..317659f 100644 --- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java +++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java @@ -100,7 +100,6 @@ public final class Pipes implements Processor { @Override public Iterator> iterator(final Iterator> starts) { - if (this.isRunning()) throw Processor.Exceptions.processorIsCurrentlyRunning(this); diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java index d16a4e1..9d88651 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation; import 
org.apache.tinkerpop.machine.processor.Processor; import org.apache.tinkerpop.machine.traverser.Traverser; import org.apache.tinkerpop.machine.traverser.TraverserSet; +import org.apache.tinkerpop.machine.util.IteratorUtils; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,15 +64,8 @@ public abstract class AbstractRxJava implements Processor { @Override public Iterator> iterator(final Iterator> starts) { -if (this.isRunning()) -throw Processor.Exceptions.processorIsCurrentlyRunning(this); - -this.running.set(Boolean.TRUE); -this.starts.clear(); -this.ends.clear(); -starts.forEachRemaining(this.starts::add); -this.prepareFlow(this.ends::add); -return new Iterator<>() { +this.prepareFlow(starts, this.ends::add); +return IteratorUtils.onLast(new Iterator<>() { @Override public boolean hasNext() { waitForCompletionOrResult(); @@ -83,11 +77,15 @@ public abstract class AbstractRxJava implements Processor { waitForCompletionOrResult(); return ends.remove(); } -}; +}, this::stop); } @Override public void subscribe(final Iterator> starts, final Consumer> consumer) { +this.prepareFlow(starts, consumer::accept); +} + +protected void prepareFlow(final Iterator> starts, final io.reactivex.functions.Consumer> consumer) { if (this.isRunning()) throw Processor.Exceptions.processorIsCurrentlyRunning(this); @@ -95,13 +93,10 @@ public abstract class AbstractRxJava implements Processor { this.starts.clear(); this.ends.clear(); starts.forEachRemaining(this.starts::add); -this.prepareFlow(consumer::accept); } -protected abstract void prepareFlow(final io.reactivex.functions.Consumer> consumer); - private void waitForCompletionOrResult() { -while (this.ends.isEmpty() && this.isRunning()) { +while (this.ends.isEmpty() && this.running.get()) { // wait until either the flow is complete or there is a traverser result } } diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java index b0d92b1..3aac6a2 100644 --- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java +++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java @@ -34,13 +34,13 @@ import org.apache.tinkerpop.machine.function.InitialFunction; import org.apache.tinkerpop.machine.function.MapFunction; import
[tinkerpop] branch tp4 updated: Processor no longer implement Iterator. It now supports both a push-based and a pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't don
This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git The following commit(s) were added to refs/heads/tp4 by this push: new 2cf7cdf Processor no longer implement Iterator. It now supports both a push-based and a pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next(). 2cf7cdf is described below commit 2cf7cdfed05234a22b29a124c6617fb45bf5e7de Author: Marko A. Rodriguez AuthorDate: Tue Apr 23 04:38:15 2019 -0600 Processor no longer implement Iterator. It now supports both a push-based and a pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next(). 
--- .../machine/bytecode/compiler/Compilation.java | 39 +++-- .../machine/processor/EmptyProcessor.java | 16 -- .../machine/processor/FilterProcessor.java | 7 ++- .../machine/processor/LoopsProcessor.java | 5 +- .../tinkerpop/machine/processor/Processor.java | 45 +-- .../machine/processor/SimpleProcessor.java | 44 +++ .../tinkerpop/machine/species/BasicMachine.java| 3 +- .../tinkerpop/machine/species/LocalMachine.java| 5 +- .../tinkerpop/machine/util/IteratorUtils.java | 35 +++- .../tinkerpop/machine/processor/beam/Beam.java | 43 ++ .../machine/processor/pipes/BranchStep.java| 4 +- .../tinkerpop/machine/processor/pipes/Pipes.java | 45 +++ .../machine/processor/pipes/RepeatStep.java| 22 +--- .../machine/processor/rxjava/AbstractRxJava.java | 65 -- .../machine/processor/rxjava/ParallelRxJava.java | 31 +-- .../machine/processor/rxjava/SerialRxJava.java | 15 +++-- 16 files changed, 305 insertions(+), 119 deletions(-) diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java index 32a04e4..f7105d9 100644 --- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java +++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.machine.structure.EmptyStructure; import org.apache.tinkerpop.machine.structure.StructureFactory; import org.apache.tinkerpop.machine.traverser.Traverser; import org.apache.tinkerpop.machine.traverser.TraverserFactory; +import org.apache.tinkerpop.machine.util.IteratorUtils; import java.io.Serializable; import java.util.ArrayList; @@ -75,53 +76,51 @@ public final class Compilation implements Serializable, Cloneable { } public Processor getProcessor() { -if (null == this.processor) -this.processor = this.processorFactory.mint(this); 
+this.prepareProcessor(); return this.processor; } private void prepareProcessor() { if (null == this.processor) this.processor = this.processorFactory.mint(this); -else -this.processor.reset(); } -private Traverser prepareTraverser(final Traverser traverser) { +private Iterator> prepareTraverser(final Traverser traverser) { final Traverser clone = traverser.clone(); clone.coefficient().unity(); -return clone; +return IteratorUtils.of(clone); } public Traverser mapTraverser(final Traverser traverser) { this.prepareProcessor(); -this.processor.addStart(this.prepareTraverser(traverser)); -if (!this.processor.hasNext()) +final Iterator> iterator = this.processor.iterator(this.prepareTraverser(traverser)); +if (!iterator.hasNext()) throw new RuntimeException("The nested traversal is not a map function: " + this); -return this.processor.next(); +final Traverser result = iterator.next(); +this.processor.stop(); +return result; } public Traverser mapObject(final S object) { this.prepareProcessor(); - this.processor.addStart(this.traverserFactory.create(this.functions.get(0), object)); -return this.processor.next(); +final Iterator> iterator = this.processor.iterator(this.prepareTraverser(this.traverserFactory.create(this.functions.get(0), object))); +final Traverser result = iterator.next(); +
[GitHub] [tinkerpop] jorgebay commented on issue #1098: Allow to configure processor in Gremlin-JavaScript
jorgebay commented on issue #1098: Allow to configure processor in Gremlin-JavaScript URL: https://github.com/apache/tinkerpop/pull/1098#issuecomment-485715138 Thanks for your contribution; it was a good idea to cover it with unit tests. VOTE +1. @dwitry Could you add the CHANGELOG entry and base it on the tp33 branch? If you don't have time to do it, I can cherry-pick it and add the changelog entry when merging it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [tinkerpop] dwitry commented on issue #1098: Allow to configure processor in Gremlin-JavaScript
dwitry commented on issue #1098: Allow to configure processor in Gremlin-JavaScript URL: https://github.com/apache/tinkerpop/pull/1098#issuecomment-485710403 @spmallette, could you please elaborate on your side note? In this case, a client sends a query string and the Cypher→Gremlin translation happens on the TinkerPop server side (using the Cypher for Gremlin plugin). Translation cannot be done on the JavaScript client side because the translation works in the JVM. Please clarify how you see the use of bytecode in this scenario. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services