Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2199

Change subject: [WIP] fix timeout behavior
......................................................................

[WIP] fix timeout behavior

Change-Id: Ide0515dc8ef9f8c295e1dc2ffde297100634060a
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
4 files changed, 76 insertions(+), 34 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/99/2199/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 64ea73d..9368061 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -68,7 +68,7 @@
     @Override
     protected void executeStatement(String statementsText, SessionOutput 
sessionOutput,
             IStatementExecutor.ResultDelivery delivery, 
IStatementExecutor.Stats stats, RequestParameters param,
-            long[] outExecStartEnd, Map<String, String> optionalParameters) 
throws Exception {
+            RequestExecutionState execution, Map<String, String> 
optionalParameters) throws Exception {
         // Running on NC -> send 'execute' message to CC
         INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
         INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -88,7 +88,7 @@
             ExecuteStatementRequestMessage requestMsg = new 
ExecuteStatementRequestMessage(ncCtx.getNodeId(),
                     responseFuture.getFutureId(), queryLanguage, 
statementsText, sessionOutput.config(), ccDelivery,
                     param.clientContextID, handleUrl, optionalParameters);
-            outExecStartEnd[0] = System.nanoTime();
+            execution.start();
             ncMb.sendMessageToCC(requestMsg);
             try {
                 responseMsg = (ExecuteStatementResponseMessage) 
responseFuture.get(timeout, TimeUnit.MILLISECONDS);
@@ -96,12 +96,13 @@
                 cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, e, 
false);
                 throw e;
             } catch (TimeoutException exception) {
-                RuntimeDataException hde = new 
RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception);
+                RuntimeDataException hde = new 
RuntimeDataException(ErrorCode.QUERY_TIMEOUT);
+                hde.addSuppressed(exception);
                 // cancel query
                 cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, 
hde, true);
                 throw hde;
             }
-            outExecStartEnd[1] = System.nanoTime();
+            execution.end();
         } finally {
             ncMb.deregisterMessageFuture(responseFuture.getFutureId());
         }
@@ -150,13 +151,13 @@
     }
 
     @Override
-    protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+    protected void handleExecuteStatementException(Throwable t, 
RequestExecutionState execution) {
         if (t instanceof TimeoutException
                 || (t instanceof HyracksDataException && 
ExceptionUtils.getRootCause(t) instanceof IPCException)) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
-            return HttpResponseStatus.SERVICE_UNAVAILABLE;
+            execution.setStatus(ResultStatus.FAILED, 
HttpResponseStatus.SERVICE_UNAVAILABLE);
         } else {
-            return super.handleExecuteStatementException(t);
+            super.handleExecuteStatementException(t, execution);
         }
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 42c9edd..d947ec4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -36,6 +35,7 @@
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.aql.parser.TokenMgrError;
 import org.apache.asterix.lang.common.base.IParser;
@@ -191,6 +191,46 @@
         }
     }
 
+    static final class RequestExecutionState {
+        private long execStart = -1;
+        private long execEnd = -1;
+        private ResultStatus resultStatus = ResultStatus.SUCCESS;
+        private HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK;
+
+        void setStatus(ResultStatus resultStatus, HttpResponseStatus 
httpResponseStatus) {
+            this.resultStatus = resultStatus;
+            this.httpResponseStatus = httpResponseStatus;
+        }
+
+        ResultStatus getResultStatus() {
+            return resultStatus;
+        }
+
+        HttpResponseStatus getHttpStatus() {
+            return httpResponseStatus;
+        }
+
+        void start() {
+            execStart = System.nanoTime();
+        }
+
+        void end() {
+            execEnd = System.nanoTime();
+        }
+
+        void finish() {
+            if (execStart == -1) {
+                execEnd = -1;
+            } else if (execEnd == -1) {
+                execEnd = System.nanoTime();
+            }
+        }
+
+        long duration() {
+            return execEnd - execStart;
+        }
+    }
+
     private static String getParameterValue(String content, String attribute) {
         if (content == null || attribute == null) {
             return null;
@@ -289,7 +329,6 @@
         ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, true);
         pw.print("\t");
         ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), 
processedObjects, hasErrors);
-        pw.print("\t");
         if (hasErrors) {
             pw.print("\t");
             ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), errorCount, 
false);
@@ -334,6 +373,7 @@
             param.pretty = 
Boolean.parseBoolean(request.getParameter(Parameter.PRETTY.str()));
             param.mode = toLower(request.getParameter(Parameter.MODE.str()));
             param.clientContextID = 
request.getParameter(Parameter.CLIENT_ID.str());
+            param.timeout = request.getParameter(Parameter.TIMEOUT.str());
         }
         return param;
     }
@@ -390,8 +430,8 @@
         SessionConfig sessionConfig = sessionOutput.config();
         HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
+        RequestExecutionState execution = new RequestExecutionState();
         Stats stats = new Stats();
-        long[] execStartEnd = new long[] { -1, -1 };
 
         // buffer the output until we are ready to set the status of the 
response message correctly
         sessionOutput.hold();
