TINKERPOP-1770 Enable timeouts for remote traversals

Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/63191aef
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/63191aef
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/63191aef

Branch: refs/heads/TINKERPOP-1730
Commit: 63191aef22e6cce2f052cb1c8fa898d18a07615b
Parents: c59393f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Sep 7 09:15:37 2017 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Sep 13 06:54:10 2017 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../upgrade/release-3.2.x-incubating.asciidoc   | 20 ++++-
 .../op/traversal/TraversalOpProcessor.java      | 84 ++++++++++----------
 .../server/GremlinServerIntegrateTest.java      | 34 ++++++++
 4 files changed, 93 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index c15835c..70d6134 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ TinkerPop 3.2.7 (Release Date: NOT OFFICIALLY RELEASED YET)
 
 * Bump to Jackson 2.8.10.
 * Added an `EmbeddedRemoteConnection` so that it's possible to mimic a remote 
connection within the same JVM.
+* Supported interruption for remote traversals.
 * The Console's `plugin.txt` file is only updated if there were manually 
uninstalled plugins.
 * Fixed a bug in `MatchStep` where mid-traversal `where()` variables were not 
being considered in start-scope.
 * Generalized `MatchStep` to locally compute all clauses with barriers (not 
just reducing barriers).

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc 
b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index 407cc89..ae504d4 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -41,10 +41,22 @@ simple way to provide a "remote" that is actually local to 
the same JVM.
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-1756[TINKERPOP-1756]
 
-Changes to match()
-~~~~~~~~~~~~~~~~~~
 
-The `match()`-step has been generalized to suppor the local scoping of all 
barrier steps, not just reducing barrier steps.
+Remote Traversal Timeout
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+There was limited support for "timeouts" with remote traversals (i.e. those 
traversals executed using the `withRemote()`
+option) prior to 3.2.7. Remote traversals will now interrupt on the server 
using the `scriptEvaluationTimeout`
+setting in the same way that normal script evaluations would. As a reminder, 
interruptions for traversals are always
+considered "attempts to interrupt" and may not always succeed (a graph 
database implementation might not respect the
+interruption, for example).
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1770[TINKERPOP-1770]
+
+Modifications to match()
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+The `match()`-step has been generalized to support the local scoping of all 
barrier steps, not just reducing barrier steps.
 Previously, the `order().limit()` clause would have worked globally yielding:
 
 [source,groovy]
