Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2513][FUN] Add Active_Requests Function ......................................................................
[ASTERIXDB-2513][FUN] Add Active_Requests Function - user model changes: no - storage format changes: no - interface changes: yes Details: - Add a datasource function (active_requests) which returns the active jobs that the user specified client_context_id for. - This function runs on a single NC and uses messaging to get the currently running jobs from CC. - Currently, the function returns the following fields: -- clientContextId: the user specified clientContextId. -- requestTime: a timestamp at which the request reference was created. -- jobId: optionally, the job id that belongs to this request. - The function may be improved later to return all jobs and it may return additional fields such as (request uuid, statement, executionTime, elapsedTime, nodeAddress, userAgent, etc..) - Add test case. - Do not allow cancellation test to cancel queries with clientContextId to avoid intermittent failures. Change-Id: I95962742161ed18c4cf2e09c8541c8ad3b35356c Reviewed-on: https://asterix-gerrit.ics.uci.edu/3136 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> --- M asterixdb/asterix-algebra/pom.xml M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore A asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex A asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java M hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java 25 files changed, 479 insertions(+), 4 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml index f9f5677..369d93b 100644 --- a/asterixdb/asterix-algebra/pom.xml +++ b/asterixdb/asterix-algebra/pom.xml @@ -242,5 +242,9 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-util</artifactId> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index a9bd856..ec44d60 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -20,10 +20,15 @@ import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.om.base.ADateTime; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.node.ObjectNode; public abstract class BaseClientRequest implements IClientRequest { protected final IStatementExecutorContext ctx; + protected final long requestTime = System.currentTimeMillis(); protected final String contextId; private boolean complete; @@ -50,5 +55,21 @@ doCancel(appCtx); } + @Override + public String toJson() { + try { + return JSONUtil.convertNode(asJson()); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + protected ObjectNode asJson() { + ObjectNode json = JSONUtil.createObject(); + json.put("requestTime", new ADateTime(requestTime).toSimpleString()); + json.put("clientContextID", contextId); + return json; + } + protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java index 520ce03..81714ca 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java @@ -22,6 +22,9 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.node.ObjectNode; public class ClientJobRequest extends BaseClientRequest { private final JobId jobId; @@ -41,4 +44,15 @@ } ctx.remove(contextId); } + + @Override + public String toJson() { + final ObjectNode jsonNode = super.asJson(); + jsonNode.put("jobId", jobId.toString()); + try { + return JSONUtil.convertNode(jsonNode); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java index 78080f3..29e7bda 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java @@ -19,6 +19,8 @@ package org.apache.asterix.translator; +import java.util.Map; + import org.apache.asterix.common.api.IClientRequest; /** @@ -52,4 +54,9 @@ * a user provided client context id. */ IClientRequest remove(String clientContextId); + + /** + * @return The currently running requests + */ + Map<String, IClientRequest> getRunningRequests(); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java index c4e2859..a2a7906 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.translator; +import java.util.Collections; +import java.util.Map; + import org.apache.asterix.common.api.IClientRequest; public class NoOpStatementExecutorContext implements IStatementExecutorContext { @@ -42,4 +45,8 @@ return null; } + @Override + public Map<String, IClientRequest> getRunningRequests() { + return Collections.emptyMap(); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java index a4da189..136fda7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.api.http.ctx; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -42,4 +43,9 @@ public IClientRequest remove(String clientContextId) { return runningQueries.remove(clientContextId); } + + @Override + public Map<String, IClientRequest> getRunningRequests() { + return Collections.unmodifiableMap(runningQueries); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java new file mode 100644 index 0000000..6d32763 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; + +public class ActiveRequestsDatasource extends FunctionDataSource { + + private static final DataSourceId ACTIVE_REQUESTS_DATASOURCE_ID = new DataSourceId( + ActiveRequestsRewriter.ACTIVE_REQUESTS.getNamespace(), ActiveRequestsRewriter.ACTIVE_REQUESTS.getName()); + + public ActiveRequestsDatasource(INodeDomain domain) throws AlgebricksException { + super(ACTIVE_REQUESTS_DATASOURCE_ID, domain); + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + AlgebricksAbsolutePartitionConstraint locations) { + AlgebricksAbsolutePartitionConstraint randomLocation = + AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations()); + return new ActiveRequestsFunction(randomLocation); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java new file mode 100644 index 0000000..7c621f1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; + +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.app.message.ActiveRequestsRequest; +import org.apache.asterix.app.message.ActiveRequestsResponse; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.metadata.declared.AbstractDatasourceFunction; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ActiveRequestsFunction extends AbstractDatasourceFunction { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + + public ActiveRequestsFunction(AlgebricksAbsolutePartitionConstraint locations) { + super(locations); + } + + @Override + public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); + INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker(); + MessageFuture messageFuture = messageBroker.registerMessageFuture(); + long futureId = messageFuture.getFutureId(); + ActiveRequestsRequest request = new ActiveRequestsRequest(serviceCtx.getNodeId(), futureId); + try { + messageBroker.sendMessageToPrimaryCC(request); + ActiveRequestsResponse response = + (ActiveRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + return new ActiveRequestsReader(response.getRequests()); + } catch (Exception e) { + LOGGER.warn("Could not retrieve active requests", e); + throw HyracksDataException.create(e); + } finally { + messageBroker.deregisterMessageFuture(futureId); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java new file mode 100644 index 0000000..9c9ebd9 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import java.io.IOException; + +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.input.record.CharArrayRecord; + +public class ActiveRequestsReader extends FunctionReader { + + private final String[] activeRequests; + private CharArrayRecord record; + private int recordIndex; + + public ActiveRequestsReader(String[] activeRequests) { + this.activeRequests = activeRequests; + record = new CharArrayRecord(); + } + + @Override + public boolean hasNext() throws Exception { + return recordIndex < activeRequests.length; + } + + @Override + public IRawRecord<char[]> next() throws IOException { + record.reset(); + record.append((activeRequests[recordIndex++]).toCharArray()); + record.endRecord(); + return record; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java new file mode 100644 index 0000000..b0daabb --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class ActiveRequestsRewriter extends FunctionRewriter { + + public static final FunctionIdentifier ACTIVE_REQUESTS = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "active-requests", 0); + public static final ActiveRequestsRewriter INSTANCE = new ActiveRequestsRewriter(ACTIVE_REQUESTS); + + private ActiveRequestsRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f) + throws AlgebricksException { + return new ActiveRequestsDatasource(context.getComputationNodeDomain()); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java new file mode 100644 index 0000000..9d15131 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import java.util.Collection; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ActiveRequestsRequest implements ICcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + + public ActiveRequestsRequest(String nodeId, long reqId) { + this.nodeId = nodeId; + this.reqId = reqId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + CCApplication application = (CCApplication) ccs.getApplication(); + IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); + final Collection<IClientRequest> runningRequests = executorsCtx.getRunningRequests().values(); + final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new); + ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure sending response to nc", e); + } + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java new file mode 100644 index 0000000..0bf7976 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class ActiveRequestsResponse implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final String[] requests; + + public ActiveRequestsResponse(long reqId, String[] requests) { + this.reqId = reqId; + this.requests = requests; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } + + public String[] getRequests() { + return requests; + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index 83ceec7..3407d59 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.util; +import org.apache.asterix.app.function.ActiveRequestsRewriter; import org.apache.asterix.app.function.DatasetResourcesRewriter; import org.apache.asterix.app.function.DatasetRewriter; import org.apache.asterix.app.function.FeedRewriter; @@ -54,6 +55,11 @@ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); BuiltinFunctions.addUnnestFun(PingRewriter.PING, true); BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, PingRewriter.INSTANCE); + // Active requests function + BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, + (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); + BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, true); + BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, ActiveRequestsRewriter.INSTANCE); } private MetadataBuiltinFunctions() { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index e85fedf..48d11ee 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -51,6 +51,7 @@ public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri, List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception { + cancellable = cancellable && !containsClientContextID(str); String clientContextId = UUID.randomUUID().toString(); final List<TestCase.CompilationUnit.Parameter> newParams = cancellable ? upsertParam(params, "client_context_id", ParameterTypeEnum.STRING, clientContextId) : params; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 4766639..282d83d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -1999,6 +1999,12 @@ } } + protected static boolean containsClientContextID(String statement) { + List<Parameter> httpParams = extractParameters(statement); + return httpParams.stream().map(Parameter::getName) + .anyMatch(QueryServiceServlet.Parameter.CLIENT_ID.str()::equals); + } + private static boolean isCancellable(String type) { return !NON_CANCELLABLE.contains(type); } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp new file mode 100644 index 0000000..ec96a51 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +-- param client_context_id=sleep_async_query +-- handlevariable=status + +select value sleep("result", 10000); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp new file mode 100644 index 0000000..3a29ef2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + -- param client_context_id=ensure_running_query + -- polltimeoutsecs=15 +SELECT VALUE rqst FROM active_requests() rqst +WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp new file mode 100644 index 0000000..1881f1a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + -- param client_context_id=ensure_completed_query + -- polltimeoutsecs=15 +SELECT VALUE rqst FROM active_requests() rqst +WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex new file mode 100644 index 0000000..92f4746 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex @@ -0,0 +1 @@ +/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/ \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 8156244..ceee5f9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -4982,6 +4982,11 @@ <output-dir compare="Text">p_sort_num_samples</output-dir> </compilation-unit> </test-case> + <test-case FilePath="misc"> + <compilation-unit name="active_requests"> + <output-dir compare="Text">active_requests</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="index"> <test-group name="index/validations"> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 30759de..53771d5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -35,4 +35,9 @@ * @throws HyracksDataException */ void cancel(ICcApplicationContext appCtx) throws HyracksDataException; + + /** + * @return A json representation of this request + */ + String toJson(); } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java index efcb828..62e5c87 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java @@ -118,11 +118,15 @@ return sbder.toString(); } - public String toSimpleString() throws IOException { + public String toSimpleString() { StringBuilder sbder = new StringBuilder(); - GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder, - GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true); - return sbder.toString(); + try { + GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder, + GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true); + return sbder.toString(); + } catch (IOException e) { + throw new IllegalStateException(e); + } } public long getChrononTime() { diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java index fa4a707..b53f4d3 100644 --- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java +++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java @@ -19,6 +19,7 @@ package org.apache.hyracks.algebricks.common.constraints; import java.util.Arrays; +import java.util.Random; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.ErrorCode; @@ -33,6 +34,11 @@ Arrays.sort(sortedLocations); } + public static AlgebricksAbsolutePartitionConstraint randomLocation(String[] locations) { + int randomIndex = new Random().nextInt(locations.length); + return new AlgebricksAbsolutePartitionConstraint(new String[] { locations[randomIndex] }); + } + @Override public PartitionConstraintType getPartitionConstraintType() { return PartitionConstraintType.ABSOLUTE; -- To view, visit https://asterix-gerrit.ics.uci.edu/3136 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I95962742161ed18c4cf2e09c8541c8ad3b35356c Gerrit-PatchSet: 8 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org>