@@ -410,27 +450,24 @@
             if (optionalParamProvider != null) {
                 optionalParams = optionalParamProvider.apply(request);
             }
-            response.setStatus(HttpResponseStatus.OK);
-            executeStatement(statementsText, sessionOutput, delivery, stats, 
param, execStartEnd, optionalParams);
+            response.setStatus(execution.getHttpStatus());
+            executeStatement(statementsText, sessionOutput, delivery, stats, 
param, execution, optionalParams);
             if (ResultDelivery.IMMEDIATE == delivery || 
ResultDelivery.DEFERRED == delivery) {
-                ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
+                ResultUtil.printStatus(sessionOutput, 
execution.getResultStatus());
             }
             errorCount = 0;
         } catch (Exception | TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError e) {
-            response.setStatus(handleExecuteStatementException(e));
+            handleExecuteStatementException(e, execution);
+            response.setStatus(execution.getHttpStatus());
             ResultUtil.printError(sessionOutput.out(), e);
-            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
         } finally {
             // make sure that we stop buffering and return the result to the 
http response
             sessionOutput.release();
-            if (execStartEnd[0] == -1) {
-                execStartEnd[1] = -1;
-            } else if (execStartEnd[1] == -1) {
-                execStartEnd[1] = System.nanoTime();
-            }
+            execution.finish();
         }
-        printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, 
execStartEnd[1] - execStartEnd[0],
-                stats.getCount(), stats.getSize(), 
stats.getProcessedObjects(), errorCount);
+        printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, 
execution.duration(), stats.getCount(),
+                stats.getSize(), stats.getProcessedObjects(), errorCount);
         sessionOutput.out().print("}\n");
         sessionOutput.out().flush();
         if (sessionOutput.out().checkError()) {
@@ -439,7 +476,7 @@
     }
 
     protected void executeStatement(String statementsText, SessionOutput 
sessionOutput, ResultDelivery delivery,
-            IStatementExecutor.Stats stats, RequestParameters param, long[] 
outExecStartEnd,
+            IStatementExecutor.Stats stats, RequestParameters param, 
RequestExecutionState execution,
             Map<String, String> optionalParameters) throws Exception {
         IClusterManagementWork.ClusterState clusterState =
                 ((ICcApplicationContext) 
appCtx).getClusterStateManager().getState();
@@ -452,25 +489,28 @@
         MetadataManager.INSTANCE.init();
         IStatementExecutor translator = 
statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
                 sessionOutput, compilationProvider, componentProvider);
-        outExecStartEnd[0] = System.nanoTime();
-        final IRequestParameters requestParameters =
-                new 
org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), 
delivery, stats, null,
-                        param.clientContextID, optionalParameters);
+        execution.start();
+        final IRequestParameters requestParameters = new 
org.apache.asterix.app.translator.RequestParameters(
+                getHyracksDataset(), delivery, stats, null, 
param.clientContextID, optionalParameters);
         translator.compileAndExecute(getHyracksClientConnection(), queryCtx, 
requestParameters);
-        outExecStartEnd[1] = System.nanoTime();
+        execution.end();
     }
 
-    protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+    protected void handleExecuteStatementException(Throwable t, 
RequestExecutionState execution) {
         if (t instanceof org.apache.asterix.aqlplus.parser.TokenMgrError || t 
instanceof TokenMgrError
                 || t instanceof AlgebricksException) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, t.getMessage(), t);
-            return HttpResponseStatus.BAD_REQUEST;
+            execution.setStatus(ResultStatus.FATAL, 
HttpResponseStatus.BAD_REQUEST);
         } else if (t instanceof HyracksException) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.getMessage(), t);
-            return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+            if (((HyracksException) t).getErrorCode() == 
ErrorCode.QUERY_TIMEOUT) {
+                execution.setStatus(ResultStatus.TIMEOUT, 
HttpResponseStatus.OK);
+            } else {
+                execution.setStatus(ResultStatus.FATAL, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            }
         } else {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected 
exception", t);
-            return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+            execution.setStatus(ResultStatus.FATAL, 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
         }
     }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index 72d82e0..ccbf68d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -130,8 +130,9 @@
     public static void printError(PrintWriter pw, String msg, int code, 
boolean comma) {
         pw.print("\t\"");
         pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
-        pw.print("\": [{ \n");
+        pw.print("\": [{ \n\t");
         printField(pw, QueryServiceServlet.ErrorField.CODE.str(), code);
+        pw.print("\t");
         printField(pw, QueryServiceServlet.ErrorField.MSG.str(), 
JSONUtil.escape(msg), false);
         pw.print(comma ? "\t}],\n" : "\t}]\n");
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 330cf1f..1f82a17 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -41,7 +41,7 @@
 22 = The distributed job %1$s already exists
 23 = The distributed work failed for %1$s at %2$s
 24 = No result set for job %1$s
-25 = Job %1$s has been cancelled by a user
+25 = Job %1$s has been cancelled
 26 = Node %1$s failed
 27 = File %1$s is not a directory
 28 = User doesn't have read permissions on the file %1$s

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2199
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ide0515dc8ef9f8c295e1dc2ffde297100634060a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>

Reply via email to