[tinkerpop] branch tp4 updated: fixed a bug in Pipes.subscribe().

2019-04-23 Thread okram
This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
 new 576f880  fixed a bug in Pipes.subscribe().
576f880 is described below

commit 576f880e5918b0912fa8b9bdf531d24e116be3ba
Author: Marko A. Rodriguez 
AuthorDate: Tue Apr 23 06:54:50 2019 -0600

fixed a bug in Pipes.subscribe().
---
 .../src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java | 2 +-
 .../main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java   | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index 20007c4..0bb39fe 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -89,7 +89,7 @@ public class Beam implements Processor {
 @Override
 public void stop() {
 try {
-if (null != this.pipelineResult)
+if (this.isRunning())
 this.pipelineResult.cancel();
 } catch (final IOException e) {
 throw new RuntimeException(e.getMessage(), e);
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index 317659f..cc5a4cf 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -115,7 +115,6 @@ public final class Pipes implements Processor {
 if (this.isRunning())
 throw Processor.Exceptions.processorIsCurrentlyRunning(this);
 
-this.alive.set(Boolean.TRUE);
 new Thread(() -> {
 final Iterator> iterator = this.iterator(starts);
 while (iterator.hasNext()) {
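
One plausible reading of the Pipes.subscribe() fix above, sketched under assumptions: if isRunning() reads the alive flag and iterator() both checks that flag and sets it, then setting the flag inside subscribe() before calling iterator() would make iterator() reject the run. The class FlagSketch, its String element type, the setFlagFirst switch, and the synchronous execution (no new Thread) are all hypothetical; this is not the TinkerPop Pipes class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for a processor; not the TinkerPop Pipes class.
final class FlagSketch {
    private final AtomicBoolean alive = new AtomicBoolean(false);

    boolean isRunning() {
        return this.alive.get();
    }

    // Pull-based entry point: refuses to start while a run is in progress,
    // then marks the processor as alive itself (an assumption of this sketch).
    Iterator<String> iterator(final Iterator<String> starts) {
        if (this.isRunning())
            throw new IllegalStateException("processor is currently running");
        this.alive.set(true);
        return starts;
    }

    // Push-based entry point built on iterator(). When setFlagFirst is true it
    // mimics the removed line and trips the guard inside iterator().
    void subscribe(final Iterator<String> starts, final Consumer<String> consumer, final boolean setFlagFirst) {
        if (this.isRunning())
            throw new IllegalStateException("processor is currently running");
        if (setFlagFirst)
            this.alive.set(true);
        try {
            this.iterator(starts).forEachRemaining(consumer);
        } finally {
            this.alive.set(false);
        }
    }

    public static void main(final String[] args) {
        final List<String> seen = new ArrayList<>();
        new FlagSketch().subscribe(List.of("a", "b").iterator(), seen::add, false);
        System.out.println(seen);
        try {
            new FlagSketch().subscribe(List.of("a").iterator(), seen::add, true);
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With setFlagFirst set to false the consumer receives every start; with true the call is rejected by the guard in iterator(), which is the behavior the removed line would cause under these assumptions.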



[tinkerpop] branch tp4 updated: some reorg on RxJava.

2019-04-23 Thread okram

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
 new d9a9863  some reorg on RxJava.
d9a9863 is described below

commit d9a986302a487359ebb42387bfb7a3a851ffcbca
Author: Marko A. Rodriguez 
AuthorDate: Tue Apr 23 05:02:31 2019 -0600

some reorg on RxJava.
---
 .../tinkerpop/machine/processor/pipes/Pipes.java   |  1 -
 .../machine/processor/rxjava/AbstractRxJava.java   | 23 +-
 .../machine/processor/rxjava/ParallelRxJava.java   |  5 +++--
 .../machine/processor/rxjava/SerialRxJava.java |  5 +++--
 4 files changed, 15 insertions(+), 19 deletions(-)

diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index 9f3c7c8..317659f 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -100,7 +100,6 @@ public final class Pipes implements Processor {
 
 @Override
 public Iterator> iterator(final Iterator> starts) {
-
 if (this.isRunning())
 throw Processor.Exceptions.processorIsCurrentlyRunning(this);
 
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index d16a4e1..9d88651 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
 
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,15 +64,8 @@ public abstract class AbstractRxJava implements Processor {
 
 @Override
 public Iterator> iterator(final Iterator> starts) {
-if (this.isRunning())
-throw Processor.Exceptions.processorIsCurrentlyRunning(this);
-
-this.running.set(Boolean.TRUE);
-this.starts.clear();
-this.ends.clear();
-starts.forEachRemaining(this.starts::add);
-this.prepareFlow(this.ends::add);
-return new Iterator<>() {
+this.prepareFlow(starts, this.ends::add);
+return IteratorUtils.onLast(new Iterator<>() {
 @Override
 public boolean hasNext() {
 waitForCompletionOrResult();
@@ -83,11 +77,15 @@ public abstract class AbstractRxJava implements Processor {
 waitForCompletionOrResult();
 return ends.remove();
 }
-};
+}, this::stop);
 }
 
 @Override
 public void subscribe(final Iterator> starts, final Consumer> consumer) {
+this.prepareFlow(starts, consumer::accept);
+}
+
+protected void prepareFlow(final Iterator> starts, final io.reactivex.functions.Consumer> consumer) {
 if (this.isRunning())
 throw Processor.Exceptions.processorIsCurrentlyRunning(this);
 
@@ -95,13 +93,10 @@ public abstract class AbstractRxJava implements Processor {
 this.starts.clear();
 this.ends.clear();
 starts.forEachRemaining(this.starts::add);
-this.prepareFlow(consumer::accept);
 }
 
-protected abstract void prepareFlow(final io.reactivex.functions.Consumer> consumer);
-
 private void waitForCompletionOrResult() {
-while (this.ends.isEmpty() && this.isRunning()) {
+while (this.ends.isEmpty() && this.running.get()) {
 // wait until either the flow is complete or there is a traverser result
 }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index b0d92b1..3aac6a2 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -34,13 +34,13 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import 
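
The AbstractRxJava change above wraps the result iterator in IteratorUtils.onLast(..., this::stop). The body of that helper is not shown in this message, so the following is only a guess at what such a wrapper could look like: it runs a callback once when the wrapped iterator is first found to be exhausted. The class name OnLastSketch and the Runnable parameter type are assumptions.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch; the real IteratorUtils.onLast is not shown in the diff.
final class OnLastSketch {
    // Wraps an iterator so that `callback` runs exactly once, the first time
    // hasNext() finds the underlying iterator exhausted.
    static <T> Iterator<T> onLast(final Iterator<T> iterator, final Runnable callback) {
        return new Iterator<T>() {
            private boolean fired = false;

            @Override
            public boolean hasNext() {
                if (iterator.hasNext())
                    return true;
                if (!this.fired) {
                    this.fired = true;
                    callback.run();
                }
                return false;
            }

            @Override
            public T next() {
                if (!this.hasNext())
                    throw new NoSuchElementException();
                return iterator.next();
            }
        };
    }

    public static void main(final String[] args) {
        final Iterator<Integer> results = onLast(List.of(1, 2).iterator(), () -> System.out.println("stopped"));
        while (results.hasNext())
            System.out.println(results.next());
    }
}
```

A wrapper of this shape lets a pull-based caller drain results without having to remember to call stop() afterwards.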

[tinkerpop] branch tp4 updated: Processor no longer implement Iterator. It now supports both a push-based and a pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't don

2019-04-23 Thread okram

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
 new 2cf7cdf  Processor no longer implement Iterator. It now supports both 
a push-based and a pull-based execution via subscribe(Consumer) and iterator(), 
respectively. I haven't done this properly for Beam as I need to rework 
TraverserServer. Also, need to think about how to use Compilation within a 
push-based system so that local/nested traversals don't block on 
iterator.next().
2cf7cdf is described below

commit 2cf7cdfed05234a22b29a124c6617fb45bf5e7de
Author: Marko A. Rodriguez 
AuthorDate: Tue Apr 23 04:38:15 2019 -0600

Processor no longer implement Iterator. It now supports both a push-based 
and a pull-based execution via subscribe(Consumer) and iterator(), 
respectively. I haven't done this properly for Beam as I need to rework 
TraverserServer. Also, need to think about how to use Compilation within a 
push-based system so that local/nested traversals don't block on 
iterator.next().
---
 .../machine/bytecode/compiler/Compilation.java | 39 +++--
 .../machine/processor/EmptyProcessor.java  | 16 --
 .../machine/processor/FilterProcessor.java |  7 ++-
 .../machine/processor/LoopsProcessor.java  |  5 +-
 .../tinkerpop/machine/processor/Processor.java | 45 +--
 .../machine/processor/SimpleProcessor.java | 44 +++
 .../tinkerpop/machine/species/BasicMachine.java|  3 +-
 .../tinkerpop/machine/species/LocalMachine.java|  5 +-
 .../tinkerpop/machine/util/IteratorUtils.java  | 35 +++-
 .../tinkerpop/machine/processor/beam/Beam.java | 43 ++
 .../machine/processor/pipes/BranchStep.java|  4 +-
 .../tinkerpop/machine/processor/pipes/Pipes.java   | 45 +++
 .../machine/processor/pipes/RepeatStep.java| 22 +---
 .../machine/processor/rxjava/AbstractRxJava.java   | 65 --
 .../machine/processor/rxjava/ParallelRxJava.java   | 31 +--
 .../machine/processor/rxjava/SerialRxJava.java | 15 +++--
 16 files changed, 305 insertions(+), 119 deletions(-)

diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
index 32a04e4..f7105d9 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.machine.structure.EmptyStructure;
 import org.apache.tinkerpop.machine.structure.StructureFactory;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -75,53 +76,51 @@ public final class Compilation implements Serializable, Cloneable {
 }
 
 public Processor getProcessor() {
-if (null == this.processor)
-this.processor = this.processorFactory.mint(this);
+this.prepareProcessor();
 return this.processor;
 }
 
 private void prepareProcessor() {
 if (null == this.processor)
 this.processor = this.processorFactory.mint(this);
-else
-this.processor.reset();
 }
 
-private Traverser prepareTraverser(final Traverser traverser) {
+private Iterator> prepareTraverser(final Traverser traverser) {
 final Traverser clone = traverser.clone();
 clone.coefficient().unity();
-return clone;
+return IteratorUtils.of(clone);
 }
 
 public Traverser mapTraverser(final Traverser traverser) {
 this.prepareProcessor();
-this.processor.addStart(this.prepareTraverser(traverser));
-if (!this.processor.hasNext())
+final Iterator> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+if (!iterator.hasNext())
 throw new RuntimeException("The nested traversal is not a map function: " + this);
-return this.processor.next();
+final Traverser result = iterator.next();
+this.processor.stop();
+return result;
 }
 
 public Traverser mapObject(final S object) {
 this.prepareProcessor();
-this.processor.addStart(this.traverserFactory.create(this.functions.get(0), object));
-return this.processor.next();
+final Iterator> iterator = this.processor.iterator(this.prepareTraverser(this.traverserFactory.create(this.functions.get(0), object)));
+final Traverser result = iterator.next();
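
The commit message above describes a Processor that offers both pull-based execution via iterator() and push-based execution via subscribe(Consumer). As a rough illustration of that split, and not the actual TinkerPop interface, the sketch below defines a hypothetical MiniProcessor with both entry points and a trivial mapping implementation. All names here (PushPullSketch, MiniProcessor, mapping) are made up for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a processor with pull and push entry points.
final class PushPullSketch {
    interface MiniProcessor<S, E> {
        // Pull: the caller drives execution by asking for the next result.
        Iterator<E> iterator(Iterator<S> starts);

        // Push: the processor drives execution and hands results to the consumer.
        void subscribe(Iterator<S> starts, Consumer<E> consumer);
    }

    // A processor that applies one function to every start.
    static <S, E> MiniProcessor<S, E> mapping(final Function<S, E> function) {
        return new MiniProcessor<S, E>() {
            @Override
            public Iterator<E> iterator(final Iterator<S> starts) {
                return new Iterator<E>() {
                    @Override
                    public boolean hasNext() {
                        return starts.hasNext();
                    }

                    @Override
                    public E next() {
                        return function.apply(starts.next());
                    }
                };
            }

            @Override
            public void subscribe(final Iterator<S> starts, final Consumer<E> consumer) {
                starts.forEachRemaining(start -> consumer.accept(function.apply(start)));
            }
        };
    }

    public static void main(final String[] args) {
        final MiniProcessor<Integer, Integer> processor = mapping(x -> x * 10);

        final List<Integer> pulled = new ArrayList<>();
        processor.iterator(List.of(1, 2).iterator()).forEachRemaining(pulled::add);

        final List<Integer> pushed = new ArrayList<>();
        processor.subscribe(List.of(1, 2).iterator(), pushed::add);

        System.out.println(pulled + " " + pushed);
    }
}
```

In the pull case a nested caller blocks on next() until a result exists, which is the concern the commit message raises for local/nested traversals in a push-based system.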
+

[GitHub] [tinkerpop] jorgebay commented on issue #1098: Allow to configure processor in Gremlin-JavaScript

2019-04-23 Thread GitBox
jorgebay commented on issue #1098: Allow to configure processor in 
Gremlin-JavaScript
URL: https://github.com/apache/tinkerpop/pull/1098#issuecomment-485715138
 
 
   Thanks for your contribution; covering it with unit tests is a good idea.
   
   VOTE +1.
   
   @dwitry Could you add the CHANGELOG entry and base it on the tp33 branch?
   
   If you don't have time to do it, I can cherry-pick it and add the changelog entry when merging it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [tinkerpop] dwitry commented on issue #1098: Allow to configure processor in Gremlin-JavaScript

2019-04-23 Thread GitBox
dwitry commented on issue #1098: Allow to configure processor in 
Gremlin-JavaScript
URL: https://github.com/apache/tinkerpop/pull/1098#issuecomment-485710403
 
 
   @spmallette, could you please elaborate on your side note?
   
   In this case, the client sends a query string and the Cypher→Gremlin translation happens on the TinkerPop server side (using the Cypher for Gremlin plugin). The translation cannot be done on the JavaScript client side because it runs in the JVM. Could you clarify how you see bytecode being used in this scenario?

