[GitHub] [tinkerpop] bpeav opened a new pull request #1096: TINKERPOP-2090: Fix Sign for Checking if Connection Pool Needs to be Repopulated

2019-04-11 Thread GitBox
bpeav opened a new pull request #1096: TINKERPOP-2090: Fix Sign for Checking if 
Connection Pool Needs to be Repopulated
URL: https://github.com/apache/tinkerpop/pull/1096
 
 
   https://issues.apache.org/jira/browse/TINKERPOP-2090
   
   Going through the commit history, it looks like the comparison changed in 
this PR used to be correct but was erroneously flipped after several 
modifications.
   
   _poolSize is the maximum number of connections in the pool and NrConnections 
is the actual number of connections currently open. The comparison would 
previously always be true, causing EnsurePoolIsPopulatedAsync() to always early 
exit and never repopulate the pool as connections in it were disposed of and 
removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[tinkerpop] branch tp4 updated: Figured out how to compile a Flow once and reuse it over and over again. This is great for nested traversals where a single traverser is inserted and result is returned

2019-04-11 Thread okram
This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
 new 11ceb5b  Figured out how to compile a Flow once and reuse it over and 
over again. This is great for nested traversals where a single traverser is 
inserted and result is returned and this happens over and over again for each 
incoming traverser. By 'caching' the Flow, we save on compilation costs.
11ceb5b is described below

commit 11ceb5baa725384893aa05d5c71bbaadc9ee604d
Author: Marko A. Rodriguez 
AuthorDate: Thu Apr 11 08:09:16 2019 -0600

Figured out how to compile a Flow once and reuse it over and over again. 
This is great for nested traversals where a single traverser is inserted and 
result is returned and this happens over and over again for each incoming 
traverser. By 'caching' the Flow, we save on compilation costs.
---
 .../apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java   | 3 ++-
 .../apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java   | 5 -
 .../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java | 6 +-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index c9b35c3..6c478f4 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -60,9 +60,10 @@ public abstract class AbstractRxJava implements 
Processor {
 
 @Override
 public void reset() {
+if (null != this.disposable)
+this.disposable.dispose();
 this.starts.clear();
 this.ends.clear();
-this.disposable = null;
 this.executed = false;
 }
 
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index d278cbd..c1cda6f 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -50,6 +50,7 @@ public final class ParallelRxJava extends 
AbstractRxJava {
 
 private final ExecutorService threadPool;
 private final String bytecodeId;
+private final ParallelFlowable> flowable;
 
 ParallelRxJava(final Compilation compilation, final 
ExecutorService threadPool) {
 super(compilation);
@@ -57,13 +58,15 @@ public final class ParallelRxJava extends 
AbstractRxJava {
 this.bytecodeId = compilation.getBytecode().getParent().isEmpty() ?
 (String) 
BytecodeUtil.getSourceInstructions(compilation.getBytecode(), 
RxJavaProcessor.RX_ROOT_BYTECODE_ID).get(0).args()[0] :
 null;
+// compile once and use many times
+this.flowable = 
this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)),
 this.compilation);
 }
 
 @Override
 protected void prepareFlow() {
 if (!this.executed) {
 this.executed = true;
-this.disposable = 
this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)),
 this.compilation).
+this.disposable = this.flowable.
 doOnNext(this.ends::add).
 sequential().
 doFinally(() -> {
diff --git 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index e68b2ad..5da1091 100644
--- 
a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ 
b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -45,15 +45,19 @@ import java.util.Map;
  */
 public final class SerialRxJava extends AbstractRxJava {
 
+private final Flowable> flowable;
+
 SerialRxJava(final Compilation compilation) {
 super(compilation);
+// compile once and reuse many times
+this.flowable = 
SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation);
 }
 
 @Override
 protected void prepareFlow() {
 if (!this.executed) {
 this.executed = true;
-this.disposable =