Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-2513][FUN] Add Active_Requests Function
......................................................................


[ASTERIXDB-2513][FUN] Add Active_Requests Function

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add a datasource function (active_requests) which
  returns the active jobs for which the user specified
  a client_context_id.
- This function runs on a single NC and uses messaging
  to get the currently running jobs from CC.
- Currently, the function returns the following fields:
  -- clientContextID: the user-specified client_context_id.
  -- requestTime: a timestamp at which the request reference
                  was created.
  -- jobId: optionally, the job id that belongs to this request.
- The function may be improved later to return all jobs and it may
  return additional fields such as (request uuid, statement,
  executionTime, elapsedTime, nodeAddress, userAgent, etc.)
- Add test case.
- Do not allow the cancellation test executor to cancel queries that
  specify their own client_context_id, to avoid intermittent failures.

Change-Id: I95962742161ed18c4cf2e09c8541c8ad3b35356c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3136
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
---
M asterixdb/asterix-algebra/pom.xml
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
M 
hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
25 files changed, 479 insertions(+), 4 deletions(-)

Approvals:
  Jenkins: Verified; No violations found; ; Verified
  Michael Blow: Looks good to me, approved

Objections:
  Anon. E. Moose #1000171: Violations found



diff --git a/asterixdb/asterix-algebra/pom.xml 
b/asterixdb/asterix-algebra/pom.xml
index f9f5677..369d93b 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -242,5 +242,9 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-util</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index a9bd856..ec44d60 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,10 +20,15 @@
 
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.ADateTime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public abstract class BaseClientRequest implements IClientRequest {
     protected final IStatementExecutorContext ctx;
+    protected final long requestTime = System.currentTimeMillis();
     protected final String contextId;
     private boolean complete;
 
@@ -50,5 +55,21 @@
         doCancel(appCtx);
     }
 
