TINKERPOP-2005 Handle evaluation exceptions in AbstractEvalOpProcessor Some script evaluation exceptions in AbstractEvalOpProcessor may occur after the script has started executing. In this situation it is critical to prevent potentially writing multiple final (e.g. error vs. success) responses back to the client.
Exceptions that used to escape from evalOpInternal(...) would be converted to error response messages by OpExecutorHandler, which could coincide with a successful response from a quick script. This change makes AbstractEvalOpProcessor do the error message writing for the same type of exceptions that are handled by OpExecutorHandler. However, AbstractEvalOpProcessor makes sure that at most one final response message is sent by using ResponseHandlerContext. Also, the ResponseHandlerContext.writeAndFlush(...) methods will no longer throw exceptions for attempts to send multiple final messages. This is again to avoid multiple error response messages sent from OpExecutorHandler. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b7a44953 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b7a44953 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b7a44953 Branch: refs/heads/master Commit: b7a44953c8ac62f308b78709ab2565a78eddb1af Parents: f592e34 Author: Dmitri Bourlatchkov <dmitri.bourlatch...@datastax.com> Authored: Mon Jul 30 12:38:21 2018 -0400 Committer: Dmitri Bourlatchkov <dmitri.bourlatch...@datastax.com> Committed: Mon Jul 30 12:38:21 2018 -0400 ---------------------------------------------------------------------- .../gremlin/server/ResponseHandlerContext.java | 19 +++--- .../server/op/AbstractEvalOpProcessor.java | 26 +++++++- .../gremlin/server/op/AbstractOpProcessor.java | 2 +- .../server/ResponseHandlerContextTest.java | 45 +++++++++++--- .../server/op/AbstractEvalOpProcessorTest.java | 62 ++++++++++++++++++++ 5 files changed, 135 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java ---------------------------------------------------------------------- diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java index fff4480..3c8c13c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java @@ -20,6 +20,8 @@ package org.apache.tinkerpop.gremlin.server; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * A context for asynchronously writing response messages related to a particular request. * <p>The "write" methods of this class ensure that at most one {@link ResponseStatusCode#isFinalResponse() final} * response message is written to the underlying channel. Attempts to write more than one final response message will - * result in an {@link IllegalStateException}.</p> + * be ignored with a warning log message.</p> * <p>Note: an object of this class should be used instead of writing to the channel directly when multiple threads * are expected to produce final response messages concurrently. 
Callers must ensure that the same * {@link ResponseHandlerContext} is used by all threads writing response messages for the same request.</p> @@ -35,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author Dmitri Bourlatchkov */ public class ResponseHandlerContext { + private static final Logger logger = LoggerFactory.getLogger(ResponseHandlerContext.class); private final Context context; private final AtomicBoolean finalResponseWritten = new AtomicBoolean(); @@ -52,7 +55,7 @@ public class ResponseHandlerContext { * {@link ResponseStatusCode#isFinalResponse() final} response is written. * <p>Note: this method should be used instead of writing to the channel directly when multiple threads * are expected to produce response messages concurrently.</p> - * <p>Attempts to write more than one final response message will result in an {@link IllegalStateException}.</p> + * <p>Attempts to write more than one final response message will be ignored.</p> * @see #writeAndFlush(ResponseStatusCode, Object) */ public void writeAndFlush(ResponseMessage message) { @@ -65,16 +68,18 @@ public class ResponseHandlerContext { * <p>The caller must make sure that the provided response status code matches the content of the message.</p> * <p>Note: this method should be used instead of writing to the channel directly when multiple threads * are expected to produce response messages concurrently.</p> - * <p>Attempts to write more than one final response message will result in an {@link IllegalStateException}.</p> + * <p>Attempts to write more than one final response message will be ignored.</p> * @see #writeAndFlush(ResponseMessage) */ public void writeAndFlush(ResponseStatusCode code, Object responseMessage) { final boolean messageIsFinal = code.isFinalResponse(); - if(!finalResponseWritten.compareAndSet(false, messageIsFinal)) { - final String errorMessage = String.format("Another final response message was already written for request %s", 
context.getRequestMessage().getRequestId()); - throw new IllegalStateException(errorMessage); + if(finalResponseWritten.compareAndSet(false, messageIsFinal)) { + context.getChannelHandlerContext().writeAndFlush(responseMessage); + } else { + final String logMessage = String.format("Another final response message was already written for request %s, ignoring response code: %s", + context.getRequestMessage().getRequestId(), code); + logger.warn(logMessage); } - context.getChannelHandlerContext().writeAndFlush(responseMessage); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java index 6ff0452..39168c2 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java @@ -223,9 +223,33 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor { * script evaluation. * @param bindingsSupplier A function that returns the {@link Bindings} to provide to the * {@link GremlinExecutor#eval} method. 
+ * @see #evalOpInternal(ResponseHandlerContext, Supplier, BindingSupplier) */ protected void evalOpInternal(final Context context, final Supplier<GremlinExecutor> gremlinExecutorSupplier, final BindingSupplier bindingsSupplier) throws OpProcessorException { + ResponseHandlerContext rhc = new ResponseHandlerContext(context); + try { + evalOpInternal(rhc, gremlinExecutorSupplier, bindingsSupplier); + } catch (Exception ex) { + // Exceptions may occur on after the script started executing, therefore corresponding errors must be + // reported via the ResponseHandlerContext. + logger.warn("Unable to process script evaluation request: " + ex, ex); + rhc.writeAndFlush(ResponseMessage.build(context.getRequestMessage()) + .code(ResponseStatusCode.SERVER_ERROR) + .statusAttributeException(ex) + .statusMessage(ex.getMessage()).create()); + } + } + + /** + * A variant of {@link #evalOpInternal(Context, Supplier, BindingSupplier)} that is suitable for use in situations + * when multiple threads may produce {@link ResponseStatusCode#isFinalResponse() final} response messages + * concurrently. + * @see #evalOpInternal(Context, Supplier, BindingSupplier) + */ + protected void evalOpInternal(final ResponseHandlerContext rhc, final Supplier<GremlinExecutor> gremlinExecutorSupplier, + final BindingSupplier bindingsSupplier) throws OpProcessorException { + final Context context = rhc.getContext(); final Timer.Context timerContext = evalOpTimer.time(); final ChannelHandlerContext ctx = context.getChannelHandlerContext(); final RequestMessage msg = context.getRequestMessage(); @@ -246,8 +270,6 @@ public abstract class AbstractEvalOpProcessor extends AbstractOpProcessor { final long seto = args.containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ? 
Long.parseLong(args.get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT).toString()) : settings.scriptEvaluationTimeout; - ResponseHandlerContext rhc = new ResponseHandlerContext(context); - final GremlinExecutor.LifeCycle lifeCycle = GremlinExecutor.LifeCycle.build() .scriptEvaluationTimeoutOverride(seto) .afterFailure((b,t) -> { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java index 1263c81..c2b6f1f 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java @@ -80,7 +80,7 @@ public abstract class AbstractOpProcessor implements OpProcessor { } /** - * A variant of {@link #handleIterator(Context, Iterator)} that is suitable for use in situations when mutiple + * A variant of {@link #handleIterator(Context, Iterator)} that is suitable for use in situations when multiple * threads may produce {@link ResponseStatusCode#isFinalResponse() final} response messages concurrently. 
* @see #handleIterator(Context, Iterator) */ http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java index bea318b..6f15a33 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java @@ -19,10 +19,13 @@ package org.apache.tinkerpop.gremlin.server; import io.netty.channel.ChannelHandlerContext; +import org.apache.log4j.Logger; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; -import org.hamcrest.CoreMatchers; +import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -32,8 +35,7 @@ import java.util.Arrays; import java.util.UUID; import java.util.function.BiFunction; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) public class ResponseHandlerContextTest { @@ -45,6 +47,7 @@ public class ResponseHandlerContextTest { private final RequestMessage request = RequestMessage.build("test").create(); private final Context context = new Context(request, ctx, null, null, null, null); private final ResponseHandlerContext rhc = new ResponseHandlerContext(context); + private final Log4jRecordingAppender recordingAppender = new 
Log4jRecordingAppender(); @Parameterized.Parameters(name = "{0}") public static Iterable<Object[]> data() { @@ -79,6 +82,18 @@ public class ResponseHandlerContextTest { }); } + @Before + public void addRecordingAppender() { + final Logger rootLogger = Logger.getRootLogger(); + rootLogger.addAppender(recordingAppender); + } + + @After + public void removeRecordingAppender() { + final Logger rootLogger = Logger.getRootLogger(); + rootLogger.removeAppender(recordingAppender); + } + @Test public void shouldAllowMultipleNonFinalResponses() { writeInvoker.apply(rhc, ResponseStatusCode.AUTHENTICATE); @@ -99,12 +114,24 @@ public class ResponseHandlerContextTest { writeInvoker.apply(rhc, ResponseStatusCode.SUCCESS); Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any()); - try { - writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT); - fail("Expected an IllegalStateException"); - } catch (IllegalStateException ex) { - assertThat(ex.toString(), CoreMatchers.containsString(request.getRequestId().toString())); - } + writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT); + assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*")); + assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$")); + + // ensure there were no other writes to the channel Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any()); } + + @Test + public void shouldNotAllowNonFinalMessagesAfterFinalResponse() { + writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT); + Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any()); + + writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT); + assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*")); + assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.PARTIAL_CONTENT + "$")); + + // ensure there were no other writes to the channel + Mockito.verify(ctx, 
Mockito.times(1)).writeAndFlush(Mockito.any()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b7a44953/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java new file mode 100644 index 0000000..6f25e2e --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessorTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.server.op; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; +import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; +import org.apache.tinkerpop.gremlin.server.Context; +import org.apache.tinkerpop.gremlin.server.Settings; +import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor; +import org.hamcrest.CoreMatchers; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import javax.script.SimpleBindings; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; + +public class AbstractEvalOpProcessorTest { + + @Test + public void evalOpInternalShouldHandleAllEvaluationExceptions() throws OpProcessorException { + AbstractEvalOpProcessor processor = new StandardOpProcessor(); + RequestMessage request = RequestMessage.build("test").create(); + Settings settings = new Settings(); + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + ArgumentCaptor<ResponseMessage> responseCaptor = ArgumentCaptor.forClass(ResponseMessage.class); + + GremlinExecutor gremlinExecutor = Mockito.mock(GremlinExecutor.class); + Mockito.when(gremlinExecutor.eval(anyString(), anyString(), Mockito.any(), Mockito.<GremlinExecutor.LifeCycle>any())) + .thenThrow(new IllegalStateException("test-exception")); + + Context context = new Context(request, ctx, settings, null, gremlinExecutor, null); + processor.evalOpInternal(context, context::getGremlinExecutor, SimpleBindings::new); + + Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(responseCaptor.capture()); + assertEquals(ResponseStatusCode.SERVER_ERROR, responseCaptor.getValue().getStatus().getCode()); + 
assertEquals(request.getRequestId(), responseCaptor.getValue().getRequestId()); + assertThat(responseCaptor.getValue().getStatus().getMessage(), CoreMatchers.containsString("test-exception")); + } +} \ No newline at end of file