Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2292
Change subject: [ASTERIXDB-2249][API] Add Result Max Reads to API
......................................................................
[ASTERIXDB-2249][API] Add Result Max Reads to API
- user model changes: no
- storage format changes: no
- interface changes: yes
- IRequestParameters: add ResultProperties
- IDatasetPartitionManager: add maxReads
Details:
- Add option to specify max result reads and default
it to 1.
- Fix exception handling in DatasetPartitionReader.
- Add option to specify maxResultReads in tests.
- Use new option in async-repeated test.
Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
A
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
29 files changed, 194 insertions(+), 80 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/92/2292/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 8d0f20b..6d8ba92 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -33,7 +33,7 @@
/**
* @return The {@code ResultDelivery} kind required for queries in the
list of statements
*/
- IStatementExecutor.ResultDelivery getResultDelivery();
+ ResultProperties getResultProperties();
/**
* @return a reference to write the stats of executed queries
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
new file mode 100644
index 0000000..4866c6d
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.io.Serializable;
+
+public class ResultProperties implements Serializable {
+
+ public static final long DEFAULT_MAX_READS = 1;
+ private final IStatementExecutor.ResultDelivery delivery;
+ private final long maxReads;
+
+ public ResultProperties(IStatementExecutor.ResultDelivery delivery) {
+ this(delivery, DEFAULT_MAX_READS);
+ }
+
+ public ResultProperties(IStatementExecutor.ResultDelivery delivery, long
maxReads) {
+ this.delivery = delivery;
+ this.maxReads = maxReads;
+ }
+
+ public IStatementExecutor.ResultDelivery getDelivery() {
+ return delivery;
+ }
+
+ public long getMaxReads() {
+ return maxReads;
+ }
+
+ public ResultProperties getNcToCcResultProperties() {
+ if (delivery != IStatementExecutor.ResultDelivery.IMMEDIATE) {
+ return this;
+ }
+ // switch IMMEDIATE to DEFERRED since the result will be served by
the NC
+ return new
ResultProperties(IStatementExecutor.ResultDelivery.DEFERRED, maxReads);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 87c1c57..df2a2a1 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -48,6 +48,7 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -164,7 +165,7 @@
double duration;
long startTime = System.currentTimeMillis();
final IRequestParameters requestParameters =
- new RequestParameters(hds,
IStatementExecutor.ResultDelivery.IMMEDIATE,
+ new RequestParameters(hds, new
ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
new IStatementExecutor.Stats(), null, null, null);
translator.compileAndExecute(hcc, null, requestParameters);
long endTime = System.currentTimeMillis();
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 83a40f0..a137bdc 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -26,6 +26,8 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
+import javax.xml.transform.Result;
+
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.app.message.CancelQueryRequest;
import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
@@ -41,6 +43,7 @@
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -66,14 +69,12 @@
@Override
protected void executeStatement(String statementsText, SessionOutput
sessionOutput,
- IStatementExecutor.ResultDelivery delivery,
IStatementExecutor.Stats stats, RequestParameters param,
+ ResultProperties resultProperties, IStatementExecutor.Stats stats,
RequestParameters param,
RequestExecutionState execution, Map<String, String>
optionalParameters) throws Exception {
// Running on NC -> send 'execute' message to CC
INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
- IStatementExecutor.ResultDelivery ccDelivery =
- delivery == IStatementExecutor.ResultDelivery.IMMEDIATE ?
IStatementExecutor.ResultDelivery.DEFERRED
- : delivery;
+ final IStatementExecutor.ResultDelivery delivery =
resultProperties.getDelivery();
ExecuteStatementResponseMessage responseMsg;
MessageFuture responseFuture = ncMb.registerMessageFuture();
final String handleUrl = getHandleUrl(param.host, param.path,
delivery);
@@ -86,8 +87,8 @@
timeout =
TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
}
ExecuteStatementRequestMessage requestMsg = new
ExecuteStatementRequestMessage(ncCtx.getNodeId(),
- responseFuture.getFutureId(), queryLanguage,
statementsText, sessionOutput.config(), ccDelivery,
- param.clientContextID, handleUrl, optionalParameters);
+ responseFuture.getFutureId(), queryLanguage,
statementsText, sessionOutput.config(),
+ resultProperties.getNcToCcResultProperties(),
param.clientContextID, handleUrl, optionalParameters);
execution.start();
ncMb.sendMessageToCC(requestMsg);
try {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index a8f14b5..3a68e83 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@@ -48,6 +47,7 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -137,7 +137,8 @@
PRETTY("pretty"),
MODE("mode"),
TIMEOUT("timeout"),
- PLAN_FORMAT("plan-format");
+ PLAN_FORMAT("plan-format"),
+ MAX_RESULT_READS("max_result_reads");
private final String str;
@@ -193,6 +194,7 @@
boolean pretty;
String clientContextID;
String mode;
+ String maxResultReads;
@Override
public String toString() {
@@ -207,6 +209,7 @@
on.put("clientContextID", clientContextID);
on.put("format", format);
on.put("timeout", timeout);
+ on.put("maxResultReads", maxResultReads);
return om.writer(new
MinimalPrettyPrinter()).writeValueAsString(on);
} catch (JsonProcessingException e) { // NOSONAR
return e.getMessage();
@@ -383,6 +386,7 @@
param.mode = toLower(getOptText(jsonRequest,
Parameter.MODE.str()));
param.clientContextID = getOptText(jsonRequest,
Parameter.CLIENT_ID.str());
param.timeout = getOptText(jsonRequest,
Parameter.TIMEOUT.str());
+ param.maxResultReads = getOptText(jsonRequest,
Parameter.MAX_RESULT_READS.str());
} catch (JsonParseException | JsonMappingException e) {
// if the JSON parsing fails, the statement is empty and we
get an empty statement error
GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(),
e);
@@ -397,6 +401,7 @@
param.mode = toLower(request.getParameter(Parameter.MODE.str()));
param.clientContextID =
request.getParameter(Parameter.CLIENT_ID.str());
param.timeout = request.getParameter(Parameter.TIMEOUT.str());
+ param.maxResultReads =
request.getParameter(Parameter.MAX_RESULT_READS.str());
}
return param;
}
@@ -448,6 +453,10 @@
ResultDelivery delivery = parseResultDelivery(param.mode);
+ final ResultProperties resultProperties = param.maxResultReads == null
?
+ new ResultProperties(delivery) :
+ new ResultProperties(delivery,
Long.parseLong(param.maxResultReads));
+
String handleUrl = getHandleUrl(param.host, param.path, delivery);
SessionOutput sessionOutput = createSessionOutput(param, handleUrl,
httpWriter);
SessionConfig sessionConfig = sessionOutput.config();
@@ -478,7 +487,7 @@
"http://" + hostName + ":" +
appCtx.getExternalProperties().getQueryWebInterfacePort());
response.setHeader("Access-Control-Allow-Headers", "Origin,
X-Requested-With, Content-Type, Accept");
response.setStatus(execution.getHttpStatus());
- executeStatement(statementsText, sessionOutput, delivery, stats,
param, execution, optionalParams);
+ executeStatement(statementsText, sessionOutput, resultProperties,
stats, param, execution, optionalParams);
if (ResultDelivery.IMMEDIATE == delivery ||
ResultDelivery.DEFERRED == delivery) {
ResultUtil.printStatus(sessionOutput,
execution.getResultStatus());
}
@@ -502,9 +511,9 @@
}
}
- protected void executeStatement(String statementsText, SessionOutput
sessionOutput, ResultDelivery delivery,
- IStatementExecutor.Stats stats, RequestParameters param,
RequestExecutionState execution,
- Map<String, String> optionalParameters) throws Exception {
+ protected void executeStatement(String statementsText, SessionOutput
sessionOutput,
+ ResultProperties resultProperties, IStatementExecutor.Stats stats,
RequestParameters param,
+ RequestExecutionState execution, Map<String, String>
optionalParameters) throws Exception {
IClusterManagementWork.ClusterState clusterState =
((ICcApplicationContext)
appCtx).getClusterStateManager().getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -518,8 +527,8 @@
sessionOutput, compilationProvider, componentProvider);
execution.start();
final IRequestParameters requestParameters =
- new
org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(),
delivery, stats, null,
- param.clientContextID, optionalParameters);
+ new
org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(),
resultProperties, stats,
+ null, param.clientContextID, optionalParameters);
translator.compileAndExecute(getHyracksClientConnection(), queryCtx,
requestParameters);
execution.end();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 3359b9f..360c522 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -42,6 +42,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -209,7 +210,8 @@
IStatementExecutor translator =
statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
final IRequestParameters requestParameters =
- new RequestParameters(hds, resultDelivery, new
IStatementExecutor.Stats(), null, null, null);
+ new RequestParameters(hds, new
ResultProperties(resultDelivery), new IStatementExecutor.Stats(),
+ null, null, null);
translator.compileAndExecute(hcc, null, requestParameters);
} catch (AsterixException | TokenMgrError |
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 58a7f09..4ecd978 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -35,6 +35,7 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -120,8 +121,8 @@
IStatementExecutor translator =
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
final IRequestParameters requestParameters =
- new RequestParameters(null,
IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(),
- null, null, null);
+ new RequestParameters(null, new
ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
+ new IStatementExecutor.Stats(), null, null, null);
translator.compileAndExecute(hcc, null, requestParameters);
writer.flush();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0b8c34c..5b0eb97 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -46,6 +46,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -69,20 +70,20 @@
private final ILangExtension.Language lang;
private final String statementsText;
private final SessionConfig sessionConfig;
- private final IStatementExecutor.ResultDelivery delivery;
+ private final ResultProperties resultProperties;
private final String clientContextID;
private final String handleUrl;
private final Map<String, String> optionalParameters;
public ExecuteStatementRequestMessage(String requestNodeId, long
requestMessageId, ILangExtension.Language lang,
- String statementsText, SessionConfig sessionConfig,
IStatementExecutor.ResultDelivery delivery,
+ String statementsText, SessionConfig sessionConfig,
ResultProperties resultProperties,
String clientContextID, String handleUrl, Map<String, String>
optionalParameters) {
this.requestNodeId = requestNodeId;
this.requestMessageId = requestMessageId;
this.lang = lang;
this.statementsText = statementsText;
this.sessionConfig = sessionConfig;
- this.delivery = delivery;
+ this.resultProperties = resultProperties;
this.clientContextID = clientContextID;
this.handleUrl = handleUrl;
this.optionalParameters = optionalParameters;
@@ -122,7 +123,8 @@
compilationProvider, storageComponentProvider);
final IStatementExecutor.Stats stats = new
IStatementExecutor.Stats();
final IRequestParameters requestParameters =
- new RequestParameters(null, delivery, stats, outMetadata,
clientContextID, optionalParameters);
+ new RequestParameters(null, resultProperties, stats,
outMetadata, clientContextID,
+ optionalParameters);
translator.compileAndExecute(ccApp.getHcc(),
statementExecutorContext, requestParameters);
outPrinter.close();
responseMsg.setResult(outWriter.toString());
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9b96883..4e9cb47 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -283,7 +283,8 @@
Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
Map<String, String> config = new HashMap<>();
final IHyracksDataset hdc = requestParameters.getHyracksDataset();
- final ResultDelivery resultDelivery =
requestParameters.getResultDelivery();
+ final ResultDelivery resultDelivery =
requestParameters.getResultProperties().getDelivery();
+ final long maxResultReads =
requestParameters.getResultProperties().getMaxReads();
final Stats stats = requestParameters.getStats();
final ResultMetadata outMetadata = requestParameters.getOutMetadata();
final String clientContextId = requestParameters.getClientContextId();
@@ -351,6 +352,7 @@
metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(resultDelivery
== ResultDelivery.ASYNC
|| resultDelivery ==
ResultDelivery.DEFERRED);
+ metadataProvider.setMaxResultReads(maxResultReads);
}
handleInsertUpsertStatement(metadataProvider, stmt,
hcc, hdc, resultDelivery, outMetadata,
stats, false, clientContextId);
@@ -386,6 +388,7 @@
metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC ||
resultDelivery == ResultDelivery.DEFERRED);
+ metadataProvider.setMaxResultReads(maxResultReads);
handleQuery(metadataProvider, (Query) stmt, hcc, hdc,
resultDelivery, outMetadata, stats,
clientContextId, ctx);
break;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index 5b8da8b..9592492 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -22,24 +22,24 @@
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.ResultProperties;
import org.apache.hyracks.api.dataset.IHyracksDataset;
public class RequestParameters implements IRequestParameters {
private final IHyracksDataset hdc;
- private final ResultDelivery resultDelivery;
+ private final ResultProperties resultProperties;
private final Stats stats;
private final Map<String, String> optionalParameters;
private final IStatementExecutor.ResultMetadata outMetadata;
private final String clientContextId;
- public RequestParameters(IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
+ public RequestParameters(IHyracksDataset hdc, ResultProperties
resultProperties, Stats stats,
IStatementExecutor.ResultMetadata outMetadata, String
clientContextId,
Map<String, String> optionalParameters) {
this.hdc = hdc;
- this.resultDelivery = resultDelivery;
+ this.resultProperties = resultProperties;
this.stats = stats;
this.outMetadata = outMetadata;
this.clientContextId = clientContextId;
@@ -52,8 +52,8 @@
}
@Override
- public IStatementExecutor.ResultDelivery getResultDelivery() {
- return resultDelivery;
+ public ResultProperties getResultProperties() {
+ return resultProperties;
}
@Override
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 30336d1..6db7ff7 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -122,7 +122,7 @@
private static final Pattern HTTP_PARAM_PATTERN = Pattern.compile("param
(\\w+)=(.*)", Pattern.MULTILINE);
private static final Pattern HTTP_BODY_PATTERN =
Pattern.compile("body=(.*)", Pattern.MULTILINE);
private static final Pattern HTTP_STATUSCODE_PATTERN =
Pattern.compile("statuscode (.*)", Pattern.MULTILINE);
-
+ private static final Pattern MAX_RESULT_READS_PATTERN =
Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
public static final int TRUNCATE_THRESHOLD = 16384;
public static final String DELIVERY_ASYNC = "async";
@@ -556,7 +556,11 @@
public InputStream executeQueryService(String str, OutputFormat fmt, URI
uri, List<Parameter> params,
boolean jsonEncoded, Predicate<Integer> responseCodeValidator,
boolean cancellable) throws Exception {
- final List<Parameter> newParams = upsertParam(params, "format",
fmt.mimeType());
+ List<Parameter> newParams = upsertParam(params, "format",
fmt.mimeType());
+ final Optional<String> maxReadsOptional = extractMaxResultReads(str);
+ if (maxReadsOptional.isPresent()) {
+ newParams = upsertParam(newParams, "max_result_reads",
maxReadsOptional.get());
+ }
HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str,
uri, "statement", newParams)
: constructPostMethodUrl(str, uri, "statement", newParams);
// Set accepted output response type
@@ -1392,6 +1396,14 @@
return tmpStmt;
}
+ protected static Optional<String> extractMaxResultReads(String statement) {
+ final Matcher m = MAX_RESULT_READS_PATTERN.matcher(statement);
+ while (m.find()) {
+ return Optional.of(m.group(1));
+ }
+ return Optional.empty();
+ }
+
protected static Optional<String> extractBody(String statement) {
final Matcher m = HTTP_BODY_PATTERN.matcher(statement);
while (m.find()) {
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
index 1e18f66..8055915 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
@@ -17,6 +17,7 @@
* under the License.
*/
+-- maxresultreads=2
-- handlevariable=status
select i, i * i as i2 from range(1, 10) i;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b8790e5..6f58b0a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -153,6 +153,7 @@
private IAWriterFactory writerFactory;
private FileSplit outputFile;
private boolean asyncResults;
+ private long maxResultReads;
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
private TxnId txnId;
@@ -236,6 +237,14 @@
public void setResultAsyncMode(boolean asyncResults) {
this.asyncResults = asyncResults;
+ }
+
+ public void setMaxResultReads(long maxResultReads) {
+ this.maxResultReads = maxResultReads;
+ }
+
+ public long getMaxResultReads() {
+ return maxResultReads;
}
public ResultSetId getResultSetId() {
@@ -536,7 +545,7 @@
IResultSerializerFactory resultSerializedAppenderFactory =
resultSerializerFactoryProvider
.getAqlResultSerializerFactoryProvider(printColumns,
printerFactories, getWriterFactory());
resultWriter = new ResultWriterOperatorDescriptor(spec, rsId,
ordered, getResultAsyncMode(),
- resultSerializedAppenderFactory);
+ resultSerializedAppenderFactory, getMaxResultReads());
} catch (IOException e) {
throw new AlgebricksException(e);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index 008f0be..e6cf6d3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -25,7 +25,7 @@
public interface IDatasetPartitionManager extends IDatasetManager {
IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx,
ResultSetId rsId, boolean orderedResult,
- boolean asyncMode, int partition, int nPartitions) throws
HyracksException;
+ boolean asyncMode, int partition, int nPartitions, long maxReads)
throws HyracksException;
void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int
partition, int nPartitions,
boolean orderedResult, boolean emptyResult) throws
HyracksException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index bc980e1..d381a67 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -70,12 +70,12 @@
@Override
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx,
ResultSetId rsId, boolean orderedResult,
- boolean asyncMode, int partition, int nPartitions) throws
HyracksException {
+ boolean asyncMode, int partition, int nPartitions, long maxReads) {
DatasetPartitionWriter dpw;
JobId jobId = ctx.getJobletContext().getJobId();
synchronized (this) {
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId,
asyncMode, orderedResult, partition, nPartitions,
- datasetMemoryManager, fileFactory);
+ datasetMemoryManager, fileFactory, maxReads);
ResultSetMap rsIdMap =
partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
index ec33b05..26c19e8 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -31,11 +31,8 @@
private static final Logger LOGGER = LogManager.getLogger();
private final DatasetPartitionManager datasetPartitionManager;
-
private final DatasetMemoryManager datasetMemoryManager;
-
private final Executor executor;
-
private final ResultState resultState;
public DatasetPartitionReader(DatasetPartitionManager
datasetPartitionManager,
@@ -64,26 +61,32 @@
if (size <= 0) {
break;
} else if (size < buffer.limit()) {
- throw new HyracksDataException("Premature end
of file - readSize: " + size
- + " buffer limit: " + buffer.limit());
+ throw new IllegalStateException(
+ "Premature end of file - readSize: " +
size + " buffer limit: " + buffer
+ .limit());
}
offset += size;
buffer.flip();
channel.nextFrame(buffer);
}
- LOGGER.info("Result Reader read + " + offset + "
bytes");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Result Reader read + " + offset + "
bytes");
+ }
+ } catch (Exception e) {
+ LOGGER.error("partition reader failed", e);
+ channel.abort();
} finally {
channel.close();
resultState.readClose();
- // If the query is a synchronous query, remove its
partition as soon as it is read.
- if (!resultState.getAsyncMode()) {
+ // if resultState has been exhausted, delete the
result partition
+ if (resultState.isExhausted()) {
datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
-
resultState.getResultSetPartitionId().getResultSetId(), resultState
-
.getResultSetPartitionId().getPartition());
+
resultState.getResultSetPartitionId().getResultSetId(),
+
resultState.getResultSetPartitionId().getPartition());
}
}
} catch (HyracksDataException e) {
- throw new RuntimeException(e);
+ LOGGER.error("unexpected failure in partition reader", e);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("result reading successful(" +
resultState.getResultSetPartitionId() + ")");
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 2bf5326..d49a1a6 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -59,7 +59,7 @@
public DatasetPartitionWriter(IHyracksTaskContext ctx,
IDatasetPartitionManager manager, JobId jobId,
ResultSetId rsId, boolean asyncMode, boolean orderedResult, int
partition, int nPartitions,
- DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory
fileFactory) {
+ DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory
fileFactory, long maxReads) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
@@ -70,7 +70,7 @@
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId,
partition);
resultState = new ResultState(resultSetPartitionId, asyncMode,
ctx.getIoManager(), fileFactory,
- ctx.getInitialFrameSize());
+ ctx.getInitialFrameSize(), maxReads);
}
public ResultState getResultState() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index 43b1e9b..3e3f06b 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -68,17 +68,22 @@
private long size;
private long persistentSize;
+ private long remainingReads;
ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode,
IIOManager ioManager,
- IWorkspaceFileFactory fileFactory, int frameSize) {
+ IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
+ if (maxReads <= 0) {
+ throw new IllegalArgumentException("maxReads must be > 0");
+ }
this.resultSetPartitionId = resultSetPartitionId;
this.asyncMode = asyncMode;
this.ioManager = ioManager;
this.fileFactory = fileFactory;
this.frameSize = frameSize;
+ remainingReads = maxReads;
eos = new AtomicBoolean(false);
failed = new AtomicBoolean(false);
- localPageList = new ArrayList<Page>();
+ localPageList = new ArrayList<>();
fileRef = null;
writeFileHandle = null;
@@ -102,6 +107,7 @@
closeWriteFileHandle();
if (fileRef != null) {
fileRef.delete();
+ fileRef = null;
}
}
@@ -152,7 +158,10 @@
}
public synchronized void readOpen() {
- // It is a noOp for now, leaving here to keep the API stable for
future usage.
+ if (isExhausted()) {
+ throw new IllegalStateException("Result reads exhausted");
+ }
+ remainingReads--;
}
public synchronized void readClose() throws HyracksDataException {
@@ -339,6 +348,7 @@
ObjectNode on = om.createObjectNode();
on.put("rspid", resultSetPartitionId.toString());
on.put("async", asyncMode);
+ on.put("remainingReads", remainingReads);
on.put("eos", eos.get());
on.put("failed", failed.get());
on.put("fileRef", String.valueOf(fileRef));
@@ -347,4 +357,8 @@
return e.getMessage();
}
}
+
+ public synchronized boolean isExhausted() {
+ return remainingReads == 0;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 58eee79..d081bdb 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -51,14 +51,16 @@
private final boolean asyncMode;
private final IResultSerializerFactory resultSerializerFactory;
+ private final long maxReads;
public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec,
ResultSetId rsId, boolean ordered,
- boolean asyncMode, IResultSerializerFactory
resultSerializerFactory) throws IOException {
+ boolean asyncMode, IResultSerializerFactory
resultSerializerFactory, long maxReads) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
this.ordered = ordered;
this.asyncMode = asyncMode;
this.resultSerializerFactory = resultSerializerFactory;
+ this.maxReads = maxReads;
}
@Override
@@ -87,7 +89,7 @@
public void open() throws HyracksDataException {
try {
datasetPartitionWriter =
dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
- nPartitions);
+ nPartitions, maxReads);
datasetPartitionWriter.open();
resultSerializer.init();
} catch (HyracksException e) {
@@ -139,7 +141,8 @@
sb.append("{ ");
sb.append("\"rsId\": \"").append(rsId).append("\", ");
sb.append("\"ordered\": ").append(ordered).append(", ");
- sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
+ sb.append("\"asyncMode\": ").append(asyncMode).append(", ");
+ sb.append("\"maxReads\": ").append(maxReads).append(" }");
return sb.toString();
}
};
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f169054..080746c 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -100,7 +100,7 @@
ResultSetId rsId = new ResultSetId(1);
AbstractSingleActivityOperatorDescriptor printer = new
ResultWriterOperatorDescriptor(spec, rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
return printer;
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index b5c6238..c05b504 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -101,7 +101,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC2_ID);
@@ -173,7 +173,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
@@ -246,7 +246,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 56bf853..b693b09 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -159,7 +159,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0,
sorter, 0);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 67642f4..67845c0 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -229,7 +229,7 @@
ResultSetId rsId = new ResultSetId(1);
AbstractSingleActivityOperatorDescriptor printer = new
ResultWriterOperatorDescriptor(spec, rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
return printer;
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index b62c011..d7d4219 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -95,7 +95,7 @@
spec.addResultSetId(rsId);
outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true,
false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
outputOp[i], locations);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index dc91dd2..06d7b04 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -71,7 +71,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC2_ID, NC1_ID);
@@ -107,7 +107,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC2_ID);
@@ -146,7 +146,7 @@
ResultSetId rsId = new ResultSetId(1);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
spec.addResultSetId(rsId);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC2_ID);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index 3043cba..df9c0d7 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -85,7 +85,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, true, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0,
sorter, 0);
@@ -135,7 +135,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0,
sorter, 0);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 7075fe9..2c055c2 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -132,7 +132,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNBroadcastConnectorDescriptor(spec);
@@ -215,7 +215,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNBroadcastConnectorDescriptor(spec);
@@ -300,7 +300,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNBroadcastConnectorDescriptor(spec);
@@ -385,7 +385,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNBroadcastConnectorDescriptor(spec);
@@ -471,7 +471,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNPartitioningConnectorDescriptor(spec,
@@ -563,7 +563,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNPartitioningConnectorDescriptor(spec,
@@ -654,7 +654,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNPartitioningConnectorDescriptor(spec,
@@ -750,7 +750,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordPartConn = new
MToNPartitioningConnectorDescriptor(spec,
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 03cd5d4..dc5d0bc 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -182,7 +182,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
MToNBroadcastConnectorDescriptor(spec);
@@ -263,7 +263,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
OneToOneConnectorDescriptor(spec);
@@ -344,7 +344,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
OneToOneConnectorDescriptor(spec);
@@ -430,7 +430,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC1_ID);
IConnectorDescriptor ordJoinConn = new
OneToOneConnectorDescriptor(spec);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 542f037..e4d6398 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -79,7 +79,7 @@
spec.addResultSetId(rsId);
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec,
rsId, false, false,
-
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(),
1);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
NC2_ID, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0,
unionAll, 0);
--
To view, visit https://asterix-gerrit.ics.uci.edu/2292
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>