TINKERPOP-1770 Enable timeouts for remote traversals
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/63191aef Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/63191aef Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/63191aef Branch: refs/heads/TINKERPOP-1730 Commit: 63191aef22e6cce2f052cb1c8fa898d18a07615b Parents: c59393f Author: Stephen Mallette <sp...@genoprime.com> Authored: Thu Sep 7 09:15:37 2017 -0400 Committer: Stephen Mallette <sp...@genoprime.com> Committed: Wed Sep 13 06:54:10 2017 -0400 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../upgrade/release-3.2.x-incubating.asciidoc | 20 ++++- .../op/traversal/TraversalOpProcessor.java | 84 ++++++++++---------- .../server/GremlinServerIntegrateTest.java | 34 ++++++++ 4 files changed, 93 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c15835c..70d6134 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ TinkerPop 3.2.7 (Release Date: NOT OFFICIALLY RELEASED YET) * Bump to Jackson 2.8.10. * Added an `EmbeddedRemoteConnection` so that it's possible to mimic a remote connection within the same JVM. +* Supported interruption for remote traversals. * The Console's `plugin.txt` file is only updated if there were manually uninstalled plugins. * Fixed a bug in `MatchStep` where mid-traversal `where()` variables were not being considered in start-scope. * Generalized `MatchStep` to locally compute all clauses with barriers (not just reducing barriers). 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/docs/src/upgrade/release-3.2.x-incubating.asciidoc ---------------------------------------------------------------------- diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc index 407cc89..ae504d4 100644 --- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc +++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc @@ -41,10 +41,22 @@ simple way to provide a "remote" that is actually local to the same JVM. See: link:https://issues.apache.org/jira/browse/TINKERPOP-1756[TINKERPOP-1756] -Changes to match() -~~~~~~~~~~~~~~~~~~ -The `match()`-step has been generalized to suppor the local scoping of all barrier steps, not just reducing barrier steps. +Remote Traversal Timeout +^^^^^^^^^^^^^^^^^^^^^^^^ + +There was limited support for "timeouts" with remote traversals (i.e. those traversals executed using the `withRemote()` +option) prior to 3.2.7. Remote traversals will now be interrupted on the server using the `scriptEvaluationTimeout` +setting in the same way that normal script evaluations are. As a reminder, interruptions for traversals are always +considered "attempts to interrupt" and may not always succeed (a graph database implementation might not respect the +interruption, for example). + +See: link:https://issues.apache.org/jira/browse/TINKERPOP-1770[TINKERPOP-1770] + +Modifications to match() +^^^^^^^^^^^^^^^^^^^^^^^^ + +The `match()`-step has been generalized to support the local scoping of all barrier steps, not just reducing barrier steps. Previously, the `order().limit()` clause would have worked globally yielding: [source,groovy] @@ -74,6 +86,8 @@ This includes steps like `count()`, `min()`, `max()`, `sum()`, `group()`, `group generalized this behavior to all barriers and thus, adds `aggregate()`, `dedup()`, `range()`, `limit()`, `tail()`, and `order()` to the list of locally computed clauses.
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1764[TINKERPOP-1764] + TinkerPop 3.2.6 --------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java index 4e35a85..fc3066a 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java @@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.GraphManager; import org.apache.tinkerpop.gremlin.server.GremlinServer; @@ -56,6 +57,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.script.SimpleBindings; +import java.lang.ref.WeakReference; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,6 +67,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; 
@@ -347,6 +353,10 @@ public class TraversalOpProcessor extends AbstractOpProcessor { // earlier validation in selection of this op method should free us to cast this without worry final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).get(); + // timeout override + final long seto = msg.getArgs().containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ? + Long.parseLong(msg.getArgs().get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT).toString()) : context.getSettings().scriptEvaluationTimeout; + final GraphManager graphManager = context.getGraphManager(); final String traversalSourceName = aliases.entrySet().iterator().next().getValue(); final TraversalSource g = graphManager.getTraversalSource(traversalSourceName); @@ -370,51 +380,54 @@ public class TraversalOpProcessor extends AbstractOpProcessor { } final Timer.Context timerContext = traversalOpTimer.time(); - try { + final FutureTask<Void> evalFuture = new FutureTask<>(() -> { final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final Graph graph = g.getGraph(); - context.getGremlinExecutor().getExecutorService().submit(() -> { + try { + beforeProcessing(graph, context); + try { - beforeProcessing(graph, context); + // compile the traversal - without it getEndStep() has nothing in it + traversal.applyStrategies(); + handleIterator(context, new TraverserIterator(traversal), graph); + } catch (Exception ex) { + Throwable t = ex; + if (ex instanceof UndeclaredThrowableException) + t = t.getCause(); - try { - // compile the traversal - without it getEndStep() has nothing in it - traversal.applyStrategies(); - handleIterator(context, new TraverserIterator(traversal), graph); - } catch (TimeoutException ex) { - final String errorMessage = String.format("Response iteration exceeded the configured threshold for request [%s] - %s", msg.getRequestId(), ex.getMessage()); + if (t instanceof InterruptedException || t instanceof TraversalInterruptedException) { + final String errorMessage = 
String.format("A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to scriptEvaluationTimeout", msg); logger.warn(errorMessage); ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT) .statusMessage(errorMessage) .statusAttributeException(ex).create()); onError(graph, context); - return; - } catch (Exception ex) { + } else { logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex); ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR) .statusMessage(ex.getMessage()) .statusAttributeException(ex).create()); onError(graph, context); - return; } - } catch (Exception ex) { - logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex); - ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR) - .statusMessage(ex.getMessage()) - .statusAttributeException(ex).create()); - onError(graph, context); - } finally { - timerContext.stop(); } - }); + } catch (Exception ex) { + logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), ex); + ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR) + .statusMessage(ex.getMessage()) + .statusAttributeException(ex).create()); + onError(graph, context); + } finally { + timerContext.stop(); + } - } catch (Exception ex) { - timerContext.stop(); - throw new OpProcessorException("Could not iterate the Traversal instance", - ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR) - .statusMessage(ex.getMessage()) - .statusAttributeException(ex).create()); + return null; + }); + + final Future<?> executionFuture = context.getGremlinExecutor().getExecutorService().submit(evalFuture); + if (seto > 0) { + // Schedule a timeout in the thread pool for future execution + context.getScheduledExecutorService().schedule(() -> 
executionFuture.cancel(true), seto, TimeUnit.MILLISECONDS); } } @@ -463,7 +476,7 @@ public class TraversalOpProcessor extends AbstractOpProcessor { return metaData; } - protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws TimeoutException, InterruptedException { + protected void handleIterator(final Context context, final Iterator itty, final Graph graph) throws InterruptedException { final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final RequestMessage msg = context.getRequestMessage(); final Settings settings = context.getSettings(); @@ -483,10 +496,6 @@ public class TraversalOpProcessor extends AbstractOpProcessor { return; } - // timer for the total serialization time - final StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - // the batch size can be overridden by the request final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE) .orElse(settings.resultIterationBatchSize); @@ -585,17 +594,6 @@ public class TraversalOpProcessor extends AbstractOpProcessor { // this isn't blocking the IO thread - just a worker. TimeUnit.MILLISECONDS.sleep(10); } - - stopWatch.split(); - if (settings.serializedResponseTimeout > 0 && stopWatch.getSplitTime() > settings.serializedResponseTimeout) { - final String timeoutMsg = String.format("Serialization of the entire response exceeded the 'serializeResponseTimeout' setting %s", - warnOnce ? 
"[Gremlin Server paused writes to client as messages were not being consumed quickly enough]" : ""); - throw new TimeoutException(timeoutMsg.trim()); - } - - stopWatch.unsplit(); } - - stopWatch.stop(); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63191aef/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index 8b0c280..c401de6 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.Tokens; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; @@ -51,6 +52,7 @@ import org.apache.tinkerpop.gremlin.process.remote.RemoteGraph; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor; import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor; @@ -229,6 
+231,9 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration settings.processors.clear(); settings.processors.add(processorSettingsSmall); break; + case "shouldTimeOutRemoteTraversal": + settings.scriptEvaluationTimeout = 500; + break; } return settings; @@ -289,6 +294,35 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration } @Test + public void shouldTimeOutRemoteTraversal() throws Exception { + final Graph graph = EmptyGraph.instance(); + final GraphTraversalSource g = graph.traversal().withRemote(conf); + + try { + // tests sleeping thread + g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate(); + fail("This traversal should have timed out"); + } catch (Exception ex) { + final Throwable t = ex.getCause(); + assertThat(t, instanceOf(ResponseException.class)); + assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode()); + } + + // make a graph with a cycle in it to force a long run traversal + graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate(); + + try { + // tests an "unending" traversal + g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate(); + fail("This traversal should have timed out"); + } catch (Exception ex) { + final Throwable t = ex.getCause(); + assertThat(t, instanceOf(ResponseException.class)); + assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode()); + } + } + + @Test public void shouldUseBaseScript() throws Exception { final Cluster cluster = TestClientFactory.open(); final Client client = cluster.connect(name.getMethodName());