+    @Override
+    public String toJson() {
+        try {
+            return JSONUtil.convertNode(asJson());
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    protected ObjectNode asJson() {
+        ObjectNode json = JSONUtil.createObject();
+        json.put("requestTime", new ADateTime(requestTime).toSimpleString());
+        json.put("clientContextID", contextId);
+        return json;
+    }
+
     protected abstract void doCancel(ICcApplicationContext appCtx) throws 
HyracksDataException;
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
index 520ce03..81714ca 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
@@ -22,6 +22,9 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class ClientJobRequest extends BaseClientRequest {
     private final JobId jobId;
@@ -41,4 +44,15 @@
         }
         ctx.remove(contextId);
     }
+
+    @Override
+    public String toJson() {
+        final ObjectNode jsonNode = super.asJson();
+        jsonNode.put("jobId", jobId.toString());
+        try {
+            return JSONUtil.convertNode(jsonNode);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 78080f3..29e7bda 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -19,6 +19,8 @@
 
 package org.apache.asterix.translator;
 
+import java.util.Map;
+
 import org.apache.asterix.common.api.IClientRequest;
 
 /**
@@ -52,4 +54,9 @@
      *            a user provided client context id.
      */
     IClientRequest remove(String clientContextId);
+
+    /**
+     * @return The currently running requests
+     */
+    Map<String, IClientRequest> getRunningRequests();
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
index c4e2859..a2a7906 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.translator;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.asterix.common.api.IClientRequest;
 
 public class NoOpStatementExecutorContext implements IStatementExecutorContext 
{
@@ -42,4 +45,8 @@
         return null;
     }
 
+    @Override
+    public Map<String, IClientRequest> getRunningRequests() {
+        return Collections.emptyMap();
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
index a4da189..136fda7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.api.http.ctx;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -42,4 +43,9 @@
     public IClientRequest remove(String clientContextId) {
         return runningQueries.remove(clientContextId);
     }
+
+    @Override
+    public Map<String, IClientRequest> getRunningRequests() {
+        return Collections.unmodifiableMap(runningQueries);
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
new file mode 100644
index 0000000..6d32763
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class ActiveRequestsDatasource extends FunctionDataSource {
+
+    private static final DataSourceId ACTIVE_REQUESTS_DATASOURCE_ID = new 
DataSourceId(
+            ActiveRequestsRewriter.ACTIVE_REQUESTS.getNamespace(), 
ActiveRequestsRewriter.ACTIVE_REQUESTS.getName());
+
+    public ActiveRequestsDatasource(INodeDomain domain) throws 
AlgebricksException {
+        super(ACTIVE_REQUESTS_DATASOURCE_ID, domain);
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        AlgebricksAbsolutePartitionConstraint randomLocation =
+                
AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations());
+        return new ActiveRequestsFunction(randomLocation);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
new file mode 100644
index 0000000..7c621f1
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static 
org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.ActiveRequestsRequest;
+import org.apache.asterix.app.message.ActiveRequestsResponse;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ActiveRequestsFunction extends AbstractDatasourceFunction {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+
+    public ActiveRequestsFunction(AlgebricksAbsolutePartitionConstraint 
locations) {
+        super(locations);
+    }
+
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, 
int partition)
+            throws HyracksDataException {
+        INCServiceContext serviceCtx = 
ctx.getJobletContext().getServiceContext();
+        INCMessageBroker messageBroker = (INCMessageBroker) 
serviceCtx.getMessageBroker();
+        MessageFuture messageFuture = messageBroker.registerMessageFuture();
+        long futureId = messageFuture.getFutureId();
+        ActiveRequestsRequest request = new 
ActiveRequestsRequest(serviceCtx.getNodeId(), futureId);
+        try {
+            messageBroker.sendMessageToPrimaryCC(request);
+            ActiveRequestsResponse response =
+                    (ActiveRequestsResponse) 
messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            return new ActiveRequestsReader(response.getRequests());
+        } catch (Exception e) {
+            LOGGER.warn("Could not retrieve active requests", e);
+            throw HyracksDataException.create(e);
+        } finally {
+            messageBroker.deregisterMessageFuture(futureId);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
new file mode 100644
index 0000000..9c9ebd9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+
+public class ActiveRequestsReader extends FunctionReader {
+
+    private final String[] activeRequests;
+    private CharArrayRecord record;
+    private int recordIndex;
+
+    public ActiveRequestsReader(String[] activeRequests) {
+        this.activeRequests = activeRequests;
+        record = new CharArrayRecord();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return recordIndex < activeRequests.length;
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException {
+        record.reset();
+        record.append((activeRequests[recordIndex++]).toCharArray());
+        record.endRecord();
+        return record;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
new file mode 100644
index 0000000..b0daabb
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ActiveRequestsRewriter extends FunctionRewriter {
+
+    public static final FunctionIdentifier ACTIVE_REQUESTS =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"active-requests", 0);
+    public static final ActiveRequestsRewriter INSTANCE = new 
ActiveRequestsRewriter(ACTIVE_REQUESTS);
+
+    private ActiveRequestsRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    protected FunctionDataSource toDatasource(IOptimizationContext context, 
AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        return new 
ActiveRequestsDatasource(context.getComputationNodeDomain());
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
new file mode 100644
index 0000000..9d15131
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ActiveRequestsRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+
+    public ActiveRequestsRequest(String nodeId, long reqId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
+        CCApplication application = (CCApplication) ccs.getApplication();
+        IStatementExecutorContext executorsCtx = 
application.getStatementExecutorContext();
+        final Collection<IClientRequest> runningRequests = 
executorsCtx.getRunningRequests().values();
+        final String[] requests = 
runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
+        ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, 
requests);
+        CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
new file mode 100644
index 0000000..0bf7976
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ActiveRequestsResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String[] requests;
+
+    public ActiveRequestsResponse(long reqId, String[] requests) {
+        this.reqId = reqId;
+        this.requests = requests;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public String[] getRequests() {
+        return requests;
+    }
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 83ceec7..3407d59 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.util;
 
+import org.apache.asterix.app.function.ActiveRequestsRewriter;
 import org.apache.asterix.app.function.DatasetResourcesRewriter;
 import org.apache.asterix.app.function.DatasetRewriter;
 import org.apache.asterix.app.function.FeedRewriter;
@@ -54,6 +55,11 @@
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, 
true);
         BuiltinFunctions.addUnnestFun(PingRewriter.PING, true);
         BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, 
PingRewriter.INSTANCE);
+        // Active requests function
+        BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, 
true);
+        BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, 
true);
+        
BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, 
ActiveRequestsRewriter.INSTANCE);
     }
 
     private MetadataBuiltinFunctions() {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index e85fedf..48d11ee 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -51,6 +51,7 @@
     public InputStream executeQueryService(String str, 
TestCaseContext.OutputFormat fmt, URI uri,
             List<TestCase.CompilationUnit.Parameter> params, boolean 
jsonEncoded,
             Predicate<Integer> responseCodeValidator, boolean cancellable) 
throws Exception {
+        cancellable = cancellable && !containsClientContextID(str);
         String clientContextId = UUID.randomUUID().toString();
         final List<TestCase.CompilationUnit.Parameter> newParams = cancellable
                 ? upsertParam(params, "client_context_id", 
ParameterTypeEnum.STRING, clientContextId) : params;
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 4766639..282d83d 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1999,6 +1999,12 @@
         }
     }
 
+    protected static boolean containsClientContextID(String statement) {
+        List<Parameter> httpParams = extractParameters(statement);
+        return httpParams.stream().map(Parameter::getName)
+                
.anyMatch(QueryServiceServlet.Parameter.CLIENT_ID.str()::equals);
+    }
+
     private static boolean isCancellable(String type) {
         return !NON_CANCELLABLE.contains(type);
     }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
new file mode 100644
index 0000000..ec96a51
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- param client_context_id=sleep_async_query
+-- handlevariable=status
+
+select value sleep("result", 10000);
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
new file mode 100644
index 0000000..3a29ef2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_running_query
+ -- polltimeoutsecs=15
+SELECT VALUE rqst FROM active_requests() rqst
+WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
new file mode 100644
index 0000000..1881f1a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- polltimeoutsecs=15
+SELECT VALUE rqst FROM active_requests() rqst
+WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
new file mode 100644
index 0000000..92f4746
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -0,0 +1 @@
+/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8156244..ceee5f9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4982,6 +4982,11 @@
         <output-dir compare="Text">p_sort_num_samples</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="active_requests">
+        <output-dir compare="Text">active_requests</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index">
     <test-group name="index/validations">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 30759de..53771d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -35,4 +35,9 @@
      * @throws HyracksDataException
      */
     void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+
+    /**
+     * @return A json representation of this request
+     */
+    String toJson();
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
index efcb828..62e5c87 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
@@ -118,11 +118,15 @@
         return sbder.toString();
     }
 
-    public String toSimpleString() throws IOException {
+    public String toSimpleString() {
         StringBuilder sbder = new StringBuilder();
-        GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
-                GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
-        return sbder.toString();
+        try {
+            GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
+                    GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
+            return sbder.toString();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
     }
 
     public long getChrononTime() {
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index fa4a707..b53f4d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.common.constraints;
 
 import java.util.Arrays;
+import java.util.Random;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -33,6 +34,11 @@
         Arrays.sort(sortedLocations);
     }
 
+    public static AlgebricksAbsolutePartitionConstraint randomLocation(String[] locations) {
+        int randomIndex = new Random().nextInt(locations.length);
+        return new AlgebricksAbsolutePartitionConstraint(new String[] { locations[randomIndex] });
+    }
+
     @Override
     public PartitionConstraintType getPartitionConstraintType() {
         return PartitionConstraintType.ABSOLUTE;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3136
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I95962742161ed18c4cf2e09c8541c8ad3b35356c
Gerrit-PatchSet: 8
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>

Reply via email to