Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2249][API] Add Max Result Reads to API ......................................................................
[ASTERIXDB-2249][API] Add Max Result Reads to API - user model changes: no - storage format changes: no - interface changes: yes - IRequestParameters: add ResultProperties - IDatasetPartitionManager: add maxReads Details: - Add option to specify max result reads and default it to 1. - Fix exception handling in DatasetPartitionReader. - Add option to specify maxResultReads in tests. - Use new option in async-repeated test. - Add test case for exhausted result. Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a Reviewed-on: https://asterix-gerrit.ics.uci.edu/2292 Reviewed-by: Michael Blow <[email protected]> Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java 38 files changed, 349 insertions(+), 122 deletions(-) Approvals: Anon. E. 
Moose #1000171: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java index 8d0f20b..a1fbac6 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java @@ -31,9 +31,11 @@ IHyracksDataset getHyracksDataset(); /** - * @return The {@code ResultDelivery} kind required for queries in the list of statements + * Gets the required result properties of the request. + * + * @return the result properties */ - IStatementExecutor.ResultDelivery getResultDelivery(); + ResultProperties getResultProperties(); /** * @return a reference to write the stats of executed queries diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java new file mode 100644 index 0000000..4866c6d --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.translator; + +import java.io.Serializable; + +public class ResultProperties implements Serializable { + + public static final long DEFAULT_MAX_READS = 1; + private final IStatementExecutor.ResultDelivery delivery; + private final long maxReads; + + public ResultProperties(IStatementExecutor.ResultDelivery delivery) { + this(delivery, DEFAULT_MAX_READS); + } + + public ResultProperties(IStatementExecutor.ResultDelivery delivery, long maxReads) { + this.delivery = delivery; + this.maxReads = maxReads; + } + + public IStatementExecutor.ResultDelivery getDelivery() { + return delivery; + } + + public long getMaxReads() { + return maxReads; + } + + public ResultProperties getNcToCcResultProperties() { + if (delivery != IStatementExecutor.ResultDelivery.IMMEDIATE) { + return this; + } + // switch IMMEDIATE to DEFERRED since the result will be served by the NC + return new ResultProperties(IStatementExecutor.ResultDelivery.DEFERRED, maxReads); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index 87c1c57..df2a2a1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -48,6 +48,7 @@ import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; import 
org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; import org.apache.asterix.translator.SessionConfig.PlanFormat; @@ -164,7 +165,7 @@ double duration; long startTime = System.currentTimeMillis(); final IRequestParameters requestParameters = - new RequestParameters(hds, IStatementExecutor.ResultDelivery.IMMEDIATE, + new RequestParameters(hds, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(), null, null, null); translator.compileAndExecute(hcc, null, requestParameters); long endTime = System.currentTimeMillis(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 83a40f0..76f489c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -41,6 +41,7 @@ import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionOutput; import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.api.application.INCServiceContext; @@ -66,14 +67,12 @@ @Override protected void executeStatement(String statementsText, SessionOutput sessionOutput, - IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param, + ResultProperties resultProperties, IStatementExecutor.Stats stats, RequestParameters param, RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception { // 
Running on NC -> send 'execute' message to CC INCServiceContext ncCtx = (INCServiceContext) serviceCtx; INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker(); - IStatementExecutor.ResultDelivery ccDelivery = - delivery == IStatementExecutor.ResultDelivery.IMMEDIATE ? IStatementExecutor.ResultDelivery.DEFERRED - : delivery; + final IStatementExecutor.ResultDelivery delivery = resultProperties.getDelivery(); ExecuteStatementResponseMessage responseMsg; MessageFuture responseFuture = ncMb.registerMessageFuture(); final String handleUrl = getHandleUrl(param.host, param.path, delivery); @@ -86,8 +85,8 @@ timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout)); } ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(), - responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery, - param.clientContextID, handleUrl, optionalParameters); + responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), + resultProperties.getNcToCcResultProperties(), param.clientContextID, handleUrl, optionalParameters); execution.start(); ncMb.sendMessageToCC(requestMsg); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index a8f14b5..23a7ba7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Map; @@ -48,6 +47,7 @@ import org.apache.asterix.translator.IStatementExecutor.Stats; import 
org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -137,7 +137,8 @@ PRETTY("pretty"), MODE("mode"), TIMEOUT("timeout"), - PLAN_FORMAT("plan-format"); + PLAN_FORMAT("plan-format"), + MAX_RESULT_READS("max-result-reads"); private final String str; @@ -193,6 +194,7 @@ boolean pretty; String clientContextID; String mode; + String maxResultReads; @Override public String toString() { @@ -207,6 +209,7 @@ on.put("clientContextID", clientContextID); on.put("format", format); on.put("timeout", timeout); + on.put("maxResultReads", maxResultReads); return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on); } catch (JsonProcessingException e) { // NOSONAR return e.getMessage(); @@ -383,6 +386,7 @@ param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str())); param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str()); param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str()); + param.maxResultReads = getOptText(jsonRequest, Parameter.MAX_RESULT_READS.str()); } catch (JsonParseException | JsonMappingException e) { // if the JSON parsing fails, the statement is empty and we get an empty statement error GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e); @@ -397,6 +401,7 @@ param.mode = toLower(request.getParameter(Parameter.MODE.str())); param.clientContextID = request.getParameter(Parameter.CLIENT_ID.str()); param.timeout = request.getParameter(Parameter.TIMEOUT.str()); + param.maxResultReads = request.getParameter(Parameter.MAX_RESULT_READS.str()); } return param; } @@ -448,6 +453,10 @@ ResultDelivery delivery = parseResultDelivery(param.mode); + final ResultProperties resultProperties = param.maxResultReads == null ? 
+ new ResultProperties(delivery) : + new ResultProperties(delivery, Long.parseLong(param.maxResultReads)); + String handleUrl = getHandleUrl(param.host, param.path, delivery); SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter); SessionConfig sessionConfig = sessionOutput.config(); @@ -478,7 +487,7 @@ "http://" + hostName + ":" + appCtx.getExternalProperties().getQueryWebInterfacePort()); response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept"); response.setStatus(execution.getHttpStatus()); - executeStatement(statementsText, sessionOutput, delivery, stats, param, execution, optionalParams); + executeStatement(statementsText, sessionOutput, resultProperties, stats, param, execution, optionalParams); if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) { ResultUtil.printStatus(sessionOutput, execution.getResultStatus()); } @@ -502,9 +511,9 @@ } } - protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery, - IStatementExecutor.Stats stats, RequestParameters param, RequestExecutionState execution, - Map<String, String> optionalParameters) throws Exception { + protected void executeStatement(String statementsText, SessionOutput sessionOutput, + ResultProperties resultProperties, IStatementExecutor.Stats stats, RequestParameters param, + RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception { IClusterManagementWork.ClusterState clusterState = ((ICcApplicationContext) appCtx).getClusterStateManager().getState(); if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { @@ -518,8 +527,8 @@ sessionOutput, compilationProvider, componentProvider); execution.start(); final IRequestParameters requestParameters = - new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), delivery, stats, null, - param.clientContextID, optionalParameters); + new 
org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), resultProperties, stats, + null, param.clientContextID, optionalParameters); translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters); execution.end(); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index 3359b9f..360c522 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -42,6 +42,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; import org.apache.asterix.translator.SessionConfig.PlanFormat; @@ -209,7 +210,8 @@ IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput, compilationProvider, componentProvider); final IRequestParameters requestParameters = - new RequestParameters(hds, resultDelivery, new IStatementExecutor.Stats(), null, null, null); + new RequestParameters(hds, new ResultProperties(resultDelivery), new IStatementExecutor.Stats(), + null, null, null); translator.compileAndExecute(hcc, null, requestParameters); } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index 58a7f09..4ecd978 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -35,6 +35,7 @@ import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; import org.apache.asterix.translator.SessionConfig.PlanFormat; @@ -120,8 +121,8 @@ IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider, storageComponentProvider); final IRequestParameters requestParameters = - new RequestParameters(null, IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(), - null, null, null); + new RequestParameters(null, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), + new IStatementExecutor.Stats(), null, null, null); translator.compileAndExecute(hcc, null, requestParameters); writer.flush(); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 0b8c34c..5b0eb97 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -46,6 +46,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; import 
org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -69,20 +70,20 @@ private final ILangExtension.Language lang; private final String statementsText; private final SessionConfig sessionConfig; - private final IStatementExecutor.ResultDelivery delivery; + private final ResultProperties resultProperties; private final String clientContextID; private final String handleUrl; private final Map<String, String> optionalParameters; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, - String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery, + String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties, String clientContextID, String handleUrl, Map<String, String> optionalParameters) { this.requestNodeId = requestNodeId; this.requestMessageId = requestMessageId; this.lang = lang; this.statementsText = statementsText; this.sessionConfig = sessionConfig; - this.delivery = delivery; + this.resultProperties = resultProperties; this.clientContextID = clientContextID; this.handleUrl = handleUrl; this.optionalParameters = optionalParameters; @@ -122,7 +123,8 @@ compilationProvider, storageComponentProvider); final IStatementExecutor.Stats stats = new IStatementExecutor.Stats(); final IRequestParameters requestParameters = - new RequestParameters(null, delivery, stats, outMetadata, clientContextID, optionalParameters); + new RequestParameters(null, resultProperties, stats, outMetadata, clientContextID, + optionalParameters); translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters); outPrinter.close(); responseMsg.setResult(outWriter.toString()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 
9b96883..4e9cb47 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -283,7 +283,8 @@ Thread.currentThread().setName(QueryTranslator.class.getSimpleName()); Map<String, String> config = new HashMap<>(); final IHyracksDataset hdc = requestParameters.getHyracksDataset(); - final ResultDelivery resultDelivery = requestParameters.getResultDelivery(); + final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); + final long maxResultReads = requestParameters.getResultProperties().getMaxReads(); final Stats stats = requestParameters.getStats(); final ResultMetadata outMetadata = requestParameters.getOutMetadata(); final String clientContextId = requestParameters.getClientContextId(); @@ -351,6 +352,7 @@ metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); + metadataProvider.setMaxResultReads(maxResultReads); } handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata, stats, false, clientContextId); @@ -386,6 +388,7 @@ metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); metadataProvider.setResultAsyncMode( resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); + metadataProvider.setMaxResultReads(maxResultReads); handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats, clientContextId, ctx); break; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java index 5b8da8b..9592492 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java @@ -22,24 +22,24 @@ import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.asterix.translator.ResultProperties; import org.apache.hyracks.api.dataset.IHyracksDataset; public class RequestParameters implements IRequestParameters { private final IHyracksDataset hdc; - private final ResultDelivery resultDelivery; + private final ResultProperties resultProperties; private final Stats stats; private final Map<String, String> optionalParameters; private final IStatementExecutor.ResultMetadata outMetadata; private final String clientContextId; - public RequestParameters(IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, + public RequestParameters(IHyracksDataset hdc, ResultProperties resultProperties, Stats stats, IStatementExecutor.ResultMetadata outMetadata, String clientContextId, Map<String, String> optionalParameters) { this.hdc = hdc; - this.resultDelivery = resultDelivery; + this.resultProperties = resultProperties; this.stats = stats; this.outMetadata = outMetadata; this.clientContextId = clientContextId; @@ -52,8 +52,8 @@ } @Override - public IStatementExecutor.ResultDelivery getResultDelivery() { - return resultDelivery; + public ResultProperties getResultProperties() { + return resultProperties; } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 30336d1..222e098 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -57,6 +57,7 @@ import 
java.util.regex.Pattern; import java.util.stream.Stream; +import org.apache.asterix.api.http.server.QueryServiceServlet; import org.apache.asterix.app.external.IExternalUDFLibrarian; import org.apache.asterix.common.api.Duration; import org.apache.asterix.common.config.GlobalConfig; @@ -122,7 +123,7 @@ private static final Pattern HTTP_PARAM_PATTERN = Pattern.compile("param (\\w+)=(.*)", Pattern.MULTILINE); private static final Pattern HTTP_BODY_PATTERN = Pattern.compile("body=(.*)", Pattern.MULTILINE); private static final Pattern HTTP_STATUSCODE_PATTERN = Pattern.compile("statuscode (.*)", Pattern.MULTILINE); - + private static final Pattern MAX_RESULT_READS_PATTERN = Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE); public static final int TRUNCATE_THRESHOLD = 16384; public static final String DELIVERY_ASYNC = "async"; @@ -556,7 +557,12 @@ public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception { - final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType()); + List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType()); + final Optional<String> maxReadsOptional = extractMaxResultReads(str); + if (maxReadsOptional.isPresent()) { + newParams = upsertParam(newParams, QueryServiceServlet.Parameter.MAX_RESULT_READS.str(), + maxReadsOptional.get()); + } HttpUriRequest method = jsonEncoded ? 
constructPostMethodJson(str, uri, "statement", newParams) : constructPostMethodUrl(str, uri, "statement", newParams); // Set accepted output response type @@ -1392,6 +1398,14 @@ return tmpStmt; } + protected static Optional<String> extractMaxResultReads(String statement) { + final Matcher m = MAX_RESULT_READS_PATTERN.matcher(statement); + while (m.find()) { + return Optional.of(m.group(1)); + } + return Optional.empty(); + } + protected static Optional<String> extractBody(String statement) { final Matcher m = HTTP_BODY_PATTERN.matcher(statement); while (m.find()) { diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml index 9a2ad3c..fe030a8 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml @@ -49,4 +49,10 @@ <output-dir compare="Text">async-running</output-dir> </compilation-unit> </test-case> + <test-case FilePath="async-deferred"> + <compilation-unit name="async-exhausted-result"> + <output-dir compare="Text">async-exhausted-result</output-dir> + <expected-error>Job Failed</expected-error> + </compilation-unit> + </test-case> </test-group> diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp new file mode 100644 index 0000000..f8ec2cf --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.1.async.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +-- maxresultreads=1 +-- handlevariable=status + +select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri new file mode 100644 index 0000000..bca879b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.2.pollget.uri @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +-- polltimeoutsecs=20 +-- handlevariable=result + +$status diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri new file mode 100644 index 0000000..b613531 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.3.get.uri @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +$result diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri new file mode 100644 index 0000000..b613531 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-exhausted-result/async-exhausted-result.4.get.uri @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +$result diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp index 1e18f66..8055915 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp @@ -17,6 +17,7 @@ * under the License. 
*/ +-- maxresultreads=2 -- handlevariable=status select i, i * i as i2 from range(1, 10) i; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.1.ignore diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex new file mode 100644 index 0000000..4308ba2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.2.regex @@ -0,0 +1,2 @@ +/"status": "success"/ +/"handle": ".*"/ diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json new file mode 100644 index 0000000..09e86cc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-exhausted-result/async-exhausted-result.3.json @@ -0,0 +1,10 @@ +{ "i": 1, "i2": 1 } +{ "i": 2, "i2": 4 } +{ "i": 3, "i2": 9 } +{ "i": 4, "i2": 16 } +{ "i": 5, "i2": 25 } +{ "i": 6, "i2": 36 } +{ "i": 7, "i2": 49 } +{ "i": 8, "i2": 64 } +{ "i": 9, "i2": 81 } +{ "i": 10, "i2": 100 } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 
b8790e5..6f58b0a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -153,6 +153,7 @@ private IAWriterFactory writerFactory; private FileSplit outputFile; private boolean asyncResults; + private long maxResultReads; private ResultSetId resultSetId; private IResultSerializerFactoryProvider resultSerializerFactoryProvider; private TxnId txnId; @@ -236,6 +237,14 @@ public void setResultAsyncMode(boolean asyncResults) { this.asyncResults = asyncResults; + } + + public void setMaxResultReads(long maxResultReads) { + this.maxResultReads = maxResultReads; + } + + public long getMaxResultReads() { + return maxResultReads; } public ResultSetId getResultSetId() { @@ -536,7 +545,7 @@ IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory()); resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(), - resultSerializedAppenderFactory); + resultSerializedAppenderFactory, getMaxResultReads()); } catch (IOException e) { throw new AlgebricksException(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java index 008f0be..e6cf6d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java @@ -25,7 +25,7 @@ public interface IDatasetPartitionManager extends IDatasetManager { IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult, - boolean 
asyncMode, int partition, int nPartitions) throws HyracksException; + boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException; void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions, boolean orderedResult, boolean emptyResult) throws HyracksException; diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java index 60e2e35..56c4576 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java @@ -52,7 +52,8 @@ } @Override - public void open() throws HyracksDataException { + public void open() { + // no op } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index bc980e1..d381a67 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -70,12 +70,12 @@ @Override public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult, - boolean asyncMode, int partition, int nPartitions) throws HyracksException { + boolean asyncMode, int partition, int nPartitions, long maxReads) { DatasetPartitionWriter dpw; JobId jobId = ctx.getJobletContext().getJobId(); synchronized (this) { dpw = new 
DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions, - datasetMemoryManager, fileFactory); + datasetMemoryManager, fileFactory, maxReads); ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap()); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java index ec33b05..24edeb2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.partitions.ResultSetPartitionId; import org.apache.hyracks.comm.channels.NetworkOutputChannel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -31,11 +32,8 @@ private static final Logger LOGGER = LogManager.getLogger(); private final DatasetPartitionManager datasetPartitionManager; - private final DatasetMemoryManager datasetMemoryManager; - private final Executor executor; - private final ResultState resultState; public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager, @@ -47,56 +45,67 @@ } public void writeTo(final IFrameWriter writer) { - executor.execute(new Runnable() { - @Override - public void run() { - NetworkOutputChannel channel = (NetworkOutputChannel) writer; - channel.setFrameSize(resultState.getFrameSize()); - try { - resultState.readOpen(); - channel.open(); - try { - long offset = 0; - ByteBuffer buffer = 
ByteBuffer.allocate(resultState.getFrameSize()); - while (true) { - buffer.clear(); - long size = read(offset, buffer); - if (size <= 0) { - break; - } else if (size < buffer.limit()) { - throw new HyracksDataException("Premature end of file - readSize: " + size - + " buffer limit: " + buffer.limit()); - } - offset += size; - buffer.flip(); - channel.nextFrame(buffer); - } - LOGGER.info("Result Reader read + " + offset + " bytes"); - } finally { - channel.close(); - resultState.readClose(); - // If the query is a synchronous query, remove its partition as soon as it is read. - if (!resultState.getAsyncMode()) { - datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(), - resultState.getResultSetPartitionId().getResultSetId(), resultState - .getResultSetPartitionId().getPartition()); - } - } - } catch (HyracksDataException e) { - throw new RuntimeException(e); - } - if (LOGGER.isInfoEnabled()) { - LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")"); - } - } + executor.execute(new ResultPartitionSender((NetworkOutputChannel) writer)); + } - private long read(long offset, ByteBuffer buffer) throws HyracksDataException { - if (datasetMemoryManager == null) { - return resultState.read(offset, buffer); - } else { - return resultState.read(datasetMemoryManager, offset, buffer); + private class ResultPartitionSender implements Runnable { + + private final NetworkOutputChannel channel; + + ResultPartitionSender(final NetworkOutputChannel channel) { + this.channel = channel; + } + + @Override + public void run() { + channel.setFrameSize(resultState.getFrameSize()); + channel.open(); + try { + resultState.readOpen(); + long offset = 0; + final ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize()); + while (true) { + buffer.clear(); + final long size = read(offset, buffer); + if (size <= 0) { + break; + } else if (size < buffer.limit()) { + throw new IllegalStateException( + "Premature end of 
file - readSize: " + size + " buffer limit: " + buffer.limit()); + } + offset += size; + buffer.flip(); + channel.nextFrame(buffer); } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("result reading successful(" + resultState.getResultSetPartitionId() + ")"); + } + } catch (Exception e) { + LOGGER.error(() -> "failed to send result partition " + resultState.getResultSetPartitionId(), e); + channel.abort(); + } finally { + close(); } - }); + } + + private long read(long offset, ByteBuffer buffer) throws HyracksDataException { + return datasetMemoryManager != null ? + resultState.read(datasetMemoryManager, offset, buffer) : + resultState.read(offset, buffer); + } + + private void close() { + try { + channel.close(); + resultState.readClose(); + if (resultState.isExhausted()) { + final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId(); + datasetPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(), + partitionId.getPartition()); + } + } catch (HyracksDataException e) { + LOGGER.error("unexpected failure in partition reader clean up", e); + } + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java index 2bf5326..d49a1a6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java @@ -59,7 +59,7 @@ public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId, ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions, - DatasetMemoryManager datasetMemoryManager, 
IWorkspaceFileFactory fileFactory) { + DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) { this.manager = manager; this.jobId = jobId; this.resultSetId = rsId; @@ -70,7 +70,7 @@ resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition); resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory, - ctx.getInitialFrameSize()); + ctx.getInitialFrameSize(), maxReads); } public ResultState getResultState() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java index 43b1e9b..3e3f06b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java @@ -68,17 +68,22 @@ private long size; private long persistentSize; + private long remainingReads; ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager, - IWorkspaceFileFactory fileFactory, int frameSize) { + IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) { + if (maxReads <= 0) { + throw new IllegalArgumentException("maxReads must be > 0"); + } this.resultSetPartitionId = resultSetPartitionId; this.asyncMode = asyncMode; this.ioManager = ioManager; this.fileFactory = fileFactory; this.frameSize = frameSize; + remainingReads = maxReads; eos = new AtomicBoolean(false); failed = new AtomicBoolean(false); - localPageList = new ArrayList<Page>(); + localPageList = new ArrayList<>(); fileRef = null; writeFileHandle = null; @@ -102,6 +107,7 @@ closeWriteFileHandle(); if (fileRef != null) { fileRef.delete(); + fileRef = null; } } @@ -152,7 +158,10 @@ } 
public synchronized void readOpen() { - // It is a noOp for now, leaving here to keep the API stable for future usage. + if (isExhausted()) { + throw new IllegalStateException("Result reads exhausted"); + } + remainingReads--; } public synchronized void readClose() throws HyracksDataException { @@ -339,6 +348,7 @@ ObjectNode on = om.createObjectNode(); on.put("rspid", resultSetPartitionId.toString()); on.put("async", asyncMode); + on.put("remainingReads", remainingReads); on.put("eos", eos.get()); on.put("failed", failed.get()); on.put("fileRef", String.valueOf(fileRef)); @@ -347,4 +357,8 @@ return e.getMessage(); } } + + public synchronized boolean isExhausted() { + return remainingReads == 0; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java index 58eee79..d081bdb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java @@ -51,14 +51,16 @@ private final boolean asyncMode; private final IResultSerializerFactory resultSerializerFactory; + private final long maxReads; public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered, - boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException { + boolean asyncMode, IResultSerializerFactory resultSerializerFactory, long maxReads) throws IOException { super(spec, 1, 0); this.rsId = rsId; this.ordered = ordered; this.asyncMode = asyncMode; this.resultSerializerFactory = resultSerializerFactory; + this.maxReads = maxReads; } @Override @@ -87,7 +89,7 @@ public void open() throws 
HyracksDataException { try { datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition, - nPartitions); + nPartitions, maxReads); datasetPartitionWriter.open(); resultSerializer.init(); } catch (HyracksException e) { @@ -139,7 +141,8 @@ sb.append("{ "); sb.append("\"rsId\": \"").append(rsId).append("\", "); sb.append("\"ordered\": ").append(ordered).append(", "); - sb.append("\"asyncMode\": ").append(asyncMode).append(" }"); + sb.append("\"asyncMode\": ").append(asyncMode).append(", "); + sb.append("\"maxReads\": ").append(maxReads).append(" }"); return sb.toString(); } }; diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java index f169054..080746c 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java @@ -100,7 +100,7 @@ ResultSetId rsId = new ResultSetId(1); AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); return printer; diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java index b5c6238..c05b504 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java @@ -101,7 +101,7 @@ ResultSetId rsId = new ResultSetId(1); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID); @@ -173,7 +173,7 @@ ResultSetId rsId = new ResultSetId(1); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); @@ -246,7 +246,7 @@ ResultSetId rsId = new ResultSetId(1); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java index 56bf853..b693b09 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java @@ -159,7 +159,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java index 67642f4..67845c0 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java @@ -229,7 +229,7 @@ ResultSetId rsId = new ResultSetId(1); AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); return printer; diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java index b62c011..d7d4219 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java @@ -95,7 +95,7 @@ spec.addResultSetId(rsId); outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java index dc91dd2..06d7b04 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java @@ -71,7 +71,7 @@ ResultSetId rsId = new ResultSetId(1); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID); @@ -107,7 +107,7 @@ ResultSetId rsId = new ResultSetId(1); IOperatorDescriptor printer = new 
ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID); @@ -146,7 +146,7 @@ ResultSetId rsId = new ResultSetId(1); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); spec.addResultSetId(rsId); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java index 3043cba..df9c0d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java @@ -85,7 +85,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); @@ -135,7 +135,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - 
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java index 7075fe9..2c055c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java @@ -132,7 +132,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); @@ -215,7 +215,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); @@ -300,7 +300,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor 
printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); @@ -385,7 +385,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); @@ -471,7 +471,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec, @@ -563,7 +563,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec, @@ -654,7 +654,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - 
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec, @@ -750,7 +750,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec, diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java index 03cd5d4..dc5d0bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java @@ -182,7 +182,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); @@ -263,7 +263,7 @@ 
spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec); @@ -344,7 +344,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec); @@ -430,7 +430,7 @@ spec.addResultSetId(rsId); IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java index 542f037..e4d6398 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java @@ -79,7 +79,7 @@ spec.addResultSetId(rsId); 
IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false, - ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider()); + ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID); spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0); -- To view, visit https://asterix-gerrit.ics.uci.edu/2292 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
