abdullah alamoudi has submitted this change and it was merged. Change subject: [ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup ......................................................................
[ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup - user model changes: no - storage format changes: no - interface changes: yes --IJobManager.cancel now takes a callback details: - Job cancellation now completes only after the job cleanup work has completed and not merely when the abort tasks are executed. - The NCQueryServiceServlet actively cancels requests that passes 5 minutes. - Cancellation of timedout jobs is not done through the Http API but through message broker. - Typically, requests might timeout when the servers are overloaded. When that is the case, there is a high chance http requests are to be rejected including requests to cancel previously submitted queries. This is the reason for using Message broker for this task. - ExecuteStatementRequest used to execute the statement in a different executor thread even though it is itself is being executed in an executor thread and is not blocking anyone. This was fixed as well. Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1961 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java D asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-metadata/pom.xml M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java 40 files changed, 919 insertions(+), 335 deletions(-) Approvals: Jenkins: Verified; ; Verified Murtadha Hubail: Looks good to me, approved Objections: Anon. E. Moose #1000171: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java index a4e72f7..c38b3fc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR; import java.io.PrintWriter; import java.util.UUID; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index 7874aa3..58c282f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR; import java.awt.image.BufferedImage; import java.io.BufferedReader; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index b24a9a1..4faab1e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; import java.io.IOException; import java.io.PrintWriter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java index 52d4d67..6dea30c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java index d9a63f7..52af643 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java index dcd0e70..9d2415d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; @@ -34,7 +34,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java new file mode 100644 index 0000000..bdda750 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public enum Duration { + SEC("s", 9), + MILLI("ms", 6), + MICRO("µs", 3), + NANO("ns", 0); + + static final long NANOSECONDS = 1; + static final long MICROSECONDS = 1000 * NANOSECONDS; + static final long MILLISECONDS = 1000 * MICROSECONDS; + static final long SECONDS = 1000 * MILLISECONDS; + static final long MINUTES = 60 * SECONDS; + static final long HOURS = 60 * MINUTES; + + String unit; + int nanoDigits; + + Duration(String unit, int nanoDigits) { + this.unit = unit; + this.nanoDigits = nanoDigits; + } + + public static String formatNanos(long nanoTime) { + final String strTime = String.valueOf(nanoTime); + final int len = strTime.length(); + for (Duration tu : Duration.values()) { + if (len > tu.nanoDigits) { + final String integer = strTime.substring(0, len - tu.nanoDigits); + final String fractional = strTime.substring(len - tu.nanoDigits); + return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit; + } + } + return "illegal string value: " + strTime; + } + + // ParseDuration parses a duration string. + // A duration string is a possibly signed sequence of + // decimal numbers, each with optional fraction and a unit suffix, + // such as "300ms", "-1.5h" or "2h45m". + // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + // returns the duration in nano seconds + public static long parseDurationStringToNanos(String orig) throws HyracksDataException { + // [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+ + String s = orig; + long d = 0; + boolean neg = false; + char c; + // Consume [-+]? + if (!s.isEmpty()) { + c = s.charAt(0); + if (c == '-' || c == '+') { + neg = c == '-'; + s = s.substring(1); + } + } + + // Special case: if all that is left is "0", this is zero. + if ("0".equals(s)) { + return 0L; + } + + if (s.isEmpty()) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + + while (!s.isEmpty()) { + long v = 0L; // integers before decimal + long f = 0L; // integers after decimal + double scale = 1.0; // value = v + f/scale + // The next character must be [0-9.] + if (!(s.charAt(0) == '.' || '0' <= s.charAt(0) && s.charAt(0) <= '9')) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + // Consume [0-9]* + int pl = s.length(); + Pair<Long, String> pair = leadingInt(s); + v = pair.getLeft(); + s = pair.getRight(); + boolean pre = pl != s.length(); // whether we consumed anything before a period + + // Consume (\.[0-9]*)? + boolean post = false; + if (!s.isEmpty() && s.charAt(0) == '.') { + s = s.substring(1); + pl = s.length(); + Triple<Long, Double, String> triple = leadingFraction(s); + f = triple.getLeft(); + scale = triple.getMiddle(); + s = triple.getRight(); + post = pl != s.length(); + } + if (!pre && !post) { + // no digits (e.g. ".s" or "-.s") + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + + // Consume unit. + int i = 0; + for (; i < s.length(); i++) { + c = s.charAt(i); + if (c == '.' || '0' <= c && c <= '9') { + break; + } + } + if (i == 0) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + String u = s.substring(0, i); + s = s.substring(i); + long unit = getUnit(u); + if (v > Long.MAX_VALUE / unit) { + // overflow + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + v *= unit; + if (f > 0) { + // float64 is needed to be nanosecond accurate for fractions of hours. + // v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit) + v += (long) (((double) f * (double) unit) / scale); + if (v < 0) { + // overflow + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + } + d += v; + if (d < 0) { + // overflow + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + } + + if (neg) { + d = -d; + } + return d; + } + + private static final long getUnit(String unit) throws HyracksDataException { + switch (unit) { + case "ns": + return NANOSECONDS; + case "us": + case "µs":// U+00B5 = micro symbol + case "μs":// U+03BC = Greek letter mu + return MICROSECONDS; + case "ms": + return MILLISECONDS; + case "s": + return SECONDS; + case "m": + return MINUTES; + case "h": + return HOURS; + default: + throw new RuntimeDataException(ErrorCode.UNKNOWN_DURATION_UNIT, unit); + } + } + + // leadingInt consumes the leading [0-9]* from s. + static Pair<Long, String> leadingInt(String origin) throws HyracksDataException { + String s = origin; + long x = 0L; + int i = 0; + for (; i < s.length(); i++) { + char c = s.charAt(i); + if (c < '0' || c > '9') { + break; + } + if (x > Long.MAX_VALUE / 10) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin); + } + x = x * 10 + Character.getNumericValue(c); + if (x < 0) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin); + } + } + return Pair.of(x, s.substring(i)); + } + + // leadingFraction consumes the leading [0-9]* from s. + // It is used only for fractions, so does not return an error on overflow, + // it just stops accumulating precision. + static Triple<Long, Double, String> leadingFraction(String s) { + int i = 0; + long x = 0L; + double scale = 1.0; + boolean overflow = false; + for (; i < s.length(); i++) { + char c = s.charAt(i); + if (c < '0' || c > '9') { + break; + } + if (overflow) { + continue; + } + if (x > (1 << 63 - 1) / 10) { + // It's possible for overflow to give a positive number, so take care. + overflow = true; + continue; + } + long y = x * 10 + Character.getNumericValue(c); + if (y < 0) { + overflow = true; + continue; + } + x = y; + scale *= 10; + } + return Triple.of(x, scale, s.substring(i)); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 9547514..ef49c35 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -19,17 +19,20 @@ package org.apache.asterix.api.http.server; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import java.util.logging.Level; -import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.app.message.CancelQueryRequest; import org.apache.asterix.app.message.ExecuteStatementRequestMessage; import org.apache.asterix.app.message.ExecuteStatementResponseMessage; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.om.types.ARecordType; @@ -41,11 +44,14 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.ipc.exceptions.IPCException; +import io.netty.handler.codec.http.HttpResponseStatus; + /** * Query service servlet that can run on NC nodes. * Delegates query execution to CC, then serves the result. */ public class NCQueryServiceServlet extends QueryServiceServlet { + public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, ILangExtension.Language queryLanguage) { super(ctx, paths, appCtx, queryLanguage, null, null, null); @@ -63,13 +69,28 @@ ExecuteStatementResponseMessage responseMsg; MessageFuture responseFuture = ncMb.registerMessageFuture(); try { + if (param.clientContextID == null) { + param.clientContextID = UUID.randomUUID().toString(); + } + long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; + if (param.timeout != null) { + timeout = java.util.concurrent.TimeUnit.NANOSECONDS + .toMillis(Duration.parseDurationStringToNanos(param.timeout)); + } ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl); outExecStartEnd[0] = System.nanoTime(); ncMb.sendMessageToCC(requestMsg); - responseMsg = (ExecuteStatementResponseMessage) responseFuture.get( - ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS); + try { + responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, + java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (TimeoutException exception) { + RuntimeDataException hde = new RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception); + // cancel query + cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, hde); + throw hde; + } outExecStartEnd[1] = System.nanoTime(); } finally { ncMb.deregisterMessageFuture(responseFuture.getFutureId()); @@ -97,6 +118,22 @@ } } + private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID, + Exception exception) { + MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture(); + try { + CancelQueryRequest cancelQueryMessage = + new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID); + messageBroker.sendMessageToCC(cancelQueryMessage); + cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS, + java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (Exception e) { + exception.addSuppressed(e); + } finally { + messageBroker.deregisterMessageFuture(cancelQueryFuture.getFutureId()); + } + } + @Override protected HttpResponseStatus handleExecuteStatementException(Throwable t) { if (t instanceof IPCException || t instanceof TimeoutException) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java index d9757c7..6291869 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java index bfec146..3f07151 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java @@ -23,7 +23,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 1f1d282..c630636 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -27,7 +27,6 @@ import java.util.logging.Logger; import org.apache.asterix.algebra.base.ILangExtension; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.config.GlobalConfig; @@ -108,7 +107,8 @@ FORMAT("format"), CLIENT_ID("client_context_id"), PRETTY("pretty"), - MODE("mode"); + MODE("mode"), + TIMEOUT("timeout"); private final String str; @@ -154,39 +154,12 @@ } } - public enum TimeUnit { - SEC("s", 9), - MILLI("ms", 6), - MICRO("µs", 3), - NANO("ns", 0); - - String unit; - int nanoDigits; - - TimeUnit(String unit, int nanoDigits) { - this.unit = unit; - this.nanoDigits = nanoDigits; - } - - public static String formatNanos(long nanoTime) { - final String strTime = String.valueOf(nanoTime); - final int len = strTime.length(); - for (TimeUnit tu : TimeUnit.values()) { - if (len > tu.nanoDigits) { - final String integer = strTime.substring(0, len - tu.nanoDigits); - final String fractional = strTime.substring(len - tu.nanoDigits); - return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit; - } - } - return "illegal string value: " + strTime; - } - } - static class RequestParameters { String host; String path; String statement; String format; + String timeout; boolean pretty; String clientContextID; String mode; @@ -202,6 +175,7 @@ on.put("pretty", pretty); on.put("mode", mode); on.put("clientContextID", clientContextID); + on.put("format", format); return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on); } catch (JsonProcessingException e) { // NOSONAR return e.getMessage(); @@ -297,9 +271,9 @@ pw.print(ResultFields.METRICS.str()); pw.print("\": {\n"); pw.print("\t"); - ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), TimeUnit.formatNanos(elapsedTime)); + ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(elapsedTime)); pw.print("\t"); - ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), TimeUnit.formatNanos(executionTime)); + ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), Duration.formatNanos(executionTime)); pw.print("\t"); ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true); pw.print("\t"); @@ -334,6 +308,7 @@ param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false); param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str())); param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str()); + param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str()); } catch (JsonParseException | JsonMappingException e) { // if the JSON parsing fails, the statement is empty and we get an empty statement error GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java index 27e2806..8536571 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.PrintWriter; import java.util.ArrayDeque; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index 18aae8e..1a7918c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR; import java.io.IOException; import java.util.List; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java similarity index 96% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java index b815d76..2fe37c3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.api.http.servlet; +package org.apache.asterix.api.http.server; public class ServletConstants { public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION"; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java index fdd106d..06e2383 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java index 5acba381..eeef8e8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; import java.io.IOException; import java.io.PrintWriter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index a16f678..3b4b974 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -34,14 +34,11 @@ import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventSubscriber; -import org.apache.asterix.active.IRetryPolicy; import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.active.message.StatsRequestMessage; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; @@ -53,8 +50,6 @@ import org.apache.asterix.metadata.api.IActiveEntityController; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.utils.DatasetUtil; -import org.apache.asterix.metadata.utils.MetadataLockUtil; import org.apache.asterix.translator.IStatementExecutor; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -94,8 +89,7 @@ protected boolean isFetchingStats; protected int numRegistered; protected int numDeRegistered; - protected volatile Future<Void> recoveryTask; - protected volatile boolean cancelRecovery; + protected volatile RecoveryTask rt; protected volatile boolean suspended = false; // failures protected Exception jobFailure; @@ -199,7 +193,8 @@ jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); setState(ActivityState.TEMPORARILY_FAILED); - if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING) { + if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING + && prevState != ActivityState.RESUMING) { recover(); } } else { @@ -356,112 +351,16 @@ @Override public synchronized void recover() throws HyracksDataException { LOGGER.log(level, "Recover is called on " + entityId); - if (recoveryTask != null) { - LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception"); - throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS); - } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); setState(ActivityState.PERMANENTLY_FAILED); } else { ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); - IRetryPolicy policy = retryPolicyFactory.create(this); - cancelRecovery = false; setState(ActivityState.TEMPORARILY_FAILED); LOGGER.log(level, "Recovery task has been submitted"); - recoveryTask = executor.submit(() -> { - String nameBefore = Thread.currentThread().getName(); - try { - Thread.currentThread().setName("RecoveryTask (" + entityId + ")"); - doRecover(policy); - } finally { - Thread.currentThread().setName(nameBefore); - } - return null; - }); + rt = new RecoveryTask(appCtx, this, retryPolicyFactory); + executor.submit(rt.recover()); } - } - - protected Void doRecover(IRetryPolicy policy) - throws AlgebricksException, HyracksDataException, InterruptedException { - LOGGER.log(level, "Actual Recovery task has started"); - if (getState() != ActivityState.TEMPORARILY_FAILED) { - LOGGER.log(level, "but its state is not temp failure and so we're just returning"); - return null; - } - LOGGER.log(level, "calling the policy"); - while (policy.retry()) { - synchronized (this) { - if (cancelRecovery) { - recoveryTask = null; - notifyAll(); - return null; - } - while (clusterStateManager.getState() != ClusterState.ACTIVE) { - if (cancelRecovery) { - recoveryTask = null; - notifyAll(); - return null; - } - wait(); - } - } - waitForNonTransitionState(); - IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); - lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), - entityId.getDataverse() + '.' + entityId.getEntityName()); - for (Dataset dataset : getDatasets()) { - MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), - dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset)); - } - synchronized (this) { - try { - if (cancelRecovery) { - recoveryTask = null; - notifyAll(); - return null; - } - setState(ActivityState.RECOVERING); - doStart(metadataProvider); - recoveryTask = null; - notifyAll(); - return null; - } catch (Exception e) { - LOGGER.log(level, "Attempt to revive " + entityId + " failed", e); - setState(ActivityState.TEMPORARILY_FAILED); - recoverFailure = e; - } finally { - metadataProvider.getLocks().reset(); - } - notifyAll(); - } - } - // Recovery task is essntially over now either through failure or through cancellation(stop) - synchronized (this) { - recoveryTask = null; - notifyAll(); - if (state != ActivityState.TEMPORARILY_FAILED) { - return null; - } - } - IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); - try { - lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), - entityId.getDataverse() + '.' + entityId.getEntityName()); - for (Dataset dataset : getDatasets()) { - MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(), - DatasetUtil.getFullyQualifiedName(dataset)); - } - synchronized (this) { - if (state == ActivityState.TEMPORARILY_FAILED) { - setState(ActivityState.PERMANENTLY_FAILED); - } - notifyAll(); - } - } finally { - metadataProvider.getLocks().reset(); - } - return null; } @Override @@ -503,13 +402,10 @@ throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); } if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) { - if (recoveryTask != null) { + if (rt != null) { setState(ActivityState.STOPPING); - cancelRecovery = true; - recoveryTask.cancel(true); - while (recoveryTask != null) { - wait(); - } + rt.cancel(); + rt = null; } setState(ActivityState.STOPPED); try { @@ -531,6 +427,10 @@ } else { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); } + } + + public RecoveryTask getRecoveryTask() { + return rt; } @Override @@ -602,8 +502,9 @@ setState(ActivityState.RESUMING); WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED)); - recoveryTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService() - .getExecutor().submit(() -> resumeOrRecover(metadataProvider)); + rt = new RecoveryTask(appCtx, this, retryPolicyFactory); + metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor() + .submit(() -> rt.resumeOrRecover(metadataProvider)); try { subscriber.sync(); } catch (Exception e) { @@ -614,28 +515,6 @@ suspended = false; notifyAll(); } - } - - protected Void resumeOrRecover(MetadataProvider metadataProvider) - throws HyracksDataException, AlgebricksException, InterruptedException { - try { - doResume(metadataProvider); - synchronized (this) { - setState(ActivityState.RUNNING); - recoveryTask = null; - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + " Failed", e); - setState(ActivityState.TEMPORARILY_FAILED); - if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - setState(ActivityState.PERMANENTLY_FAILED); - } else { - IRetryPolicy policy = retryPolicyFactory.create(this); - cancelRecovery = false; - doRecover(policy); - } - } - return null; } @Override @@ -650,15 +529,6 @@ public void setLocations(AlgebricksAbsolutePartitionConstraint locations) { this.locations = locations; - } - - public Future<Void> getRecoveryTask() { - return recoveryTask; - } - - public synchronized void cancelRecovery() { - cancelRecovery = true; - notifyAll(); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java new file mode 100644 index 0000000..7b7de93 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.active; + +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.IRetryPolicy; +import org.apache.asterix.active.IRetryPolicyFactory; +import org.apache.asterix.active.NoRetryPolicyFactory; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.metadata.utils.MetadataLockUtil; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class RecoveryTask { + + private static final Logger LOGGER = Logger.getLogger(RecoveryTask.class.getName()); + private static final Level level = Level.INFO; + private final ActiveEntityEventsListener listener; + private volatile boolean cancelRecovery = false; + private final IRetryPolicyFactory retryPolicyFactory; + private final MetadataProvider metadataProvider; + private final IClusterStateManager clusterStateManager; + private Exception failure; + + public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener, + IRetryPolicyFactory retryPolicyFactory) { + this.listener = listener; + this.retryPolicyFactory = retryPolicyFactory; + this.metadataProvider = new MetadataProvider(appCtx, null); + this.clusterStateManager = appCtx.getClusterStateManager(); + } + + public Callable<Void> recover() { + if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { + return () -> { + return null; + }; + } + IRetryPolicy policy = retryPolicyFactory.create(listener); + return () -> { + String nameBefore = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("RecoveryTask (" + listener.getEntityId() + ")"); + doRecover(policy); + } finally { + Thread.currentThread().setName(nameBefore); + } + return null; + }; + } + + public void cancel() { + cancelRecovery = true; + } + + protected Void resumeOrRecover(MetadataProvider metadataProvider) + throws HyracksDataException, AlgebricksException, InterruptedException { + try { + synchronized (listener) { + listener.doResume(metadataProvider); + listener.setState(ActivityState.RUNNING); + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "First attempt to resume " + listener.getEntityId() + " Failed", e); + synchronized (listener) { + if (listener.getState() == ActivityState.RESUMING) { + // This will be the case if compilation failure + // If the failure is a runtime failure, then the state + // would've been set to temporarily failed already + listener.setState(ActivityState.TEMPORARILY_FAILED); + } + } + if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { + synchronized (listener) { + if (!cancelRecovery) { + listener.setState(ActivityState.PERMANENTLY_FAILED); + } + } + } else { + IRetryPolicy policy = retryPolicyFactory.create(listener); + doRecover(policy); + } + } + return null; + } + + protected Void doRecover(IRetryPolicy policy) + throws AlgebricksException, HyracksDataException, InterruptedException { + LOGGER.log(level, "Actual Recovery task has started"); + if (listener.getState() != ActivityState.TEMPORARILY_FAILED) { + LOGGER.log(level, "but its state is not temp failure and so we're just returning"); + return null; + } + LOGGER.log(level, "calling the policy"); + while (policy.retry()) { + synchronized (listener) { + if (cancelRecovery) { + return null; + } + while (clusterStateManager.getState() != ClusterState.ACTIVE) { + if (cancelRecovery) { + return null; + } + wait(); + } + } + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), + listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName()); + for (Dataset dataset : listener.getDatasets()) { + MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), + dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset)); + } + synchronized (listener) { + try { + if (cancelRecovery) { + return null; + } + listener.setState(ActivityState.RECOVERING); + listener.doStart(metadataProvider); + return null; + } catch (Exception e) { + LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e); + listener.setState(ActivityState.TEMPORARILY_FAILED); + failure = e; + } finally { + metadataProvider.getLocks().reset(); + } + listener.notifyAll(); + } + } + // Recovery task is essntially over now either through failure or through cancellation(stop) + synchronized (listener) { + listener.notifyAll(); + if (listener.getState() != ActivityState.TEMPORARILY_FAILED) { + return null; + } + } + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + try { + lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), + listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName()); + for (Dataset dataset : listener.getDatasets()) { + MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(), + DatasetUtil.getFullyQualifiedName(dataset)); + } + synchronized (listener) { + if (listener.getState() == ActivityState.TEMPORARILY_FAILED) { + listener.setState(ActivityState.PERMANENTLY_FAILED); + } + listener.notifyAll(); + } + } finally { + metadataProvider.getLocks().reset(); + } + return null; + } + + public Exception getFailure() { + return failure; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java new file mode 100644 index 0000000..fb6ec37 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class CancelQueryRequest implements ICcAddressedMessage { + + private static final Logger LOGGER = Logger.getLogger(CancelQueryRequest.class.getName()); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + private final String contextId; + + public CancelQueryRequest(String nodeId, long reqId, String contextId) { + this.nodeId = nodeId; + this.reqId = reqId; + this.contextId = contextId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + CCApplication application = (CCApplication) ccs.getApplication(); + IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); + JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId); + + if (jobId == null) { + LOGGER.log(Level.WARN, "No job found for context id " + contextId); + } else { + try { + IHyracksClientConnection hcc = application.getHcc(); + hcc.cancelJob(jobId); + executorsCtx.removeJobIdFromClientContextId(contextId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); + } + } + CancelQueryResponse response = new CancelQueryResponse(reqId); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure sending response to nc", e); + } + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java new file mode 100644 index 0000000..4fbcf22 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class CancelQueryResponse implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + + public CancelQueryResponse(long reqId) { + this.reqId = reqId; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index e7919fa..5cee3d9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -22,6 +22,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,23 +55,18 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage { private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName()); - + //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062 + public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); + //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063 + public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); private final String requestNodeId; - private final long requestMessageId; - private final ILangExtension.Language lang; - private final String statementsText; - private final SessionConfig sessionConfig; - private final IStatementExecutor.ResultDelivery delivery; - private final String clientContextID; - private final String handleUrl; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, @@ -102,47 +98,41 @@ IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext(); - - ccSrv.getExecutor().submit(() -> { - ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); - try { - IParser parser = compilationProvider.getParserFactory().createParser(statementsText); - List<Statement> statements = parser.parse(); - StringWriter outWriter = new StringWriter(256); - PrintWriter outPrinter = new PrintWriter(outWriter); - SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); - SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); - SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); - SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); - SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, - appendHandle, appendStatus); - - IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); - - MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, - compilationProvider, storageComponentProvider); - translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, - new IStatementExecutor.Stats(), clientContextID, statementExecutorContext); - - outPrinter.close(); - responseMsg.setResult(outWriter.toString()); - responseMsg.setMetadata(outMetadata); - } catch (AlgebricksException | HyracksException | TokenMgrError - | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { - // we trust that "our" exceptions are serializable and have a comprehensible error message - GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); - responseMsg.setError(pe); - } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); - responseMsg.setError(new Exception(e.toString())); - } - try { - messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); - } catch (Exception e) { - LOGGER.log(Level.WARNING, e.toString(), e); - } - }); + ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + try { + IParser parser = compilationProvider.getParserFactory().createParser(statementsText); + List<Statement> statements = parser.parse(); + StringWriter outWriter = new StringWriter(256); + PrintWriter outPrinter = new PrintWriter(outWriter); + SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); + SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); + SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); + SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); + SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, + appendHandle, appendStatus); + IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, + compilationProvider, storageComponentProvider); + translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, new IStatementExecutor.Stats(), + clientContextID, statementExecutorContext); + outPrinter.close(); + responseMsg.setResult(outWriter.toString()); + responseMsg.setMetadata(outMetadata); + } catch (AlgebricksException | HyracksException | TokenMgrError + | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { + // we trust that "our" exceptions are serializable and have a comprehensible error message + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); + responseMsg.setError(pe); + } catch (Exception e) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); + responseMsg.setError(new Exception(e.toString())); + } + try { + messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e.toString(), e); + } } private String getRejectionReason(ClusterControllerService ccSrv) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java index 4f9aa0c..54f0a4e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java @@ -19,8 +19,6 @@ package org.apache.asterix.app.message; -import java.util.concurrent.TimeUnit; - import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.MessageFuture; @@ -30,8 +28,6 @@ public final class ExecuteStatementResponseMessage implements INcAddressedMessage { private static final long serialVersionUID = 1L; - - public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); private final long requestMessageId; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index e8636c8..9040ad1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -21,8 +21,8 @@ import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL; import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.util.Arrays; import java.util.List; @@ -47,10 +47,10 @@ import org.apache.asterix.api.http.server.QueryStatusApiServlet; import org.apache.asterix.api.http.server.QueryWebInterfaceServlet; import org.apache.asterix.api.http.server.RebalanceApiServlet; +import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.api.http.server.ShutdownApiServlet; import org.apache.asterix.api.http.server.UpdateApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.cc.ResourceIdManager; @@ -106,6 +106,7 @@ protected WebManager webManager; protected CcApplicationContext appCtx; private IJobCapacityController jobCapacityController; + private IHyracksClientConnection hcc; @Override public void start(IServiceContext serviceCtx, String[] args) throws Exception { @@ -124,6 +125,9 @@ ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); + String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); + int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); + hcc = new HyracksConnection(strIP, port); ILibraryManager libraryManager = new ExternalLibraryManager(); ResourceIdManager resourceIdManager = new ResourceIdManager(); IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); @@ -196,7 +200,6 @@ protected HttpServer setupWebServer(ExternalProperties externalProperties) throws Exception { HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getWebInterfacePort()); - IHyracksClientConnection hcc = getHcc(); webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx, ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP), @@ -207,7 +210,6 @@ protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception { HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort()); - IHyracksClientConnection hcc = getHcc(); jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, @@ -252,7 +254,6 @@ protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception { HttpServer queryWebServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getQueryWebInterfacePort()); - IHyracksClientConnection hcc = getHcc(); queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); queryWebServer.addServlet(new QueryWebInterfaceServlet(appCtx, queryWebServer.ctx(), new String[] { "/*" })); return queryWebServer; @@ -357,9 +358,7 @@ return appCtx; } - protected IHyracksClientConnection getHcc() throws Exception { - String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); - int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); - return new HyracksConnection(strIP, port); + public IHyracksClientConnection getHcc() { + return hcc; } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java index 5abbe40..cd58d8f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.asterix.api.http.server.ConnectorApiServlet; +import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index 5f40a85..3cb46fe 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -29,6 +29,7 @@ import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.QueryCancellationServlet; +import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java deleted file mode 100644 index e0539ac..0000000 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.api.http.servlet; - -import org.apache.asterix.api.http.server.QueryServiceServlet; -import org.junit.Assert; -import org.junit.Test; - -public class QueryServiceServletTest { - - @Test - public void testTimeUnitFormatNanos() throws Exception { - Assert.assertEquals("123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(123456789012l)); - Assert.assertEquals("12.345678901s", QueryServiceServlet.TimeUnit.formatNanos(12345678901l)); - Assert.assertEquals("1.234567890s", QueryServiceServlet.TimeUnit.formatNanos(1234567890l)); - Assert.assertEquals("123.456789ms", QueryServiceServlet.TimeUnit.formatNanos(123456789l)); - Assert.assertEquals("12.345678ms", QueryServiceServlet.TimeUnit.formatNanos(12345678l)); - Assert.assertEquals("1.234567ms", QueryServiceServlet.TimeUnit.formatNanos(1234567l)); - Assert.assertEquals("123.456µs", QueryServiceServlet.TimeUnit.formatNanos(123456l)); - Assert.assertEquals("12.345µs", QueryServiceServlet.TimeUnit.formatNanos(12345l)); - Assert.assertEquals("1.234µs", QueryServiceServlet.TimeUnit.formatNanos(1234l)); - Assert.assertEquals("123ns", QueryServiceServlet.TimeUnit.formatNanos(123l)); - Assert.assertEquals("12ns", QueryServiceServlet.TimeUnit.formatNanos(12l)); - Assert.assertEquals("1ns", QueryServiceServlet.TimeUnit.formatNanos(1l)); - Assert.assertEquals("-123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(-123456789012l)); - Assert.assertEquals("120.000000000s", QueryServiceServlet.TimeUnit.formatNanos(120000000000l)); - Assert.assertEquals("-12ns", QueryServiceServlet.TimeUnit.formatNanos(-12l)); - } -} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java index f994e98..e583c75 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java @@ -19,8 +19,8 @@ package org.apache.asterix.api.http.servlet; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java new file mode 100644 index 0000000..12c61d6 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ParseDurationTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime; + +import org.apache.asterix.api.http.server.Duration; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.junit.Assert; +import org.junit.Test; + +public class ParseDurationTest { + + @Test + public void test() throws Exception { + // simple + Assert.assertEquals(0, Duration.parseDurationStringToNanos("0")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(30), + Duration.parseDurationStringToNanos("30s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1478), + Duration.parseDurationStringToNanos("1478s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(-5), + Duration.parseDurationStringToNanos("-5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("+5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0), + Duration.parseDurationStringToNanos("-0")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(0), + Duration.parseDurationStringToNanos("+0")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("5.0s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(5) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(600), + Duration.parseDurationStringToNanos("5.6s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(5), + Duration.parseDurationStringToNanos("5.s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500), + Duration.parseDurationStringToNanos(".5s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1), + Duration.parseDurationStringToNanos("1.0s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(1), + Duration.parseDurationStringToNanos("1.00s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(1) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4), + Duration.parseDurationStringToNanos("1.004s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(1) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4), + Duration.parseDurationStringToNanos("1.0040s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(100) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(1), + Duration.parseDurationStringToNanos("100.00100s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(10), + Duration.parseDurationStringToNanos("10ns")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(11), + Duration.parseDurationStringToNanos("11us")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12), + Duration.parseDurationStringToNanos("12µs")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(12), + Duration.parseDurationStringToNanos("12μs")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(13), + Duration.parseDurationStringToNanos("13ms")); + Assert.assertEquals(java.util.concurrent.TimeUnit.SECONDS.toNanos(14), + Duration.parseDurationStringToNanos("14s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.MINUTES.toNanos(15), + Duration.parseDurationStringToNanos("15m")); + Assert.assertEquals(java.util.concurrent.TimeUnit.HOURS.toNanos(16), + Duration.parseDurationStringToNanos("16h")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.HOURS.toNanos(3) + java.util.concurrent.TimeUnit.MINUTES.toNanos(30), + Duration.parseDurationStringToNanos("3h30m")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.SECONDS.toNanos(10) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(500) + + java.util.concurrent.TimeUnit.MINUTES.toNanos(4), + Duration.parseDurationStringToNanos("10.5s4m")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.MINUTES.toNanos(-2) + java.util.concurrent.TimeUnit.SECONDS.toNanos(-3) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(-400), + Duration.parseDurationStringToNanos("-2m3.4s")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.HOURS.toNanos(1) + java.util.concurrent.TimeUnit.MINUTES.toNanos(2) + + java.util.concurrent.TimeUnit.SECONDS.toNanos(3) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(4) + + java.util.concurrent.TimeUnit.MICROSECONDS.toNanos(5) + + java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(6), + Duration.parseDurationStringToNanos("1h2m3s4ms5us6ns")); + Assert.assertEquals( + java.util.concurrent.TimeUnit.HOURS.toNanos(39) + java.util.concurrent.TimeUnit.MINUTES.toNanos(9) + + java.util.concurrent.TimeUnit.SECONDS.toNanos(14) + + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(425), + Duration.parseDurationStringToNanos("39h9m14.425s")); + Assert.assertEquals(java.util.concurrent.TimeUnit.NANOSECONDS.toNanos(52763797000L), + Duration.parseDurationStringToNanos("52763797000ns")); + Assert.assertEquals(1199999998800L, Duration.parseDurationStringToNanos("0.3333333333333333333333h")); + Assert.assertEquals(9007199254740993L, Duration.parseDurationStringToNanos("9007199254740993ns")); + Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775807ns")); + Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036854775.807us")); + Assert.assertEquals(9223372036854775807L, Duration.parseDurationStringToNanos("9223372036s854ms775us807ns")); + Assert.assertEquals(-9223372036854775807L, Duration.parseDurationStringToNanos("-9223372036854775807ns")); + assertFail(""); + assertFail("3"); + assertFail("-"); + assertFail("s"); + assertFail("."); + assertFail("-."); + assertFail(".s"); + assertFail("+.s"); + assertFail("3000000h"); + assertFail("9223372036854775808ns"); + assertFail("9223372036854775.808us"); + assertFail("9223372036854ms775us808ns"); + assertFail("-9223372036854775808ns"); + } + + @Test + public void testDurationFormatNanos() throws Exception { + Assert.assertEquals("123.456789012s", Duration.formatNanos(123456789012l)); + Assert.assertEquals("12.345678901s", Duration.formatNanos(12345678901l)); + Assert.assertEquals("1.234567890s", Duration.formatNanos(1234567890l)); + Assert.assertEquals("123.456789ms", Duration.formatNanos(123456789l)); + Assert.assertEquals("12.345678ms", Duration.formatNanos(12345678l)); + Assert.assertEquals("1.234567ms", Duration.formatNanos(1234567l)); + Assert.assertEquals("123.456µs", Duration.formatNanos(123456l)); + Assert.assertEquals("12.345µs", Duration.formatNanos(12345l)); + Assert.assertEquals("1.234µs", Duration.formatNanos(1234l)); + Assert.assertEquals("123ns", Duration.formatNanos(123l)); + Assert.assertEquals("12ns", Duration.formatNanos(12l)); + Assert.assertEquals("1ns", Duration.formatNanos(1l)); + Assert.assertEquals("-123.456789012s", Duration.formatNanos(-123456789012l)); + Assert.assertEquals("120.000000000s", Duration.formatNanos(120000000000l)); + Assert.assertEquals("-12ns", Duration.formatNanos(-12l)); + } + + private void assertFail(String duration) { + try { + Duration.parseDurationStringToNanos(duration); + Assert.fail("Expected parseDuration(" + duration + ") to fail but it didn't"); + } catch (HyracksDataException hde) { + } + } +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 143f7d1..fcfb428 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -68,6 +68,9 @@ public static final int POLYGON_3_POINTS = 25; public static final int POLYGON_INVALID = 26; public static final int OPERATION_NOT_SUPPORTED = 27; + public static final int INVALID_DURATION = 28; + public static final int UNKNOWN_DURATION_UNIT = 29; + public static final int QUERY_TIMEOUT = 30; public static final int INSTANTIATION_ERROR = 100; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 715c27d..5bd5482 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -61,6 +61,9 @@ 25 = Polygon must have at least 3 points 26 = %1$s can not be an instance of polygon 27 = Operation not supported +28 = Invalid duration %1$s +29 = Unknown duration unit %1$s +30 = Query timed out 100 = Unable to instantiate class %1$s diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml index 5497cf4..458bb67 100644 --- a/asterixdb/asterix-metadata/pom.xml +++ b/asterixdb/asterix-metadata/pom.xml @@ -16,7 +16,8 @@ ! specific language governing permissions and limitations ! under the License. !--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>apache-asterixdb</artifactId> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index e2868ae..95479c1 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -130,6 +130,9 @@ public CancelJobFunction(JobId jobId) { this.jobId = jobId; + if (jobId == null) { + throw new IllegalArgumentException("jobId"); + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index dbbaf9f..a3078b6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -65,6 +65,8 @@ import org.apache.hyracks.control.cc.work.JobCleanupWork; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; +import org.apache.hyracks.control.common.work.NoOpCallback; +import org.apache.hyracks.control.common.work.IResultCallback; public class JobExecutor { private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName()); @@ -114,9 +116,10 @@ ccs.getContext().notifyJobStart(jobRun.getJobId()); } - public void cancelJob() throws HyracksException { + public void cancelJob(IResultCallback<Void> callback) throws HyracksException { // If the job is already terminated or failed, do nothing here. if (jobRun.getPendingStatus() != null) { + callback.setValue(null); return; } // Sets the cancelled flag. @@ -124,7 +127,8 @@ // Aborts on-ongoing task clusters. abortOngoingTaskClusters(ta -> false, ta -> null); // Aborts the whole job. - abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId()))); + abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())), + callback); } private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots) @@ -196,8 +200,8 @@ "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters); } if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) { - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, null)); + ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, + null, NoOpCallback.INSTANCE)); return; } startRunnableTaskClusters(taskClusterRoots); @@ -520,14 +524,14 @@ } } - public void abortJob(List<Exception> exceptions) { + public void abortJob(List<Exception> exceptions, IResultCallback<Void> callback) { Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters); for (TaskCluster tc : inProgressTaskClustersCopy) { abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED); } assert inProgressTaskClusters.isEmpty(); - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions)); + ccs.getWorkQueue().schedule( + new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions, callback)); } private void abortTaskCluster(TaskClusterAttempt tcAttempt, @@ -686,7 +690,7 @@ + " as failed and the number of max re-attempts = " + maxReattempts); if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) { LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId()); - abortJob(exceptions); + abortJob(exceptions, NoOpCallback.INSTANCE); return; } LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId()); @@ -696,7 +700,7 @@ "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt); } } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE); } } @@ -720,7 +724,7 @@ ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId())); startRunnableActivityClusters(); } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java index 8fe542f..a9ddee3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.common.work.IResultCallback; /** * This interface abstracts the job lifecycle management and job scheduling for a cluster. @@ -47,10 +48,12 @@ /** * Cancel a job with a given job id. * + * @param callback + * * @param jobId, * the id of the job. */ - void cancel(JobId jobId) throws HyracksException; + void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException; /** * This method is called when the master process decides to complete job. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index abf1d57..4ba847d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -46,6 +46,8 @@ import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue; import org.apache.hyracks.control.cc.scheduler.IJobQueue; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.work.NoOpCallback; +import org.apache.hyracks.control.common.work.IResultCallback; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -115,17 +117,14 @@ } @Override - public void cancel(JobId jobId) throws HyracksException { - if (jobId == null) { - return; - } + public void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException { // Cancels a running job. if (activeRunMap.containsKey(jobId)) { JobRun jobRun = activeRunMap.get(jobId); // The following call will abort all ongoing tasks and then consequently // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. // Therefore, we do not remove the job out of activeRunMap here. - jobRun.getExecutor().cancelJob(); + jobRun.getExecutor().cancelJob(callback); return; } // Removes a pending job. @@ -138,6 +137,7 @@ runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); } + callback.setValue(null); } @Override @@ -322,7 +322,7 @@ // fail the job then abort it run.setStatus(JobStatus.FAILURE, exceptions); // abort job will trigger JobCleanupWork - run.getExecutor().abortJob(exceptions); + run.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java index f3b67c9..e3135df 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java @@ -42,10 +42,7 @@ @Override protected void doRun() throws Exception { try { - if (jobId != null) { - jobManager.cancel(jobId); - } - callback.setValue(null); + jobManager.cancel(jobId, callback); } catch (Exception e) { callback.setException(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index 502ac50..bb85c13 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -29,6 +29,7 @@ import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.common.work.IResultCallback; public class JobCleanupWork extends AbstractWork { private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName()); @@ -37,12 +38,15 @@ private JobId jobId; private JobStatus status; private List<Exception> exceptions; + private IResultCallback<Void> callback; - public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions) { + public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, + IResultCallback<Void> callback) { this.jobManager = jobManager; this.jobId = jobId; this.status = status; this.exceptions = exceptions; + this.callback = callback; } @Override @@ -53,6 +57,7 @@ try { JobRun jobRun = jobManager.get(jobId); jobManager.prepareComplete(jobRun, status, exceptions); + callback.setValue(null); } catch (HyracksException e) { // Fail the job with the caught exception during final completion. JobRun run = jobManager.get(jobId); @@ -62,6 +67,7 @@ } completionException.add(0, e); run.setStatus(JobStatus.FAILURE, completionException); + callback.setException(e); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index ed2a740..0d46d64 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -48,6 +48,7 @@ import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.logs.LogFile; +import org.apache.hyracks.control.common.work.NoOpCallback; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -207,7 +208,7 @@ } @Test - public void testCancel() throws HyracksException { + public void testCancel() throws Exception { CCConfig ccConfig = new CCConfig(); IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); @@ -247,12 +248,12 @@ // Cancels deferred jobs. for (JobRun run : deferredRuns) { - jobManager.cancel(run.getJobId()); + jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE); } // Cancels runnable jobs. for (JobRun run : acceptedRuns) { - jobManager.cancel(run.getJobId()); + jobManager.cancel(run.getJobId(), NoOpCallback.INSTANCE); } Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java index 150e0e8..ca0c7c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/FutureValue.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.common.work; public class FutureValue<T> implements IResultCallback<T> { + private boolean done; private T value; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java new file mode 100644 index 0000000..041cee0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/NoOpCallback.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.work; + +public class NoOpCallback implements IResultCallback<Void> { + + public static final NoOpCallback INSTANCE = new NoOpCallback(); + + private NoOpCallback() { + } + + @Override + public void setValue(Void result) { + // Dummy is used when no callback is provided + } + + @Override + public void setException(Exception e) { + // Dummy is used when no callback is provided + } + +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1961 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5 Gerrit-PatchSet: 14 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Michael Carey <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Xikui Wang <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