@@ -74,6 +86,8 @@ This includes steps like `count()`, `min()`, `max()`, 
`sum()`, `group()`, `group
 generalized this behavior to all barriers and thus, adds `aggregate()`, 
`dedup()`, `range()`, `limit()`, `tail()`, and `order()`
 to the list of locally computed clauses.
 
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1764[TINKERPOP-1764]
+
 TinkerPop 3.2.6
 ---------------
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
index 4e35a85..fc3066a 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java
@@ -35,6 +35,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import 
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
@@ -56,6 +57,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.script.SimpleBindings;
+import java.lang.ref.WeakReference;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,6 +67,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -347,6 +353,10 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
         // earlier validation in selection of this op method should free us to 
cast this without worry
         final Map<String, String> aliases = (Map<String, String>) 
msg.optionalArgs(Tokens.ARGS_ALIASES).get();
 
+        // timeout override
+        final long seto = 
msg.getArgs().containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ?
+                
Long.parseLong(msg.getArgs().get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT).toString()) : 
context.getSettings().scriptEvaluationTimeout;
+
         final GraphManager graphManager = context.getGraphManager();
         final String traversalSourceName = 
aliases.entrySet().iterator().next().getValue();
         final TraversalSource g = 
graphManager.getTraversalSource(traversalSourceName);
@@ -370,51 +380,54 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
         }
 
         final Timer.Context timerContext = traversalOpTimer.time();
-        try {
+        final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
             final ChannelHandlerContext ctx = 
context.getChannelHandlerContext();
             final Graph graph = g.getGraph();
 
-            context.getGremlinExecutor().getExecutorService().submit(() -> {
+            try {
+                beforeProcessing(graph, context);
+
                 try {
-                    beforeProcessing(graph, context);
+                    // compile the traversal - without it getEndStep() has 
nothing in it
+                    traversal.applyStrategies();
+                    handleIterator(context, new TraverserIterator(traversal), 
graph);
+                } catch (Exception ex) {
+                    Throwable t = ex;
+                    if (ex instanceof UndeclaredThrowableException)
+                        t = t.getCause();
 
-                    try {
-                        // compile the traversal - without it getEndStep() has 
nothing in it
-                        traversal.applyStrategies();
-                        handleIterator(context, new 
TraverserIterator(traversal), graph);
-                    } catch (TimeoutException ex) {
-                        final String errorMessage = String.format("Response 
iteration exceeded the configured threshold for request [%s] - %s", 
msg.getRequestId(), ex.getMessage());
+                    if (t instanceof InterruptedException || t instanceof 
TraversalInterruptedException) {
+                        final String errorMessage = String.format("A timeout 
occurred during traversal evaluation of [%s] - consider increasing the limit 
given to scriptEvaluationTimeout", msg);
                         logger.warn(errorMessage);
                         
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                                 .statusMessage(errorMessage)
                                 .statusAttributeException(ex).create());
                         onError(graph, context);
-                        return;
-                    } catch (Exception ex) {
+                    } else {
                         logger.warn(String.format("Exception processing a 
Traversal on iteration for request [%s].", msg.getRequestId()), ex);
                         
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                                 .statusMessage(ex.getMessage())
                                 .statusAttributeException(ex).create());
                         onError(graph, context);
-                        return;
                     }
-                } catch (Exception ex) {
-                    logger.warn(String.format("Exception processing a 
Traversal on request [%s].", msg.getRequestId()), ex);
-                    
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
-                            .statusMessage(ex.getMessage())
-                            .statusAttributeException(ex).create());
-                    onError(graph, context);
-                } finally {
-                    timerContext.stop();
                 }
-            });
+            } catch (Exception ex) {
+                logger.warn(String.format("Exception processing a Traversal on 
request [%s].", msg.getRequestId()), ex);
+                
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                        .statusMessage(ex.getMessage())
+                        .statusAttributeException(ex).create());
+                onError(graph, context);
+            } finally {
+                timerContext.stop();
+            }
 
-        } catch (Exception ex) {
-            timerContext.stop();
-            throw new OpProcessorException("Could not iterate the Traversal 
instance",
-                    
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
-                            .statusMessage(ex.getMessage())
-                            .statusAttributeException(ex).create());
+            return null;
+        });
+
+        final Future<?> executionFuture = 
context.getGremlinExecutor().getExecutorService().submit(evalFuture);
+        if (seto > 0) {
+            // Schedule a timeout in the thread pool for future execution
+            context.getScheduledExecutorService().schedule(() -> 
executionFuture.cancel(true), seto, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -463,7 +476,7 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
         return metaData;
     }
 
-    protected void handleIterator(final Context context, final Iterator itty, 
final Graph graph) throws TimeoutException, InterruptedException {
+    protected void handleIterator(final Context context, final Iterator itty, 
final Graph graph) throws InterruptedException {
         final ChannelHandlerContext ctx = context.getChannelHandlerContext();
         final RequestMessage msg = context.getRequestMessage();
         final Settings settings = context.getSettings();
@@ -483,10 +496,6 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
             return;
         }
 
-        // timer for the total serialization time
-        final StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-
         // the batch size can be overridden by the request
         final int resultIterationBatchSize = (Integer) 
msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
                 .orElse(settings.resultIterationBatchSize);
@@ -585,17 +594,6 @@ public class TraversalOpProcessor extends 
AbstractOpProcessor {
                 // this isn't blocking the IO thread - just a worker.
                 TimeUnit.MILLISECONDS.sleep(10);
             }
-
-            stopWatch.split();
-            if (settings.serializedResponseTimeout > 0 && 
stopWatch.getSplitTime() > settings.serializedResponseTimeout) {
-                final String timeoutMsg = String.format("Serialization of the 
entire response exceeded the 'serializeResponseTimeout' setting %s",
-                        warnOnce ? "[Gremlin Server paused writes to client as 
messages were not being consumed quickly enough]" : "");
-                throw new TimeoutException(timeoutMsg.trim());
-            }
-
-            stopWatch.unsplit();
         }
-
-        stopWatch.stop();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 8b0c280..c401de6 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
@@ -51,6 +52,7 @@ import 
org.apache.tinkerpop.gremlin.process.remote.RemoteGraph;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
 import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
 import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
@@ -229,6 +231,9 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
                 settings.processors.clear();
                 settings.processors.add(processorSettingsSmall);
                 break;
+            case "shouldTimeOutRemoteTraversal":
+                settings.scriptEvaluationTimeout = 500;
+                break;
         }
 
         return settings;
@@ -289,6 +294,35 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
     }
 
     @Test
+    public void shouldTimeOutRemoteTraversal() throws Exception {
+        final Graph graph = EmptyGraph.instance();
+        final GraphTraversalSource g = graph.traversal().withRemote(conf);
+
+        try {
+            // tests sleeping thread
+            
g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate();
+            fail("This traversal should have timed out");
+        } catch (Exception ex) {
+            final Throwable t = ex.getCause();
+            assertThat(t, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, 
((ResponseException) t).getResponseStatusCode());
+        }
+
+        // make a graph with a cycle in it to force a long run traversal
+        
graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();
+
+        try {
+            // tests an "unending" traversal
+            g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
+            fail("This traversal should have timed out");
+        } catch (Exception ex) {
+            final Throwable t = ex.getCause();
+            assertThat(t, instanceOf(ResponseException.class));
+            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, 
((ResponseException) t).getResponseStatusCode());
+        }
+    }
+
+    @Test
     public void shouldUseBaseScript() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client client = cluster.connect(name.getMethodName());

Reply via email to