abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1961
Change subject: [ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup
......................................................................
[ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup
Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
R
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
M
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
M
hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
M
hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
M
hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
M
hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
53 files changed, 552 insertions(+), 308 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/61/1961/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index cfd4e87..b09d139 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -111,18 +111,20 @@
private final boolean executeQuery;
private final boolean generateJobSpec;
private final boolean optimize;
+ private final long timeout;
// Flags.
private final Map<String, Boolean> flags;
public SessionConfig(OutputFormat fmt) {
- this(fmt, true, true, true);
+ this(fmt, true, true, true, Long.MAX_VALUE);
}
/**
* Create a SessionConfig object with all optional values set to defaults:
* - All format flags set to "false".
* - All out-of-band outputs set to "false".
+ *
* @param fmt
* Output format for execution output.
* @param optimize
@@ -132,12 +134,14 @@
* @param generateJobSpec
* Whether to generate the Hyracks job specification (if
*/
- public SessionConfig(OutputFormat fmt, boolean optimize, boolean
executeQuery, boolean generateJobSpec) {
+ public SessionConfig(OutputFormat fmt, boolean optimize, boolean
executeQuery, boolean generateJobSpec,
+ long timeout) {
this.fmt = fmt;
this.optimize = optimize;
this.executeQuery = executeQuery;
this.generateJobSpec = generateJobSpec;
this.flags = new HashMap<>();
+ this.timeout = timeout;
}
/**
@@ -203,4 +207,8 @@
Boolean value = flags.get(flag);
return value == null ? false : value.booleanValue();
}
+
+ public long getTimeout() {
+ return timeout;
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 583302b..f31e993 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -356,13 +356,13 @@
return spec;
}
- public void executeJobArray(IHyracksClientConnection hcc,
JobSpecification[] specs, PrintWriter out)
+ public void executeJobArray(IHyracksClientConnection hcc,
JobSpecification[] specs, PrintWriter out, long timeout)
throws Exception {
for (JobSpecification spec : specs) {
spec.setMaxReattempts(0);
JobId jobId = hcc.startJob(spec);
long startTime = System.currentTimeMillis();
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, timeout);
long endTime = System.currentTimeMillis();
double duration = (endTime - startTime) / 1000.00;
out.println("<pre>Duration: " + duration + " sec</pre>");
@@ -370,7 +370,8 @@
}
- public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs,
PrintWriter out) throws Exception {
+ public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs,
PrintWriter out, long timeout)
+ throws Exception {
for (Job job : jobs) {
job.getJobSpec().setMaxReattempts(0);
long startTime = System.currentTimeMillis();
@@ -379,7 +380,7 @@
if (job.getSubmissionMode() == SubmissionMode.ASYNCHRONOUS) {
continue;
}
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, timeout);
} catch (Exception e) {
e.printStackTrace();
continue;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..e7b7435 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -138,7 +138,7 @@
}
IParser parser = parserFactory.createParser(query);
List<Statement> aqlStatements = parser.parse();
- SessionConfig sessionConfig = new SessionConfig(format, true,
isSet(executeQuery), true);
+ SessionConfig sessionConfig = new SessionConfig(format, true,
isSet(executeQuery), true, Long.MAX_VALUE);
sessionConfig.set(SessionConfig.FORMAT_HTML, true);
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY,
isSet(wrapperArray));
@@ -150,8 +150,8 @@
compilationProvider, componentProvider);
double duration;
long startTime = System.currentTimeMillis();
- translator.compileAndExecute(hcc, hds,
IStatementExecutor.ResultDelivery.IMMEDIATE,
- null, new IStatementExecutor.Stats());
+ translator.compileAndExecute(hcc, hds,
IStatementExecutor.ResultDelivery.IMMEDIATE, null,
+ new IStatementExecutor.Stats());
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println(HTML_STATEMENT_SEPARATOR);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1f1d282..e39b63f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -121,7 +121,7 @@
}
}
- private enum Attribute {
+ protected enum Attribute {
HEADER("header"),
LOSSLESS("lossless");
@@ -182,14 +182,14 @@
}
}
- static class RequestParameters {
- String host;
- String path;
- String statement;
- String format;
- boolean pretty;
- String clientContextID;
- String mode;
+ protected static class RequestParameters {
+ public String host;
+ public String path;
+ public String statement;
+ public String format;
+ public boolean pretty;
+ public String clientContextID;
+ public String mode;
@Override
public String toString() {
@@ -209,7 +209,7 @@
}
}
- private static String getParameterValue(String content, String attribute) {
+ protected static String getParameterValue(String content, String
attribute) {
if (content == null || attribute == null) {
return null;
}
@@ -231,7 +231,7 @@
return s != null ? s.toLowerCase() : s;
}
- private static SessionConfig.OutputFormat getFormat(String format) {
+ protected static SessionConfig.OutputFormat getFormat(String format) {
if (format != null) {
if (format.startsWith(HttpUtil.ContentType.CSV)) {
return SessionConfig.OutputFormat.CSV;
@@ -247,8 +247,7 @@
return SessionConfig.OutputFormat.CLEAN_JSON;
}
- private static SessionOutput createSessionOutput(RequestParameters param,
String handleUrl,
- PrintWriter resultWriter) {
+ protected SessionOutput createSessionOutput(RequestParameters param,
String handleUrl, PrintWriter resultWriter) {
SessionOutput.ResultDecorator resultPrefix =
ResultUtil.createPreResultDecorator();
SessionOutput.ResultDecorator resultPostfix =
ResultUtil.createPostResultDecorator();
SessionOutput.ResultAppender appendHandle =
ResultUtil.createResultHandleAppender(handleUrl);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
similarity index 62%
rename from
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
rename to
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
index 9547514..4f4221e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
@@ -17,14 +17,18 @@
* under the License.
*/
-package org.apache.asterix.api.http.server;
+package org.apache.asterix.api.http.servlet;
+import java.io.PrintWriter;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
-import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.QueryServiceServlet;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.message.CancelQueryRequest;
import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
import org.apache.asterix.app.result.ResultReader;
@@ -34,6 +38,7 @@
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -41,11 +46,14 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.ipc.exceptions.IPCException;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
/**
* Query service servlet that can run on NC nodes.
* Delegates query execution to CC, then serves the result.
*/
public class NCQueryServiceServlet extends QueryServiceServlet {
+
public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[]
paths, IApplicationContext appCtx,
ILangExtension.Language queryLanguage) {
super(ctx, paths, appCtx, queryLanguage, null, null, null);
@@ -63,13 +71,23 @@
ExecuteStatementResponseMessage responseMsg;
MessageFuture responseFuture = ncMb.registerMessageFuture();
try {
+ if (param.clientContextID == null) {
+ param.clientContextID = UUID.randomUUID().toString();
+ }
ExecuteStatementRequestMessage requestMsg =
new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
responseFuture.getFutureId(), queryLanguage,
statementsText, sessionOutput.config(),
ccDelivery, param.clientContextID, handleUrl);
outExecStartEnd[0] = System.nanoTime();
ncMb.sendMessageToCC(requestMsg);
- responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
- ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS,
java.util.concurrent.TimeUnit.MILLISECONDS);
+ try {
+ responseMsg = (ExecuteStatementResponseMessage)
responseFuture.get(
+
ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS,
+ java.util.concurrent.TimeUnit.MILLISECONDS);
+ } catch (TimeoutException exception) {
+ // cancel query
+ cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID,
exception);
+ throw exception;
+ }
outExecStartEnd[1] = System.nanoTime();
} finally {
ncMb.deregisterMessageFuture(responseFuture.getFutureId());
@@ -97,6 +115,39 @@
}
}
+ private void cancelQuery(INCMessageBroker messageBroker, String nodeId,
String clientContextID,
+ TimeoutException exception) {
+ MessageFuture cancelQueryFuture =
messageBroker.registerMessageFuture();
+ CancelQueryRequest cancelQueryMessage =
+ new CancelQueryRequest(nodeId,
cancelQueryFuture.getFutureId(), clientContextID);
+ try {
+ messageBroker.sendMessageToCC(cancelQueryMessage);
+
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
+ java.util.concurrent.TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ exception.addSuppressed(e);
+ }
+ }
+
+ @Override
+ protected SessionOutput createSessionOutput(RequestParameters param,
String handleUrl, PrintWriter resultWriter) {
+ SessionOutput.ResultDecorator resultPrefix =
ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix =
ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle =
ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus =
ResultUtil.createResultStatusAppender();
+
+ SessionConfig.OutputFormat format =
QueryServiceServlet.getFormat(param.format);
+ SessionConfig sessionConfig =
+ new SessionConfig(format, true, true, true,
ExecuteStatementRequestMessage.DEFAULT_JOB_TIMEOUT_MILLIS);
+ sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
+ sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
+ sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
+ format != SessionConfig.OutputFormat.CLEAN_JSON && format !=
SessionConfig.OutputFormat.LOSSLESS_JSON);
+ sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format ==
SessionConfig.OutputFormat.CSV
+ &&
"present".equals(QueryServiceServlet.getParameterValue(param.format,
Attribute.HEADER.str())));
+ return new SessionOutput(sessionConfig, resultWriter, resultPrefix,
resultPostfix, appendHandle, appendStatus);
+ }
+
@Override
protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
if (t instanceof IPCException || t instanceof TimeoutException) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index a9d24b9..d6bbb6e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -100,7 +100,7 @@
List<Statement> statements = parser.parse();
MetadataManager.INSTANCE.init();
- SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize,
true, generateBinaryRuntime);
+ SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize,
true, generateBinaryRuntime, Long.MAX_VALUE);
conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan,
printOptimizedPlan, printJob);
if (printPhysicalOpsOnly) {
conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
@@ -109,17 +109,17 @@
IStatementExecutor translator =
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
- translator.compileAndExecute(hcc, null,
QueryTranslator.ResultDelivery.IMMEDIATE,
- null, new IStatementExecutor.Stats());
+ translator.compileAndExecute(hcc, null,
QueryTranslator.ResultDelivery.IMMEDIATE, null,
+ new IStatementExecutor.Stats());
writer.flush();
}
public void execute() throws Exception {
if (dmlJobs != null) {
- apiFramework.executeJobArray(hcc, dmlJobs, writer);
+ apiFramework.executeJobArray(hcc, dmlJobs, writer, Long.MAX_VALUE);
}
if (queryJobSpec != null) {
- apiFramework.executeJobArray(hcc, new JobSpecification[] {
queryJobSpec }, writer);
+ apiFramework.executeJobArray(hcc, new JobSpecification[] {
queryJobSpec }, writer, Long.MAX_VALUE);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 124e56e..e01b1c5 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -97,7 +97,7 @@
// We will need to design general exception handling mechanism for
feeds.
setLocations(jobInfo.getRight());
boolean wait =
Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
- JobUtils.runJob(hcc, feedJob, false);
+ JobUtils.runJob(hcc, feedJob);
eventSubscriber.sync();
if (wait) {
IActiveEntityEventSubscriber stoppedSubscriber = new
WaitForStateSubscriber(this,
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
new file mode 100644
index 0000000..fb6ec37
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class CancelQueryRequest implements ICcAddressedMessage {
+
+ private static final Logger LOGGER =
Logger.getLogger(CancelQueryRequest.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final long reqId;
+ private final String contextId;
+
+ public CancelQueryRequest(String nodeId, long reqId, String contextId) {
+ this.nodeId = nodeId;
+ this.reqId = reqId;
+ this.contextId = contextId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
+ ClusterControllerService ccs = (ClusterControllerService)
appCtx.getServiceContext().getControllerService();
+ CCApplication application = (CCApplication) ccs.getApplication();
+ IStatementExecutorContext executorsCtx =
application.getStatementExecutorContext();
+ JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+
+ if (jobId == null) {
+ LOGGER.log(Level.WARN, "No job found for context id " + contextId);
+ } else {
+ try {
+ IHyracksClientConnection hcc = application.getHcc();
+ hcc.cancelJob(jobId);
+ executorsCtx.removeJobIdFromClientContextId(contextId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "unexpected exception thrown from
cancel", e);
+ }
+ }
+ CancelQueryResponse response = new CancelQueryResponse(reqId);
+ CCMessageBroker messageBroker = (CCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendApplicationMessageToNC(response, nodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+ }
+ }
+
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
new file mode 100644
index 0000000..4fbcf22
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CancelQueryResponse implements INcAddressedMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final long reqId;
+
+ public CancelQueryResponse(long reqId) {
+ this.reqId = reqId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ MessageFuture future = mb.deregisterMessageFuture(reqId);
+ if (future != null) {
+ future.complete(this);
+ }
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..e53cc83 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,6 +22,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -54,23 +55,17 @@
public final class ExecuteStatementRequestMessage implements
ICcAddressedMessage {
private static final long serialVersionUID = 1L;
-
private static final Logger LOGGER =
Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
-
+ public static final long DEFAULT_NC_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(6);
+ public static final long DEFAULT_JOB_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+ public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(1);
private final String requestNodeId;
-
private final long requestMessageId;
-
private final ILangExtension.Language lang;
-
private final String statementsText;
-
private final SessionConfig sessionConfig;
-
private final IStatementExecutor.ResultDelivery delivery;
-
private final String clientContextID;
-
private final String handleUrl;
public ExecuteStatementRequestMessage(String requestNodeId, long
requestMessageId, ILangExtension.Language lang,
@@ -102,47 +97,41 @@
IStorageComponentProvider storageComponentProvider =
ccAppCtx.getStorageComponentProvider();
IStatementExecutorFactory statementExecutorFactory =
ccApp.getStatementExecutorFactory();
IStatementExecutorContext statementExecutorContext =
ccApp.getStatementExecutorContext();
-
- ccSrv.getExecutor().submit(() -> {
- ExecuteStatementResponseMessage responseMsg = new
ExecuteStatementResponseMessage(requestMessageId);
- try {
- IParser parser =
compilationProvider.getParserFactory().createParser(statementsText);
- List<Statement> statements = parser.parse();
- StringWriter outWriter = new StringWriter(256);
- PrintWriter outPrinter = new PrintWriter(outWriter);
- SessionOutput.ResultDecorator resultPrefix =
ResultUtil.createPreResultDecorator();
- SessionOutput.ResultDecorator resultPostfix =
ResultUtil.createPostResultDecorator();
- SessionOutput.ResultAppender appendHandle =
ResultUtil.createResultHandleAppender(handleUrl);
- SessionOutput.ResultAppender appendStatus =
ResultUtil.createResultStatusAppender();
- SessionOutput sessionOutput = new SessionOutput(sessionConfig,
outPrinter, resultPrefix, resultPostfix,
- appendHandle, appendStatus);
-
- IStatementExecutor.ResultMetadata outMetadata = new
IStatementExecutor.ResultMetadata();
-
- MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
- compilationProvider, storageComponentProvider);
- translator.compileAndExecute(ccAppCtx.getHcc(), null,
delivery, outMetadata,
- new IStatementExecutor.Stats(), clientContextID,
statementExecutorContext);
-
- outPrinter.close();
- responseMsg.setResult(outWriter.toString());
- responseMsg.setMetadata(outMetadata);
- } catch (AlgebricksException | HyracksException | TokenMgrError
- | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
- // we trust that "our" exceptions are serializable and have a
comprehensible error message
- GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING,
pe.getMessage(), pe);
- responseMsg.setError(pe);
- } catch (Exception e) {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected
exception", e);
- responseMsg.setError(new Exception(e.toString()));
- }
- try {
- messageBroker.sendApplicationMessageToNC(responseMsg,
requestNodeId);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, e.toString(), e);
- }
- });
+ ExecuteStatementResponseMessage responseMsg = new
ExecuteStatementResponseMessage(requestMessageId);
+ try {
+ IParser parser =
compilationProvider.getParserFactory().createParser(statementsText);
+ List<Statement> statements = parser.parse();
+ StringWriter outWriter = new StringWriter(256);
+ PrintWriter outPrinter = new PrintWriter(outWriter);
+ SessionOutput.ResultDecorator resultPrefix =
ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix =
ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle =
ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus =
ResultUtil.createResultStatusAppender();
+ SessionOutput sessionOutput = new SessionOutput(sessionConfig,
outPrinter, resultPrefix, resultPostfix,
+ appendHandle, appendStatus);
+ IStatementExecutor.ResultMetadata outMetadata = new
IStatementExecutor.ResultMetadata();
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator =
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+ compilationProvider, storageComponentProvider);
+ translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery,
outMetadata, new IStatementExecutor.Stats(),
+ clientContextID, statementExecutorContext);
+ outPrinter.close();
+ responseMsg.setResult(outWriter.toString());
+ responseMsg.setMetadata(outMetadata);
+ } catch (AlgebricksException | HyracksException | TokenMgrError
+ | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+ // we trust that "our" exceptions are serializable and have a
comprehensible error message
+ GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(),
pe);
+ responseMsg.setError(pe);
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected
exception", e);
+ responseMsg.setError(new Exception(e.toString()));
+ }
+ try {
+ messageBroker.sendApplicationMessageToNC(responseMsg,
requestNodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e.toString(), e);
+ }
}
private String getRejectionReason(ClusterControllerService ccSrv) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 4f9aa0c..54f0a4e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -19,8 +19,6 @@
package org.apache.asterix.app.message;
-import java.util.concurrent.TimeUnit;
-
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -30,8 +28,6 @@
public final class ExecuteStatementResponseMessage implements
INcAddressedMessage {
private static final long serialVersionUID = 1L;
-
- public static final long DEFAULT_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(5);
private final long requestMessageId;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b97c014..81210d5 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -267,8 +267,8 @@
*/
String threadName = Thread.currentThread().getName();
Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
- Map<String, String> config = new HashMap<>();
try {
+ Map<String, String> config = new HashMap<>();
for (Statement stmt : statements) {
if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
@@ -925,7 +925,7 @@
"Failed to create job spec for replicating
Files Index For external dataset");
}
filesIndexReplicated = true;
- runJob(hcc, spec, jobFlags);
+ runJob(hcc, spec, jobFlags, Long.MAX_VALUE);
}
}
@@ -956,7 +956,7 @@
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
// #. create the index artifact in NC.
- runJob(hcc, spec, jobFlags);
+ runJob(hcc, spec, jobFlags, Long.MAX_VALUE);
// #. flush the internal dataset for correlated policy
if (ds.isCorrelated() && ds.getDatasetType() ==
DatasetType.INTERNAL) {
@@ -972,7 +972,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, spec, jobFlags);
+ runJob(hcc, spec, jobFlags, Long.MAX_VALUE);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1009,7 +1009,7 @@
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, jobSpec, jobFlags);
+ runJob(hcc, jobSpec, jobFlags, Long.MAX_VALUE);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1028,7 +1028,7 @@
JobSpecification jobSpec =
IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- runJob(hcc, jobSpec, jobFlags);
+ runJob(hcc, jobSpec, jobFlags, Long.MAX_VALUE);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1346,7 +1346,8 @@
// remove all the indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ JobId jobId = JobUtils.runJob(hcc, jobSpec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
} catch (Exception e2) {
// do not throw the exception since the metadata still needs to
be compensated.
@@ -2340,7 +2341,7 @@
final ResultReader resultReader = new ResultReader(hdc,
id, resultSetId);
ResultUtil.printResults(appCtx, resultReader,
sessionOutput, stats,
metadataProvider.findOutputRecordType());
- }, clientContextId, ctx);
+ }, clientContextId, ctx, sessionConfig.getTimeout());
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
@@ -2349,7 +2350,7 @@
outMetadata.getResultSets()
.add(Triple.of(id, resultSetId,
metadataProvider.findOutputRecordType()));
}
- }, clientContextId, ctx);
+ }, clientContextId, ctx, sessionConfig.getTimeout());
break;
default:
break;
@@ -2369,7 +2370,7 @@
printed.setTrue();
printed.notify();
}
- }, clientContextId, ctx);
+ }, clientContextId, ctx, sessionConfig.getTimeout());
} catch (Exception e) {
if (JobId.INVALID.equals(jobId.getValue())) {
// compilation failed
@@ -2390,24 +2391,25 @@
}
private void runJob(IHyracksClientConnection hcc, JobSpecification
jobSpec) throws Exception {
- runJob(hcc, jobSpec, jobFlags);
+ runJob(hcc, jobSpec, jobFlags, sessionOutput.config().getTimeout());
}
- private static void runJob(IHyracksClientConnection hcc, JobSpecification
jobSpec, EnumSet<JobFlag> jobFlags)
- throws Exception {
- JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+ private static void runJob(IHyracksClientConnection hcc, JobSpecification
jobSpec, EnumSet<JobFlag> jobFlags,
+ long timeout) throws Exception {
+ JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags);
+ hcc.waitForCompletion(jobId, timeout);
}
private static void createAndRunJob(IHyracksClientConnection hcc,
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IResultPrinter printer,
- String clientContextId, IStatementExecutorContext ctx) throws
Exception {
+ String clientContextId, IStatementExecutorContext ctx, long
timeout) throws Exception {
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
return;
}
- final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags);
if (ctx != null && clientContextId != null) {
ctx.put(clientContextId, jobId); // Adds the running job into
the context.
}
@@ -2416,9 +2418,9 @@
}
if (ResultDelivery.ASYNC == resultDelivery) {
printer.print(jobId);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, timeout);
} else {
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, timeout);
printer.print(jobId);
}
} finally {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..b1a3235 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -106,6 +106,7 @@
protected WebManager webManager;
protected CcApplicationContext appCtx;
private IJobCapacityController jobCapacityController;
+ private IHyracksClientConnection hcc;
@Override
public void start(IServiceContext serviceCtx, String[] args) throws
Exception {
@@ -121,9 +122,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
-
ccServiceCtx.setThreadFactory(
new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new
LifeCycleComponentManager()));
+ String strIP =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
+ int port =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+ hcc = new HyracksConnection(strIP, port);
ILibraryManager libraryManager = new ExternalLibraryManager();
ResourceIdManager resourceIdManager = new ResourceIdManager();
IReplicationStrategy repStrategy =
ClusterProperties.INSTANCE.getReplicationStrategy();
@@ -357,9 +360,7 @@
return appCtx;
}
- protected IHyracksClientConnection getHcc() throws Exception {
- String strIP =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
- int port =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
- return new HyracksConnection(strIP, port);
+ public IHyracksClientConnection getHcc() throws Exception {
+ return hcc;
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3209557..40b6c90 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -77,7 +77,7 @@
private void executeHyracksJob(JobSpecification spec) throws Exception {
spec.setMaxReattempts(0);
JobId jobId = hcc.startJob(spec);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index 958444c..d9ddb4f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -60,13 +61,13 @@
new IPushRuntimeFactory[] { new
EmptyTupleSourceRuntimeFactory() }, rDescs);
org.apache.asterix.common.transactions.JobId jobId =
JobIdFactory.generateJobId();
- FlushDatasetOperatorDescriptor flushOperator = new
FlushDatasetOperatorDescriptor(spec, jobId,
- dataset.getDatasetId());
+ FlushDatasetOperatorDescriptor flushOperator =
+ new FlushDatasetOperatorDescriptor(spec, jobId,
dataset.getDatasetId());
spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0,
flushOperator, 0);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
primarySplitsAndConstraint = metadataProvider
- .getSplitProviderAndConstraints(dataset,
dataset.getDatasetName());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
primarySplitsAndConstraint =
+ metadataProvider.getSplitProviderAndConstraints(dataset,
dataset.getDatasetName());
AlgebricksPartitionConstraint primaryPartitionConstraint =
primarySplitsAndConstraint.second;
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
emptySource,
@@ -74,7 +75,8 @@
JobEventListenerFactory jobEventListenerFactory = new
JobEventListenerFactory(jobId, true);
spec.setJobletEventListenerFactory(jobEventListenerFactory);
- JobUtils.runJob(hcc, spec, true);
+ JobId hyracksJobId = JobUtils.runJob(hcc, spec);
+ hcc.waitForCompletion(hyracksJobId, Long.MAX_VALUE);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ceaf4cf..62ca32c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -114,9 +114,9 @@
if (!targetNcNames.isEmpty()) {
// Creates a node group for rebalance.
- String nodeGroupName = DatasetUtil
-
.createNodeGroupForNewDataset(sourceDataset.getDataverseName(),
sourceDataset.getDatasetName(),
- sourceDataset.getRebalanceCount() + 1,
targetNcNames, metadataProvider);
+ String nodeGroupName =
DatasetUtil.createNodeGroupForNewDataset(sourceDataset.getDataverseName(),
+ sourceDataset.getDatasetName(),
sourceDataset.getRebalanceCount() + 1, targetNcNames,
+ metadataProvider);
// The target dataset for rebalance.
targetDataset =
sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
@@ -266,7 +266,8 @@
private static void createRebalanceTarget(Dataset target, MetadataProvider
metadataProvider,
IHyracksClientConnection hcc) throws Exception {
JobSpecification spec = DatasetUtil.createDatasetJobSpec(target,
metadataProvider);
- JobUtils.runJob(hcc, spec, true);
+ org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, spec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
// Populates the data from the source dataset to the rebalance target
dataset.
@@ -303,7 +304,8 @@
spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0,
commitOp, 0);
// Executes the job.
- JobUtils.runJob(hcc, spec, true);
+ org.apache.hyracks.api.job.JobId hyracksJobId = JobUtils.runJob(hcc,
spec);
+ hcc.waitForCompletion(hyracksJobId, Long.MAX_VALUE);
}
// Creates the primary index upsert operator for populating the target
dataset.
@@ -341,7 +343,8 @@
jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider,
dataset, true));
}
for (JobSpecification jobSpec : jobs) {
- JobUtils.runJob(hcc, jobSpec, true);
+ org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc,
jobSpec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
}
@@ -355,12 +358,14 @@
// Creates the secondary index.
JobSpecification indexCreationJobSpec =
IndexUtil.buildSecondaryIndexCreationJobSpec(target,
index, metadataProvider);
- JobUtils.runJob(hcc, indexCreationJobSpec, true);
+ org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc,
indexCreationJobSpec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
// Loads the secondary index.
JobSpecification indexLoadingJobSpec =
IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index,
metadataProvider);
- JobUtils.runJob(hcc, indexLoadingJobSpec, true);
+ jobId = JobUtils.runJob(hcc, indexLoadingJobSpec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
}
@@ -393,9 +398,9 @@
dropDatasetFiles(dataset, metadataProvider, hcc);
// drop dataset entry from metadata
- runMetadataTransaction(metadataProvider, () ->
MetadataManager.INSTANCE
- .dropDataset(metadataProvider.getMetadataTxnContext(),
dataset.getDataverseName(),
- dataset.getDatasetName()));
+ runMetadataTransaction(metadataProvider,
+ () ->
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(),
+ dataset.getDataverseName(),
dataset.getDatasetName()));
// try to drop the dataset's node group
runMetadataTransaction(metadataProvider, () ->
tryDropDatasetNodegroup(dataset, metadataProvider));
});
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index cacbfbc..c60ebac 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -33,18 +33,13 @@
ADDED_PENDINGOP_RECORD_TO_METADATA
}
- public static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec, boolean waitForCompletion)
- throws Exception {
- return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class),
waitForCompletion);
+ public static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec) throws Exception {
+ return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class));
}
- public static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec, EnumSet<JobFlag> jobFlags,
- boolean waitForCompletion) throws Exception {
+ public static JobId runJob(IHyracksClientConnection hcc, JobSpecification
spec, EnumSet<JobFlag> jobFlags)
+ throws Exception {
spec.setMaxReattempts(0);
- final JobId jobId = hcc.startJob(spec, jobFlags);
- if (waitForCompletion) {
- hcc.waitForCompletion(jobId);
- }
- return jobId;
+ return hcc.startJob(spec, jobFlags);
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index fa60bba..910247b 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -350,12 +350,14 @@
// # disconnect the feeds
for (Pair<JobSpecification, Boolean> p :
disconnectJobList.values()) {
- JobUtils.runJob(hcc, p.first, true);
+ org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc,
p.first);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
// #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc,
jobSpec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
@@ -389,7 +391,8 @@
// #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc,
jobSpec);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
if (!indexes.isEmpty()) {
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this);
diff --git
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 2971b72..d2af495 100644
---
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -65,8 +65,7 @@
ncConfig1.setClusterListenAddress("127.0.0.1");
ncConfig1.setDataListenAddress("127.0.0.1");
ncConfig1.setResultListenAddress("127.0.0.1");
- ncConfig1.setIODevices(new String [] {
joinPath(System.getProperty("user.dir"), "target", "data",
- "device0") });
+ ncConfig1.setIODevices(new String[] {
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
FileUtils.forceMkdir(new File(ncConfig1.getIODevices()[0]));
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -77,8 +76,7 @@
ncConfig2.setClusterListenAddress("127.0.0.1");
ncConfig2.setDataListenAddress("127.0.0.1");
ncConfig2.setResultListenAddress("127.0.0.1");
- ncConfig2.setIODevices(new String [] {
joinPath(System.getProperty("user.dir"), "target", "data",
- "device1") });
+ ncConfig2.setIODevices(new String[] {
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
FileUtils.forceMkdir(new File(ncConfig1.getIODevices()[0]));
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -96,7 +94,7 @@
AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e2868ae..15e5952 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -130,6 +130,9 @@
public CancelJobFunction(JobId jobId) {
this.jobId = jobId;
+ if (jobId == null) {
+ throw new NullPointerException("jobId");
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..2718ac3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -37,6 +37,7 @@
public class HyracksClientInterfaceRemoteProxy implements
IHyracksClientInterface {
private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30;
+ private static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
private final IIPCHandle ipcHandle;
@@ -51,112 +52,113 @@
public ClusterControllerInfo getClusterControllerInfo() throws Exception {
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif
=
new
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
- return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
+ return (ClusterControllerInfo) rpci.call(ipcHandle, gccif,
DEFAULT_TIMEOUT);
}
@Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
new
HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
- return (JobStatus) rpci.call(ipcHandle, gjsf);
+ return (JobStatus) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT);
}
@Override
public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws
Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
new
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
- return (JobId) rpci.call(ipcHandle, sjf);
+ return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
}
@Override
public void cancelJob(JobId jobId) throws Exception {
- HyracksClientInterfaceFunctions.CancelJobFunction cjf = new
HyracksClientInterfaceFunctions.CancelJobFunction(
- jobId);
- rpci.call(ipcHandle, cjf);
+ HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+ new HyracksClientInterfaceFunctions.CancelJobFunction(jobId);
+ rpci.call(ipcHandle, cjf, DEFAULT_TIMEOUT);
}
@Override
public JobId startJob(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf =
new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
- return (JobId) rpci.call(ipcHandle, sjf);
+ return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
}
@Override
public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes,
EnumSet<JobFlag> jobFlags) throws Exception {
- HyracksClientInterfaceFunctions.StartJobFunction sjf = new
HyracksClientInterfaceFunctions.StartJobFunction(
- deploymentId, acggfBytes, jobFlags);
- return (JobId) rpci.call(ipcHandle, sjf);
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ new
HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes,
jobFlags);
+ return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
}
@Override
public JobId distributeJob(byte[] acggfBytes) throws Exception {
HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
new
HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
- return (JobId) rpci.call(ipcHandle, sjf);
+ return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
}
@Override
public JobId destroyJob(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
- return (JobId) rpci.call(ipcHandle, sjf);
+ return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
}
@Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction
gddsf =
new
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
- return (NetworkAddress) rpci.call(ipcHandle, gddsf);
+ return (NetworkAddress) rpci.call(ipcHandle, gddsf, DEFAULT_TIMEOUT);
}
@Override
- public void waitForCompletion(JobId jobId) throws Exception {
+ public void waitForCompletion(JobId jobId, long timeout) throws Exception {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
new
HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
- rpci.call(ipcHandle, wfcf);
+ rpci.call(ipcHandle, wfcf, timeout);
}
+ @SuppressWarnings("unchecked")
@Override
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws
Exception {
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
new
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
- return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
+ return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif,
DEFAULT_TIMEOUT);
}
@Override
public ClusterTopology getClusterTopology() throws Exception {
HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
new
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
- return (ClusterTopology) rpci.call(ipcHandle, gctf);
+ return (ClusterTopology) rpci.call(ipcHandle, gctf, DEFAULT_TIMEOUT);
}
@Override
public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId)
throws Exception {
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
new
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs,
deploymentId);
- rpci.call(ipcHandle, dbf);
+ rpci.call(ipcHandle, dbf, DEFAULT_TIMEOUT);
}
@Override
public void unDeployBinary(DeploymentId deploymentId) throws Exception {
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
new
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
- rpci.call(ipcHandle, dbf);
+ rpci.call(ipcHandle, dbf, DEFAULT_TIMEOUT);
}
@Override
public JobInfo getJobInfo(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
- return (JobInfo) rpci.call(ipcHandle, gjsf);
+ return (JobInfo) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT);
}
@Override
public void stopCluster(boolean terminateNCService) throws Exception {
HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
new
HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
- rpci.call(ipcHandle, csdf);
+ rpci.call(ipcHandle, csdf, DEFAULT_TIMEOUT);
int i = 0;
// give the CC some time to do final settling after it returns our
request
while (ipcHandle.isConnected() && i++ <
SHUTDOWN_CONNECTION_TIMEOUT_SECS) {
@@ -165,8 +167,8 @@
}
}
if (ipcHandle.isConnected()) {
- throw new IPCException("CC refused to release connection after " +
SHUTDOWN_CONNECTION_TIMEOUT_SECS
- + " seconds");
+ throw new IPCException(
+ "CC refused to release connection after " +
SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds");
}
}
@@ -174,13 +176,13 @@
public String getNodeDetailsJSON(String nodeId, boolean includeStats,
boolean includeConfig) throws Exception {
HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gjsf =
new
HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction(nodeId,
includeStats, includeConfig);
- return (String) rpci.call(ipcHandle, gjsf);
+ return (String) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT);
}
@Override
public String getThreadDump(String node) throws Exception {
HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
- return (String)rpci.call(ipcHandle, tdf);
+ return (String) rpci.call(ipcHandle, tdf, DEFAULT_TIMEOUT);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..7b5dfe8 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -124,6 +125,7 @@
return hci.startJob(jobId);
}
+ @Override
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
}
@@ -132,15 +134,16 @@
return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
}
+ @Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
return hci.getDatasetDirectoryServiceInfo();
}
@Override
- public void waitForCompletion(JobId jobId) throws Exception {
+ public void waitForCompletion(JobId jobId, long timeout) throws Exception {
try {
- hci.waitForCompletion(jobId);
- } catch (InterruptedException e) {
+ hci.waitForCompletion(jobId, timeout);
+ } catch (InterruptedException | TimeoutException e) {
// Cancels an on-going job if the current thread gets interrupted or the wait times out.
hci.cancelJob(jobId);
throw e;
@@ -152,7 +155,7 @@
try {
return hci.getNodeControllersInfo();
} catch (Exception e) {
- throw new HyracksException(e);
+ throw HyracksException.create(e);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..49371c4 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -141,8 +141,7 @@
* JobId of the Job
* @throws Exception
*/
- public void waitForCompletion(JobId jobId) throws Exception;
-
+ public void waitForCompletion(JobId jobId, long timeout) throws Exception;
/**
* Deploy the user-defined jars to the cluster
@@ -201,16 +200,19 @@
/**
* Shuts down all NCs and then the CC.
+ *
* @param terminateNCService
*/
public void stopCluster(boolean terminateNCService) throws Exception;
/**
* Get details of specified node as JSON object
+ *
* @param nodeId
- * id the subject node
+ * id of the subject node
* @param includeStats
- * @param includeConfig @return serialized JSON containing the node details
+ * @param includeConfig
+ * @return serialized JSON containing the node details
* @throws Exception
*/
public String getNodeDetailsJSON(String nodeId, boolean includeStats,
boolean includeConfig) throws Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..7e3715a 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -48,7 +48,7 @@
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
- public void waitForCompletion(JobId jobId) throws Exception;
+ public void waitForCompletion(JobId jobId, long timeout) throws Exception;
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws
Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 4310cd0..7abb22f 100644
---
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -40,16 +40,16 @@
@Override
public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws
Exception {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(
- jobId, rsId);
- return (Status) rpci.call(ipcHandle, gdrlf);
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
+ new
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(jobId, rsId);
+ return (Status) rpci.call(ipcHandle, gdrlf, Long.MAX_VALUE);
}
@Override
public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId,
ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction
gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
- jobId, rsId, knownRecords);
- return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
+ HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction
gdrlf =
+ new
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, rsId,
knownRecords);
+ return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf,
Long.MAX_VALUE);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
index 54ae838..1b253d9 100644
---
a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
@@ -94,7 +94,7 @@
public static void runJob(JobSpecification spec, String appName) throws
Exception {
spec.setFrameSize(FRAME_SIZE);
JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index dbbaf9f..bde9b9b 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -65,6 +65,8 @@
import org.apache.hyracks.control.cc.work.JobCleanupWork;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.work.DummyCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
public class JobExecutor {
private static final Logger LOGGER =
Logger.getLogger(JobExecutor.class.getName());
@@ -114,9 +116,10 @@
ccs.getContext().notifyJobStart(jobRun.getJobId());
}
- public void cancelJob() throws HyracksException {
+ public void cancelJob(IResultCallback<Void> callback) throws
HyracksException {
// If the job is already terminated or failed, do nothing here.
if (jobRun.getPendingStatus() != null) {
+ callback.setValue(null);
return;
}
// Sets the cancelled flag.
@@ -124,7 +127,8 @@
// Aborts on-ongoing task clusters.
abortOngoingTaskClusters(ta -> false, ta -> null);
// Aborts the whole job.
-
abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
jobRun.getJobId())));
+
abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
jobRun.getJobId())),
+ callback);
}
private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier,
Collection<ActivityCluster> roots)
@@ -196,8 +200,8 @@
"Runnable TC roots: " + taskClusterRoots + ",
inProgressTaskClusters: " + inProgressTaskClusters);
}
if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
- ccs.getWorkQueue()
- .schedule(new JobCleanupWork(ccs.getJobManager(),
jobRun.getJobId(), JobStatus.TERMINATED, null));
+ ccs.getWorkQueue().schedule(new
JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED,
+ null, DummyCallback.INSTANCE));
return;
}
startRunnableTaskClusters(taskClusterRoots);
@@ -520,14 +524,14 @@
}
}
- public void abortJob(List<Exception> exceptions) {
+ public void abortJob(List<Exception> exceptions, IResultCallback<Void>
callback) {
Set<TaskCluster> inProgressTaskClustersCopy = new
HashSet<>(inProgressTaskClusters);
for (TaskCluster tc : inProgressTaskClustersCopy) {
abortTaskCluster(findLastTaskClusterAttempt(tc),
TaskClusterAttempt.TaskClusterStatus.ABORTED);
}
assert inProgressTaskClusters.isEmpty();
- ccs.getWorkQueue()
- .schedule(new JobCleanupWork(ccs.getJobManager(),
jobRun.getJobId(), JobStatus.FAILURE, exceptions));
+ ccs.getWorkQueue().schedule(
+ new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(),
JobStatus.FAILURE, exceptions, callback));
}
private void abortTaskCluster(TaskClusterAttempt tcAttempt,
@@ -686,7 +690,7 @@
+ " as failed and the number of max re-attempts = " +
maxReattempts);
if (lastAttempt.getAttempt() >= maxReattempts ||
isCancelled()) {
LOGGER.log(Level.INFO, "Aborting the job of " +
ta.getTaskAttemptId());
- abortJob(exceptions);
+ abortJob(exceptions, DummyCallback.INSTANCE);
return;
}
LOGGER.log(Level.INFO, "We will try to start runnable activity
clusters of " + ta.getTaskAttemptId());
@@ -696,7 +700,7 @@
"Ignoring task failure notification: " + taId + " --
Current last attempt = " + lastAttempt);
}
} catch (Exception e) {
- abortJob(Collections.singletonList(e));
+ abortJob(Collections.singletonList(e), DummyCallback.INSTANCE);
}
}
@@ -720,7 +724,7 @@
ta -> HyracksException.create(ErrorCode.NODE_FAILED,
ta.getNodeId()));
startRunnableActivityClusters();
} catch (Exception e) {
- abortJob(Collections.singletonList(e));
+ abortJob(Collections.singletonList(e), DummyCallback.INSTANCE);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8fe542f..5ae19a1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.work.IResultCallback;
/**
* This interface abstracts the job lifecycle management and job scheduling
for a cluster.
@@ -46,11 +47,13 @@
/**
* Cancel a job with a given job id.
+ *
+ * @param callback the callback notified once the cancellation (including job cleanup) has completed or failed
*
* @param jobId,
* the id of the job.
*/
- void cancel(JobId jobId) throws HyracksException;
+ void cancel(JobId jobId, IResultCallback<Void> callback) throws
HyracksException;
/**
* This method is called when the master process decides to complete job.
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index abf1d57..117cbe1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,6 +46,8 @@
import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
import org.apache.hyracks.control.cc.scheduler.IJobQueue;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.DummyCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -115,17 +117,14 @@
}
@Override
- public void cancel(JobId jobId) throws HyracksException {
- if (jobId == null) {
- return;
- }
+ public void cancel(JobId jobId, IResultCallback<Void> callback) throws
HyracksException {
// Cancels a running job.
if (activeRunMap.containsKey(jobId)) {
JobRun jobRun = activeRunMap.get(jobId);
// The following call will abort all ongoing tasks and then
consequently
// trigger JobCleanupWork and JobCleanupNotificationWork which
will update the lifecyle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
- jobRun.getExecutor().cancelJob();
+ jobRun.getExecutor().cancelJob(callback);
return;
}
// Removes a pending job.
@@ -138,6 +137,7 @@
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
}
+ callback.setValue(null);
}
@Override
@@ -322,7 +322,7 @@
// fail the job then abort it
run.setStatus(JobStatus.FAILURE, exceptions);
// abort job will trigger JobCleanupWork
- run.getExecutor().abortJob(exceptions);
+ run.getExecutor().abortJob(exceptions, DummyCallback.INSTANCE);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index f3b67c9..e3135df 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -42,10 +42,7 @@
@Override
protected void doRun() throws Exception {
try {
- if (jobId != null) {
- jobManager.cancel(jobId);
- }
- callback.setValue(null);
+ jobManager.cancel(jobId, callback);
} catch (Exception e) {
callback.setException(e);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 502ac50..bb85c13 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.IResultCallback;
public class JobCleanupWork extends AbstractWork {
private static final Logger LOGGER =
Logger.getLogger(JobCleanupWork.class.getName());
@@ -37,12 +38,15 @@
private JobId jobId;
private JobStatus status;
private List<Exception> exceptions;
+ private IResultCallback<Void> callback;
- public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus
status, List<Exception> exceptions) {
+ public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus
status, List<Exception> exceptions,
+ IResultCallback<Void> callback) {
this.jobManager = jobManager;
this.jobId = jobId;
this.status = status;
this.exceptions = exceptions;
+ this.callback = callback;
}
@Override
@@ -53,6 +57,7 @@
try {
JobRun jobRun = jobManager.get(jobId);
jobManager.prepareComplete(jobRun, status, exceptions);
+ callback.setValue(null);
} catch (HyracksException e) {
// Fail the job with the caught exception during final completion.
JobRun run = jobManager.get(jobId);
@@ -62,6 +67,7 @@
}
completionException.add(0, e);
run.setStatus(JobStatus.FAILURE, completionException);
+ callback.setException(e);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index ed2a740..f99d465 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -48,6 +48,7 @@
import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.logs.LogFile;
+import org.apache.hyracks.control.common.work.DummyCallback;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -207,7 +208,7 @@
}
@Test
- public void testCancel() throws HyracksException {
+ public void testCancel() throws Exception {
CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController =
mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig,
mockClusterControllerService(), jobCapacityController));
@@ -247,12 +248,12 @@
// Cancels deferred jobs.
for (JobRun run : deferredRuns) {
- jobManager.cancel(run.getJobId());
+ jobManager.cancel(run.getJobId(), DummyCallback.INSTANCE);
}
// Cancels runnable jobs.
for (JobRun run : acceptedRuns) {
- jobManager.cancel(run.getJobId());
+ jobManager.cancel(run.getJobId(), DummyCallback.INSTANCE);
}
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
new file mode 100644
index 0000000..24946f0
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.work;
+
+public class DummyCallback implements IResultCallback<Void> {
+
+ public static final DummyCallback INSTANCE = new DummyCallback();
+
+ private DummyCallback() {
+ }
+
+ @Override
+ public void setValue(Void result) {
+ // Intentionally a no-op: this dummy is used when no callback is provided, so the value is discarded
+ }
+
+ @Override
+ public void setException(Exception e) {
+ // Intentionally a no-op: this dummy is used when no callback is provided, so the exception is discarded
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
index 0ccaf1d..b8eefe7 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.common.work;
public interface IResultCallback<T> {
+
public void setValue(T result);
public void setException(Exception e);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
index 48377e3..4cf1d12 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -89,7 +89,7 @@
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
@@ -144,7 +144,8 @@
// B-Tree tuple, etc.
IFileSplitProvider primarySplitProvider =
JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
- IIndexDataflowHelperFactory primaryHelperFactory = new
IndexDataflowHelperFactory(storageManager, primarySplitProvider);
+ IIndexDataflowHelperFactory primaryHelperFactory =
+ new IndexDataflowHelperFactory(storageManager,
primarySplitProvider);
// create operator descriptor
TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert =
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 203d22c..dc13c32 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -84,7 +84,7 @@
JobSpecification job = createJob(options);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
@@ -145,7 +145,8 @@
// to field 0 of B-Tree tuple,
// etc.
IFileSplitProvider btreeSplitProvider =
JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
- IIndexDataflowHelperFactory dataflowHelperFactory = new
IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
+ IIndexDataflowHelperFactory dataflowHelperFactory =
+ new IndexDataflowHelperFactory(storageManager,
btreeSplitProvider);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new
TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
fieldPermutation, 0.7f, false, 1000L, true,
dataflowHelperFactory);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index 603dc6b..bf3b41e 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -80,7 +80,7 @@
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
@@ -139,7 +139,8 @@
// into search op
IFileSplitProvider btreeSplitProvider =
JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
- IIndexDataflowHelperFactory dataflowHelperFactory = new
IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
+ IIndexDataflowHelperFactory dataflowHelperFactory =
+ new IndexDataflowHelperFactory(storageManager,
btreeSplitProvider);
BTreeSearchOperatorDescriptor btreeSearchOp = new
BTreeSearchOperatorDescriptor(spec, recDesc, lowKeyFields,
highKeyFields, true, true, dataflowHelperFactory, false,
false, null,
NoOpOperationCallbackFactory.INSTANCE, null, null, false);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 7507f10..52a0ace 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -83,7 +83,7 @@
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
@@ -117,7 +117,8 @@
// use a disk-order scan to read primary index
IFileSplitProvider primarySplitProvider =
JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
- IIndexDataflowHelperFactory primaryHelperFactory = new
IndexDataflowHelperFactory(storageManager, primarySplitProvider);
+ IIndexDataflowHelperFactory primaryHelperFactory =
+ new IndexDataflowHelperFactory(storageManager,
primarySplitProvider);
TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new
TreeIndexDiskOrderScanOperatorDescriptor(spec,
recDesc, primaryHelperFactory,
NoOpOperationCallbackFactory.INSTANCE);
JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs);
@@ -139,7 +140,8 @@
// tuple
int[] fieldPermutation = { 1, 0 };
IFileSplitProvider btreeSplitProvider =
JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
- IIndexDataflowHelperFactory secondaryHelperFactory = new
IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
+ IIndexDataflowHelperFactory secondaryHelperFactory =
+ new IndexDataflowHelperFactory(storageManager,
btreeSplitProvider);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new
TreeIndexBulkLoadOperatorDescriptor(spec, null,
fieldPermutation, 0.7f, false, 1000L, true,
secondaryHelperFactory);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 1e909ef..5d72703 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -83,7 +83,7 @@
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
@@ -183,7 +183,8 @@
// op
IFileSplitProvider primarySplitProvider =
JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
- IIndexDataflowHelperFactory primaryHelperFactory = new
IndexDataflowHelperFactory(storageManager, primarySplitProvider);
+ IIndexDataflowHelperFactory primaryHelperFactory =
+ new IndexDataflowHelperFactory(storageManager,
primarySplitProvider);
BTreeSearchOperatorDescriptor primarySearchOp = new
BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
primaryLowKeyFields, primaryHighKeyFields, true, true,
primaryHelperFactory, false, false, null,
NoOpOperationCallbackFactory.INSTANCE, null, null, false);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index 82fd737..48ca36f 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -106,7 +106,7 @@
ncConfig1.setClusterListenAddress("127.0.0.1");
ncConfig1.setDataListenAddress("127.0.0.1");
ncConfig1.setResultListenAddress("127.0.0.1");
- ncConfig1.setIODevices(new String [] {
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+ ncConfig1.setIODevices(new String[] {
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -116,7 +116,7 @@
ncConfig2.setClusterListenAddress("127.0.0.1");
ncConfig2.setDataListenAddress("127.0.0.1");
ncConfig2.setResultListenAddress("127.0.0.1");
- ncConfig2.setIODevices(new String [] {
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+ ncConfig2.setIODevices(new String[] {
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -146,7 +146,7 @@
protected void runTest(JobSpecification spec) throws Exception {
JobId jobId = executeTest(spec);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
protected List<String> readResults(JobSpecification spec, JobId jobId,
ResultSetId resultSetId) throws Exception {
@@ -209,7 +209,7 @@
expectedFile.close();
}
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
return true;
}
@@ -226,7 +226,7 @@
}
output.close();
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
protected FileSplit createFile(NodeControllerService ncs) throws
IOException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 18479e2..1c0220d 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -127,7 +127,7 @@
}
protected void waitForCompletion(JobId jobId) throws Exception {
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
}
protected JobStatus getJobStatus(JobId jobId) throws Exception {
@@ -188,7 +188,7 @@
}
boolean expectedExceptionThrown = false;
try {
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
} catch (HyracksDataException hde) {
if (expectedErrorMessage != null) {
if (hde.toString().contains(expectedErrorMessage)) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
index f2f8061..c4636fb 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
@@ -104,7 +104,7 @@
WaitingOperatorDescriptor.CONTINUE_RUNNING.setTrue();
WaitingOperatorDescriptor.CONTINUE_RUNNING.notify();
}
- hcc.waitForCompletion(jId);
+ hcc.waitForCompletion(jId, Long.MAX_VALUE);
}
private int countJobs(String status) throws IOException,
URISyntaxException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..0c27b3d 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -79,7 +79,7 @@
ncConfig1.setDataListenAddress("127.0.0.1");
ncConfig1.setResultListenAddress("127.0.0.1");
ncConfig1.setResultSweepThreshold(5000);
- ncConfig1.setIODevices(new String [] {
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+ ncConfig1.setIODevices(new String[] {
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
nc1 = Mockito.spy(nc1Base);
nc1.start();
@@ -91,7 +91,7 @@
ncConfig2.setDataListenAddress("127.0.0.1");
ncConfig2.setResultListenAddress("127.0.0.1");
ncConfig2.setResultSweepThreshold(5000);
- ncConfig2.setIODevices(new String [] {
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+ ncConfig2.setIODevices(new String[] {
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
nc2 = Mockito.spy(nc2Base);
nc2.start();
@@ -127,7 +127,7 @@
//run the first job
hcc.startJob(jobId1);
- hcc.waitForCompletion(jobId1);
+ hcc.waitForCompletion(jobId1, Long.MAX_VALUE);
//destroy the first job
hcc.destroyJob(jobId1);
@@ -143,7 +143,7 @@
//run the second job
hcc.startJob(jobId2);
- hcc.waitForCompletion(jobId2);
+ hcc.waitForCompletion(jobId2, Long.MAX_VALUE);
//wait ten seconds to ensure the result sweeper does not break the job
//The result sweeper runs every 5 seconds during the tests
@@ -151,7 +151,7 @@
//run the second job again
hcc.startJob(jobId2);
- hcc.waitForCompletion(jobId2);
+ hcc.waitForCompletion(jobId2, Long.MAX_VALUE);
//destroy the second job
hcc.destroyJob(jobId2);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 650c60d..d31be22 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -113,7 +113,7 @@
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME)
: EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
@@ -139,11 +139,11 @@
JobSpecification spec = new JobSpecification(frameSize);
IFileSplitProvider splitsProvider = new
ConstantFileSplitProvider(inSplits);
- RecordDescriptor wordDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { new
UTF8StringSerializerDeserializer() });
+ RecordDescriptor wordDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new
UTF8StringSerializerDeserializer() });
- FileScanOperatorDescriptor wordScanner = new
FileScanOperatorDescriptor(spec, splitsProvider,
- new WordTupleParserFactory(), wordDesc);
+ FileScanOperatorDescriptor wordScanner =
+ new FileScanOperatorDescriptor(spec, splitsProvider, new
WordTupleParserFactory(), wordDesc);
createPartitionConstraint(spec, wordScanner, inSplits);
RecordDescriptor groupResultDesc = new RecordDescriptor(new
ISerializerDeserializer[] {
@@ -170,8 +170,8 @@
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
} else {
- IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
-
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+ IBinaryComparatorFactory[] cfs =
+ new IBinaryComparatorFactory[] {
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
? new InMemorySortOperatorDescriptor(spec, keys, new
UTF8StringNormalizedKeyComputerFactory(), cfs,
wordDesc)
@@ -195,9 +195,9 @@
}
IFileSplitProvider outSplitProvider = new
ConstantFileSplitProvider(outSplits);
- IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
- ? new PlainFileWriterOperatorDescriptor(spec,
outSplitProvider, ",")
- : new FrameFileWriterOperatorDescriptor(spec,
outSplitProvider);
+ IOperatorDescriptor writer =
+ "text".equalsIgnoreCase(format) ? new
PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+ : new FrameFileWriterOperatorDescriptor(spec,
outSplitProvider);
createPartitionConstraint(spec, writer, outSplits);
IConnectorDescriptor gbyPrinterConn = new
OneToOneConnectorDescriptor(spec);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
index 42fe8c9..8a86692 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -118,7 +118,7 @@
System.out.print("CreateJobTime:" + (System.currentTimeMillis() -
start));
start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
System.out.println("JobExecuteTime:" + (System.currentTimeMillis()
- start));
}
}
@@ -134,8 +134,8 @@
createPartitionConstraint(spec, fileScanner, inSplits);
// Output: each unique string with an integer count
- RecordDescriptor outDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE,
+ RecordDescriptor outDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE,
// IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE });
@@ -187,9 +187,9 @@
spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
IFileSplitProvider outSplitProvider = new
ConstantFileSplitProvider(outSplits);
- AbstractSingleActivityOperatorDescriptor writer = outPlain ? new
PlainFileWriterOperatorDescriptor(spec,
- outSplitProvider, "|")
- : new FrameFileWriterOperatorDescriptor(spec,
outSplitProvider);
+ AbstractSingleActivityOperatorDescriptor writer =
+ outPlain ? new PlainFileWriterOperatorDescriptor(spec,
outSplitProvider, "|")
+ : new FrameFileWriterOperatorDescriptor(spec,
outSplitProvider);
createPartitionConstraint(spec, writer, outSplits);
IConnectorDescriptor groupOutConn = new
OneToOneConnectorDescriptor(spec);
spec.connect(groupOutConn, grouper, 0, writer, 0);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index c3d0df1..2a9f351 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -140,7 +140,7 @@
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) :
EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index 8ab0708..347ade5 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -89,13 +89,13 @@
}
static int[] SortFields = new int[] { 1, 0 };
- static IBinaryComparatorFactory[] SortFieldsComparatorFactories = new
IBinaryComparatorFactory[] {
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+ static IBinaryComparatorFactory[] SortFieldsComparatorFactories =
+ new IBinaryComparatorFactory[] {
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
- static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = new
IBinaryHashFunctionFactory[] {
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY)
};
+ static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories =
+ new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) };
public static void main(String[] args) throws Exception {
Options options = new Options();
@@ -109,13 +109,13 @@
IHyracksClientConnection hcc = new HyracksConnection(options.host,
options.port);
JobSpecification job =
createJob(parseFileSplits(options.inFileOrderSplits),
- parseFileSplits(options.outFileSplits),
- options.memBufferAlg, options.frameLimit, options.frameSize,
options.topK, options.usingHeapSorter);
+ parseFileSplits(options.outFileSplits), options.memBufferAlg,
options.frameLimit, options.frameSize,
+ options.topK, options.usingHeapSorter);
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(job,
options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) :
EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, Long.MAX_VALUE);
long end = System.currentTimeMillis();
System.err.println("finished in:" + (end - start) + "ms");
}
@@ -135,8 +135,8 @@
SortFieldsComparatorFactories, ordersDesc);
} else {
if (memBufferAlg.equalsIgnoreCase("bestfit")) {
- sorter = new ExternalSortOperatorDescriptor(spec, frameLimit,
SortFields,
- null, SortFieldsComparatorFactories, ordersDesc,
Algorithm.MERGE_SORT,
+ sorter = new ExternalSortOperatorDescriptor(spec, frameLimit,
SortFields, null,
+ SortFieldsComparatorFactories, ordersDesc,
Algorithm.MERGE_SORT,
EnumFreeSlotPolicy.SMALLEST_FIT, limit);
} else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
sorter = new ExternalSortOperatorDescriptor(spec, frameLimit,
SortFields, null,
@@ -158,8 +158,8 @@
spec.connect(
new MToNPartitioningMergingConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(SortFields,
orderBinaryHashFunctionFactories),
- SortFields, SortFieldsComparatorFactories, new
UTF8StringNormalizedKeyComputerFactory()),
+ new FieldHashPartitionComputerFactory(SortFields,
orderBinaryHashFunctionFactories), SortFields,
+ SortFieldsComparatorFactories, new
UTF8StringNormalizedKeyComputerFactory()),
sorter, 0, printer, 0);
spec.addRoot(printer);
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
index 0d0cd3e..b0aad4f 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
@@ -73,7 +73,6 @@
private static final String PATH_TO_HADOOP_CONF =
FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf");
protected static final String BUILD_DIR = FileUtil.joinPath("target",
"build");
-
private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES,
"data", "customer.tbl");
protected static final String HDFS_INPUT_PATH = "/customer/";
protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
@@ -151,11 +150,11 @@
String[] readSchedule = scheduler.getLocationConstraints(splits);
JobSpecification jobSpec = new JobSpecification();
- RecordDescriptor recordDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { new
UTF8StringSerializerDeserializer() });
+ RecordDescriptor recordDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new
UTF8StringSerializerDeserializer() });
- String[] locations = new String[] { HyracksUtils.NC1_ID,
HyracksUtils.NC1_ID, HyracksUtils.NC2_ID,
- HyracksUtils.NC2_ID };
+ String[] locations =
+ new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID,
HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
HDFSReadOperatorDescriptor readOperator = new
HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
readSchedule, new TextKeyValueParserFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
readOperator, locations);
@@ -164,21 +163,23 @@
new IBinaryComparatorFactory[] {
RawBinaryComparatorFactory.INSTANCE }, recordDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
sortOperator, locations);
- HDFSWriteOperatorDescriptor writeOperator = new
HDFSWriteOperatorDescriptor(jobSpec, conf,
- new TextTupleWriterFactory());
+ HDFSWriteOperatorDescriptor writeOperator =
+ new HDFSWriteOperatorDescriptor(jobSpec, conf, new
TextTupleWriterFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
writeOperator, HyracksUtils.NC1_ID);
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
readOperator, 0, sortOperator, 0);
- jobSpec.connect(new
MToNPartitioningMergingConnectorDescriptor(jobSpec, new
FieldHashPartitionComputerFactory(
- new int[] { 0 }, new IBinaryHashFunctionFactory[] {
RawBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 0 }, new IBinaryComparatorFactory[] {
RawBinaryComparatorFactory.INSTANCE }, null),
+ jobSpec.connect(
+ new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] {
RawBinaryHashFunctionFactory.INSTANCE }),
+ new int[] { 0 }, new IBinaryComparatorFactory[] {
RawBinaryComparatorFactory.INSTANCE }, null),
sortOperator, 0, writeOperator, 0);
jobSpec.addRoot(writeOperator);
- IHyracksClientConnection client = new
HyracksConnection(HyracksUtils.CC_HOST,
- HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ IHyracksClientConnection client =
+ new HyracksConnection(HyracksUtils.CC_HOST,
HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
JobId jobId = client.startJob(jobSpec);
- client.waitForCompletion(jobId);
+ client.waitForCompletion(jobId, Long.MAX_VALUE);
Assert.assertEquals(true, checkResults());
}
@@ -195,8 +196,8 @@
Path actual = new Path(ACTUAL_RESULT_DIR);
dfs.copyToLocalFile(result, actual);
- TestUtils.compareWithResult(new
File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")), new File(
- FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result",
"part-0")));
+ TestUtils.compareWithResult(new
File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")),
+ new File(FileUtil.joinPath(ACTUAL_RESULT_DIR,
"customer_result", "part-0")));
return true;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
index 1fddc46..287baeb 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
@@ -90,10 +90,10 @@
cc.stop();
}
- public static void runJob(JobSpecification spec, String appName) throws
Exception {
+ public static void runJob(JobSpecification spec, String appName, long
timeout) throws Exception {
spec.setFrameSize(FRAME_SIZE);
JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
- hcc.waitForCompletion(jobId);
+ hcc.waitForCompletion(jobId, timeout);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
index 3c9b1c0..84cdb65 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -21,7 +21,6 @@
import java.util.List;
-import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -55,6 +54,8 @@
import org.apache.hyracks.hdfs.utils.HyracksUtils;
import org.apache.hyracks.hdfs2.scheduler.Scheduler;
+import junit.framework.Assert;
+
/**
* Test the org.apache.hyracks.hdfs2.dataflow package,
* the operators for the Hadoop new API.
@@ -86,6 +87,7 @@
*
* @throws Exception
*/
+ @Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testHDFSReadWriteOperators() throws Exception {
FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
@@ -98,11 +100,11 @@
String[] readSchedule = scheduler.getLocationConstraints(splits);
JobSpecification jobSpec = new JobSpecification();
- RecordDescriptor recordDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { new
UTF8StringSerializerDeserializer() });
+ RecordDescriptor recordDesc =
+ new RecordDescriptor(new ISerializerDeserializer[] { new
UTF8StringSerializerDeserializer() });
- String[] locations = new String[] { HyracksUtils.NC1_ID,
HyracksUtils.NC1_ID, HyracksUtils.NC2_ID,
- HyracksUtils.NC2_ID };
+ String[] locations =
+ new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID,
HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
HDFSReadOperatorDescriptor readOperator = new
HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
readSchedule, new TextKeyValueParserFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
readOperator, locations);
@@ -111,21 +113,23 @@
new IBinaryComparatorFactory[] {
RawBinaryComparatorFactory.INSTANCE }, recordDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
sortOperator, locations);
- HDFSWriteOperatorDescriptor writeOperator = new
HDFSWriteOperatorDescriptor(jobSpec, conf,
- new TextTupleWriterFactory());
+ HDFSWriteOperatorDescriptor writeOperator =
+ new HDFSWriteOperatorDescriptor(jobSpec, conf, new
TextTupleWriterFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
writeOperator, HyracksUtils.NC1_ID);
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
readOperator, 0, sortOperator, 0);
- jobSpec.connect(new
MToNPartitioningMergingConnectorDescriptor(jobSpec, new
FieldHashPartitionComputerFactory(
- new int[] { 0 }, new IBinaryHashFunctionFactory[] {
RawBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 0 }, new IBinaryComparatorFactory[] {
RawBinaryComparatorFactory.INSTANCE }, null),
+ jobSpec.connect(
+ new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] {
RawBinaryHashFunctionFactory.INSTANCE }),
+ new int[] { 0 }, new IBinaryComparatorFactory[] {
RawBinaryComparatorFactory.INSTANCE }, null),
sortOperator, 0, writeOperator, 0);
jobSpec.addRoot(writeOperator);
- IHyracksClientConnection client = new
HyracksConnection(HyracksUtils.CC_HOST,
- HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ IHyracksClientConnection client =
+ new HyracksConnection(HyracksUtils.CC_HOST,
HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
JobId jobId = client.startJob(jobSpec);
- client.waitForCompletion(jobId);
+ client.waitForCompletion(jobId, Long.MAX_VALUE);
Assert.assertEquals(true, checkResults());
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 45634ad..dfd5bad 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -260,7 +260,7 @@
return b && (path.length() == cpl || '/' == path.charAt(cpl));
}
- protected HttpServerHandler<HttpServer> createHttpHandler(int chunkSize) {
+ protected HttpServerHandler createHttpHandler(int chunkSize) {
return new HttpServerHandler<>(this, chunkSize);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index aebe2f5..3c0f699 100644
---
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -27,10 +28,10 @@
private final Map<Long, Request> reqMap;
public RPCInterface() {
- reqMap = new HashMap<Long, RPCInterface.Request>();
+ reqMap = new HashMap<>();
}
- public Object call(IIPCHandle handle, Object request) throws Exception {
+ public Object call(IIPCHandle handle, Object request, long timeout) throws
Exception {
Request req;
long mid;
synchronized (this) {
@@ -41,7 +42,7 @@
mid = handle.send(-1, request, null);
reqMap.put(mid, req);
}
- return req.getResponse();
+ return req.getResponse(timeout);
}
@Override
@@ -88,10 +89,15 @@
notifyAll();
}
- synchronized Object getResponse() throws Exception {
- while (pending) {
+ synchronized Object getResponse(long timeout) throws Exception {
+ long start = System.currentTimeMillis();
+ long now = start;
+ while (pending && now - start < timeout) {
wait();
}
+ if (pending) {
+ throw new TimeoutException();
+ }
if (exception != null) {
throw exception;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index b454520..1125c64 100644
---
a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -23,16 +23,15 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import junit.framework.Assert;
-
-import org.junit.Test;
-
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.api.RPCInterface;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.junit.Test;
+
+import junit.framework.Assert;
public class IPCTest {
@Test
@@ -48,11 +47,11 @@
IIPCHandle handle = client.getHandle(serverAddr);
for (int i = 0; i < 100; ++i) {
- Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)),
Integer.valueOf(2 * i));
+ Assert.assertEquals(rpci.call(handle, Integer.valueOf(i),
Long.MAX_VALUE), Integer.valueOf(2 * i));
}
try {
- rpci.call(handle, "Foo");
+ rpci.call(handle, "Foo", Long.MAX_VALUE);
Assert.assertTrue(false);
} catch (Exception e) {
Assert.assertTrue(true);
@@ -63,8 +62,8 @@
final Executor executor = Executors.newCachedThreadPool();
IIPCI ipci = new IIPCI() {
@Override
- public void deliverIncomingMessage(final IIPCHandle handle, final
long mid, long rmid,
- final Object payload, Exception exception) {
+ public void deliverIncomingMessage(final IIPCHandle handle, final
long mid, long rmid, final Object payload,
+ Exception exception) {
executor.execute(new Runnable() {
@Override
public void run() {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1961
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>