Repository: tinkerpop
Updated Branches:
  refs/heads/master efcff1613 -> 150a53d8a


TINKERPOP-1511 Fixed problem in TraversalOpProcessor

TraversalOpProcessor was sending back the final response message before the
transaction commit had occurred. CTR


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/150a53d8
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/150a53d8
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/150a53d8

Branch: refs/heads/master
Commit: 150a53d8a0ba2c2276f2dec49428f1ee14e67d70
Parents: efcff16
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Mon Oct 17 13:53:32 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Mon Oct 17 13:57:14 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   1 +
 .../op/traversal/TraversalOpProcessor.java      | 146 ++++++++++++++++++-
 2 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/150a53d8/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 67506e0..5b4c578 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,6 +27,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 * Restructured Gremlin-Python's GraphSON I/O package to make it easier for 
users to register serializers/deserializers. (*breaking*)
+* Fixed a bug in `TraversalOpProcessor` where the final result was returned to the client before the transaction was committed.
 * Fixed a bug in `ConnectiveStrategy` where infix and/or was not correctly 
reasoning on `choose()` `HasNextStep` injections.
 * Increased performance of `CredentialGraph` authentication.
 * Removed Java 8 stream usage from `TraversalHelper` for performance reasons.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/150a53d8/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 88d8d90..4b559a3 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -22,6 +22,8 @@ import com.codahale.metrics.Timer;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -38,6 +40,8 @@ import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.OpProcessor;
 import org.apache.tinkerpop.gremlin.server.Settings;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
+import org.apache.tinkerpop.gremlin.server.handler.StateKey;
 import org.apache.tinkerpop.gremlin.server.op.AbstractOpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
@@ -52,9 +56,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.script.SimpleBindings;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -369,7 +375,7 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
                     try {
                         // compile the traversal - without it getEndStep() has 
nothing in it
                         traversal.applyStrategies();
-                        handleIterator(context, new 
TraverserIterator(traversal));
+                        handleIterator(context, new 
TraverserIterator(traversal), graph);
                     } catch (TimeoutException ex) {
                         final String errorMessage = String.format("Response 
iteration exceeded the configured threshold for request [%s] - %s", 
msg.getRequestId(), ex.getMessage());
                         logger.warn(errorMessage);
@@ -382,8 +388,6 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
                         onError(graph, context);
                         return;
                     }
-
-                    onTraversalSuccess(graph, context);
                 } catch (Exception ex) {
                     logger.warn(String.format("Exception processing a 
Traversal on request [%s].", msg.getRequestId()), ex);
                     
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).statusMessage(ex.getMessage()).create());
@@ -444,4 +448,140 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
 
         return metaData;
     }
+
+    /**
+     * Iterates the traversal results and writes them back to the client in batches of
+     * {@code settings.resultIterationBatchSize}, which a request may override with {@code Tokens.ARGS_BATCH_SIZE}.
+     * <p/>
+     * {@code onTraversalSuccess} is invoked before the final message ({@code SUCCESS} or {@code NO_CONTENT}) is
+     * written, so that a transaction-managed graph is committed before the client is told the request succeeded.
+     *
+     * @param context the request context supplying the channel, the request message and the server settings
+     * @param itty the results to write back
+     * @param graph the graph the traversal ran against; handed to {@code onTraversalSuccess} / {@code onError}
+     * @throws TimeoutException if total serialization time exceeds {@code settings.serializedResponseTimeout}
+     *                          (only checked when that setting is greater than zero)
+     * @throws InterruptedException if the worker thread is interrupted while iterating or while waiting for the
+     *                              channel to become writable again
+     */
+    protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws TimeoutException, InterruptedException {
+        final ChannelHandlerContext ctx = context.getChannelHandlerContext();
+        final RequestMessage msg = context.getRequestMessage();
+        final Settings settings = context.getSettings();
+        final MessageSerializer serializer = ctx.channel().attr(StateKey.SERIALIZER).get();
+        final boolean useBinary = ctx.channel().attr(StateKey.USE_BINARY).get();
+        boolean warnOnce = false;
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            // as there is nothing left to iterate, if we are transaction managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies success
+            onTraversalSuccess(graph, context);
+            ctx.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .create());
+            return;
+        }
+
+        // timer for the total serialization time
+        final StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+
+        // the batch size can be overridden by the request. a value below one is clamped to one: with zero the
+        // loop below would never call next() and would keep writing empty PARTIAL_CONTENT frames, and a negative
+        // value would fail in the ArrayList constructor.
+        final int requestedBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        final int resultIterationBatchSize = Math.max(1, requestedBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just checking hasNext() in the while. this
+        // prevents situations where auto transactions create a new transaction after calls to commit() within
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated results before the iteration batch
+            // size is reached.
+            final boolean forceFlush = isForceFlushed(ctx, msg, itty);
+
+            // have to check the aggregate size because it is possible that the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
+            // the expected resultIterationBatchSize. Total serialization time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
+            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
+
+            // send back a page of results if batch size is met or if it's the end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow clients.
+            if (ctx.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+
+                    // serialize here because in sessionless requests the serialization must occur in the same
+                    // thread as the eval. as eval occurs in the GremlinExecutor there's no way to get back to the
+                    // thread that processed the eval of the script so, we have to push serialization down into that
+                    final Frame frame;
+                    try {
+                        frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty));
+                    } catch (Exception ex) {
+                        // if makeFrame() (or generateMetaData()) throws, no Frame was ever assigned, so there is
+                        // nothing to release here. the exception is handled in makeFrame() - serialization error
+                        // gets written back to driver at that point
+                        onError(graph, context);
+                        break;
+                    }
+
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (itty.hasNext())
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully.
+                            // note that errors internal to script eval or timeout will rollback given GremlinServer's
+                            // global configurations. local errors will get rolledback below because the exceptions
+                            // aren't thrown in those cases to be caught by the GremlinExecutor for global rollback
+                            // logic. this only needs to be committed if there are no more items to iterate and
+                            // serialization is complete
+                            onTraversalSuccess(graph, context);
+
+                            // exit the result iteration loop as there are no more results left. using this external
+                            // control because of the above commit. some graphs may open a new transaction on the
+                            // call to hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a ByteBuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        frame.tryRelease();
+                        throw ex;
+                    }
+
+                    iterateComplete(ctx, msg, itty);
+
+                    // the flush is called after the commit has potentially occurred. in this way, if a commit was
+                    // required then it will be 100% complete before the client receives it. the "frame" at this
+                    // point should have completely detached objects from the transaction (i.e. serialization has
+                    // occurred) so a new one should not be opened on the flush down the netty pipeline
+                    ctx.writeAndFlush(frame);
+                }
+            } else {
+                // don't keep triggering this warning over and over again for the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+
+            stopWatch.split();
+            if (settings.serializedResponseTimeout > 0 && stopWatch.getSplitTime() > settings.serializedResponseTimeout) {
+                // NOTE(review): the message names 'serializeResponseTimeout' while the setting read above is
+                // serializedResponseTimeout; left unchanged because clients/tests may match on this text
+                final String timeoutMsg = String.format("Serialization of the entire response exceeded the 'serializeResponseTimeout' setting %s",
+                        warnOnce ? "[Gremlin Server paused writes to client as messages were not being consumed quickly enough]" : "");
+                throw new TimeoutException(timeoutMsg.trim());
+            }
+
+            stopWatch.unsplit();
+        }
+
+        stopWatch.stop();
+    }
 }

Reply via email to