Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][FUN] Add Completed_Requests Function
......................................................................
[NO ISSUE][FUN] Add Completed_Requests Function - user model changes: no - storage format changes: no - interface changes: yes Details: - Add completed_requests to get recently completed requests. - Use enum for requests state. - Add new config to specify the requests archive size. - Add test case for completed_requests function. Change-Id: I3f47d523c683c3879ec52ce5bdaf16ce338e8e46 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3301 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm A 
asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java M asterixdb/asterix-runtime/pom.xml M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java 20 files changed, 250 insertions(+), 26 deletions(-) Approvals: Anon. E. Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index 50e6cc2..b7ef4e6 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -33,7 +33,7 @@ private boolean complete; private final IRequestReference requestReference; private boolean cancellable = false; - protected volatile String state = "received"; + protected volatile State state = State.RECEIVED; public BaseClientRequest(IRequestReference requestReference) { this.requestReference = requestReference; @@ -45,6 +45,7 @@ return; } complete = true; + state = State.COMPLETED; } @Override @@ -53,6 +54,7 @@ return; } complete(); + state = State.CANCELLED; if (cancellable) { doCancel(appCtx); } @@ -76,7 +78,7 @@ } public void setRunning() { - state = "running"; + state = State.RUNNING; } @Override @@ -90,7 +92,7 @@ json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString()); json.put("elapsedTime", getElapsedTime()); json.put("node", 
requestReference.getNode()); - json.put("state", state); + json.put("state", state.getLabel()); json.put("userAgent", ((RequestReference) requestReference).getUserAgent()); json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr()); json.put("cancellable", cancellable); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java index 6d32763..eb5416b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.app.function; +import org.apache.asterix.app.message.ClientRequestsRequest; import org.apache.asterix.metadata.api.IDatasourceFunction; import org.apache.asterix.metadata.declared.DataSourceId; import org.apache.asterix.metadata.declared.FunctionDataSource; @@ -40,6 +41,6 @@ AlgebricksAbsolutePartitionConstraint locations) { AlgebricksAbsolutePartitionConstraint randomLocation = AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations()); - return new ActiveRequestsFunction(randomLocation); + return new ClientRequestsFunction(randomLocation, ClientRequestsRequest.RequestType.RUNNING); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java similarity index 77% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java index 7c621f1..d0e1b27 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java @@ -18,12 +18,13 @@ */ package org.apache.asterix.app.function; +import static org.apache.asterix.app.message.ClientRequestsRequest.RequestType; import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; import java.util.concurrent.TimeUnit; -import org.apache.asterix.app.message.ActiveRequestsRequest; -import org.apache.asterix.app.message.ActiveRequestsResponse; +import org.apache.asterix.app.message.ClientRequestsRequest; +import org.apache.asterix.app.message.ClientRequestsResponse; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.external.api.IRecordReader; @@ -35,13 +36,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class ActiveRequestsFunction extends AbstractDatasourceFunction { +public class ClientRequestsFunction extends AbstractDatasourceFunction { private static final Logger LOGGER = LogManager.getLogger(); private static final long serialVersionUID = 1L; + private final RequestType requestType; - public ActiveRequestsFunction(AlgebricksAbsolutePartitionConstraint locations) { + public ClientRequestsFunction(AlgebricksAbsolutePartitionConstraint locations, RequestType requestType) { super(locations); + this.requestType = requestType; } @Override @@ -51,12 +54,12 @@ INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker(); MessageFuture messageFuture = messageBroker.registerMessageFuture(); long futureId = messageFuture.getFutureId(); - ActiveRequestsRequest request = new ActiveRequestsRequest(serviceCtx.getNodeId(), futureId); + ClientRequestsRequest request = new ClientRequestsRequest(serviceCtx.getNodeId(), futureId, requestType); try { messageBroker.sendMessageToPrimaryCC(request); - ActiveRequestsResponse response = - 
(ActiveRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - return new ActiveRequestsReader(response.getRequests()); + ClientRequestsResponse response = + (ClientRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + return new ClientRequestsReader(response.getRequests()); } catch (Exception e) { LOGGER.warn("Could not retrieve active requests", e); throw HyracksDataException.create(e); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java similarity index 80% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java index 9c9ebd9..a5b6370 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java @@ -23,26 +23,26 @@ import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.input.record.CharArrayRecord; -public class ActiveRequestsReader extends FunctionReader { +public class ClientRequestsReader extends FunctionReader { - private final String[] activeRequests; + private final String[] requests; private CharArrayRecord record; private int recordIndex; - public ActiveRequestsReader(String[] activeRequests) { - this.activeRequests = activeRequests; + public ClientRequestsReader(String[] requests) { + this.requests = requests; record = new CharArrayRecord(); } @Override public boolean hasNext() throws Exception { - return recordIndex < activeRequests.length; + return recordIndex < requests.length; } @Override public IRawRecord<char[]> next() throws IOException { record.reset(); - 
record.append((activeRequests[recordIndex++]).toCharArray()); + record.append((requests[recordIndex++]).toCharArray()); record.endRecord(); return record; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java new file mode 100644 index 0000000..4c3672b --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.function; + +import org.apache.asterix.app.message.ClientRequestsRequest; +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; + +public class CompletedRequestsDatasource extends FunctionDataSource { + + private static final DataSourceId COMPLETED_REQUESTS_DATASOURCE_ID = + new DataSourceId(ActiveRequestsRewriter.ACTIVE_REQUESTS.getNamespace(), + CompletedRequestsRewriter.COMPLETED_REQUESTS.getName()); + + public CompletedRequestsDatasource(INodeDomain domain) throws AlgebricksException { + super(COMPLETED_REQUESTS_DATASOURCE_ID, domain); + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + AlgebricksAbsolutePartitionConstraint locations) { + AlgebricksAbsolutePartitionConstraint randomLocation = + AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations()); + return new ClientRequestsFunction(randomLocation, ClientRequestsRequest.RequestType.COMPLETED); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java new file mode 100644 index 0000000..df6bd3e --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class CompletedRequestsRewriter extends FunctionRewriter { + + public static final FunctionIdentifier COMPLETED_REQUESTS = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "completed-requests", 0); + public static final CompletedRequestsRewriter INSTANCE = new CompletedRequestsRewriter(COMPLETED_REQUESTS); + + private CompletedRequestsRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f) + throws AlgebricksException { + return new CompletedRequestsDatasource(context.getComputationNodeDomain()); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java similarity index 68% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java index f1c2a9c..32667d8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java @@ -29,23 +29,40 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class ActiveRequestsRequest implements ICcAddressedMessage { +public class ClientRequestsRequest implements ICcAddressedMessage { + + public enum RequestType { + RUNNING, + COMPLETED + } private static final Logger LOGGER = LogManager.getLogger(); private static final long serialVersionUID = 1L; private final String nodeId; private final long reqId; + private final RequestType requestType; - public ActiveRequestsRequest(String nodeId, long reqId) { + public ClientRequestsRequest(String nodeId, long reqId, RequestType requestType) { this.nodeId = nodeId; this.reqId = reqId; + this.requestType = requestType; } @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - final Collection<IClientRequest> runningRequests = appCtx.getRequestTracker().getRunningRequests(); - final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new); - ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests); + Collection<IClientRequest> clientRequests; + switch (requestType) { + case RUNNING: + clientRequests = appCtx.getRequestTracker().getRunningRequests(); + break; + case COMPLETED: + clientRequests = appCtx.getRequestTracker().getCompletedRequests(); + break; + default: + throw new 
IllegalStateException("unrecognized request type: " + requestType); + } + final String[] requests = clientRequests.stream().map(IClientRequest::toJson).toArray(String[]::new); + ClientRequestsResponse response = new ClientRequestsResponse(reqId, requests); CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); try { messageBroker.sendApplicationMessageToNC(response, nodeId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java similarity index 93% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java index 0bf7976..5c25208 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java @@ -24,13 +24,13 @@ import org.apache.asterix.messaging.NCMessageBroker; import org.apache.hyracks.api.exceptions.HyracksDataException; -public class ActiveRequestsResponse implements INcAddressedMessage { +public class ClientRequestsResponse implements INcAddressedMessage { private static final long serialVersionUID = 1L; private final long reqId; private final String[] requests; - public ActiveRequestsResponse(long reqId, String[] requests) { + public ClientRequestsResponse(long reqId, String[] requests) { this.reqId = reqId; this.requests = requests; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index a8314ec..738d6b4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -19,6 +19,7 @@ package org.apache.asterix.util; import org.apache.asterix.app.function.ActiveRequestsRewriter; +import org.apache.asterix.app.function.CompletedRequestsRewriter; import org.apache.asterix.app.function.DatasetResourcesRewriter; import org.apache.asterix.app.function.DatasetRewriter; import org.apache.asterix.app.function.FeedRewriter; @@ -66,6 +67,12 @@ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true); BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, JobSummariesRewriter.INSTANCE); + // completed requests function + BuiltinFunctions.addFunction(CompletedRequestsRewriter.COMPLETED_REQUESTS, + (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); + BuiltinFunctions.addUnnestFun(CompletedRequestsRewriter.COMPLETED_REQUESTS, true); + BuiltinFunctions.addDatasourceFunction(CompletedRequestsRewriter.COMPLETED_REQUESTS, + CompletedRequestsRewriter.INSTANCE); } private MetadataBuiltinFunctions() { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index 801bd0f..348b947 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -31,6 +31,7 @@ import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.app.translator.RequestParameters; import org.apache.asterix.common.api.RequestReference; +import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.runtime.utils.RequestTracker; 
import org.apache.asterix.translator.ClientRequest; @@ -50,6 +51,9 @@ @Test public void testDelete() throws Exception { ICcApplicationContext appCtx = mock(ICcApplicationContext.class); + ExternalProperties externalProperties = mock(ExternalProperties.class); + Mockito.when(externalProperties.getRequestsArchiveSize()).thenReturn(50); + Mockito.when(appCtx.getExternalProperties()).thenReturn(externalProperties); RequestTracker tracker = new RequestTracker(appCtx); Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker); // Creates a query cancellation servlet. diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp new file mode 100644 index 0000000..aaf08c8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +-- param client_context_id=completed_requests_query + +select value "anything"; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp new file mode 100644 index 0000000..1bfb32f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + -- param client_context_id=ensure_completed_query + +LET request_count = (SELECT VALUE COUNT(*) FROM completed_requests() r +WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0] +SELECT VALUE request_count > 0; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm new file mode 100644 index 0000000..ea7c79d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm @@ -0,0 +1 @@ +"anything" \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm new file mode 100644 index 0000000..f32a580 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm @@ -0,0 +1 @@ +true \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index aeb79e4..5258102 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -5752,6 +5752,11 @@ <output-dir compare="Text">jobs</output-dir> </compilation-unit> </test-case> + <test-case FilePath="misc"> + <compilation-unit name="completed_requests"> + <output-dir compare="Text">completed_requests</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="index"> <test-group name="index/validations"> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 430cd2a..515c837 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -23,6 +23,24 @@ public interface IClientRequest { + enum State { + + RECEIVED("received"), + RUNNING("running"), + CANCELLED("cancelled"), + COMPLETED("completed"); + + private final String label; + + State(String label) { + this.label = label; + } + + public String getLabel() { + return label; + } + } + /** * A system wide unique id representing this {@link IClientRequest} * diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java index dc165b4e..a3ddb30 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -68,4 +68,11 @@ * @return the currently running requests */ Collection<IClientRequest> getRunningRequests(); + + /** + * Gets the recently completed requests + * + * @return the recently completed requests + */ + Collection<IClientRequest> getCompletedRequests(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index 620e47c..1533c9f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -48,7 +48,8 @@ MAX_WEB_REQUEST_SIZE( UNSIGNED_INTEGER, StorageUtil.getIntSizeInBytes(50, StorageUtil.StorageUnit.MEGABYTE), - "The maximum accepted web request size in 
bytes"); + "The maximum accepted web request size in bytes"), + REQUESTS_ARCHIVE_SIZE(UNSIGNED_INTEGER, 50, "The maximum number of archived requests to maintain"); private final IOptionType type; private final Object defaultValue; @@ -67,6 +68,7 @@ case WEB_QUERYINTERFACE_PORT: case API_PORT: case ACTIVE_PORT: + case REQUESTS_ARCHIVE_SIZE: return Section.CC; case NC_API_PORT: return Section.NC; @@ -141,4 +143,8 @@ public int getMaxWebRequestSize() { return accessor.getInt(Option.MAX_WEB_REQUEST_SIZE); } + + public int getRequestsArchiveSize() { + return accessor.getInt(Option.REQUESTS_ARCHIVE_SIZE); + } } diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml index c7e7071..7b6faf0 100644 --- a/asterixdb/asterix-runtime/pom.xml +++ b/asterixdb/asterix-runtime/pom.xml @@ -200,5 +200,9 @@ <artifactId>fastutil</artifactId> <version>8.2.2</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index a0ab559..a8749b1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.runtime.utils; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -27,16 +28,19 @@ import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.hyracks.api.exceptions.HyracksDataException; public class RequestTracker 
implements IRequestTracker { private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>(); private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>(); + private final CircularFifoQueue<IClientRequest> completedRequests; private final ICcApplicationContext ccAppCtx; public RequestTracker(ICcApplicationContext ccAppCtx) { this.ccAppCtx = ccAppCtx; + completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize()); } @Override @@ -84,6 +88,11 @@ return Collections.unmodifiableCollection(runningRequests.values()); } + @Override + public synchronized Collection<IClientRequest> getCompletedRequests() { + return Collections.unmodifiableCollection(new ArrayList<>(completedRequests)); + } + private void cancel(IClientRequest request) throws HyracksDataException { request.cancel(ccAppCtx); untrack(request); @@ -95,5 +104,10 @@ if (clientContextId != null) { clientIdRequests.remove(request.getClientContextId()); } + archive(request); + } + + private synchronized void archive(IClientRequest request) { + completedRequests.add(request); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3301 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I3f47d523c683c3879ec52ce5bdaf16ce338e8e46 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
