Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3115

Change subject: WIP
......................................................................

WIP

Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365
---
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
7 files changed, 323 insertions(+), 0 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/15/3115/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
new file mode 100644
index 0000000..897ddda
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class JobSummariesDatasource extends FunctionDataSource {
+
+    private static final DataSourceId JOB_SUMMARIES_DATASOURCE_ID = new 
DataSourceId(
+            JobSummariesRewriter.JOBSUMMARIES.getNamespace(), 
JobSummariesRewriter.JOBSUMMARIES.getName());
+
+    public JobSummariesDatasource(INodeDomain domain) throws 
AlgebricksException {
+        super(JOB_SUMMARIES_DATASOURCE_ID, domain);
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new JobSummariesFunction(locations);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
new file mode 100644
index 0000000..3ab2f7e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static 
org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.GetJobSummariesRequest;
+import org.apache.asterix.app.message.GetJobSummariesResponse;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class JobSummariesFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public JobSummariesFunction(AlgebricksAbsolutePartitionConstraint 
locations) {
+        super(locations);
+    }
+
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, 
int partition)
+            throws HyracksDataException {
+        INCServiceContext serviceCtx = 
ctx.getJobletContext().getServiceContext();
+        INCMessageBroker messageBroker = (INCMessageBroker) 
serviceCtx.getMessageBroker();
+        MessageFuture messageFuture = messageBroker.registerMessageFuture();
+        GetJobSummariesRequest request =
+                new GetJobSummariesRequest(serviceCtx.getNodeId(), 
messageFuture.getFutureId());
+        try {
+            messageBroker.sendMessageToPrimaryCC(request);
+            GetJobSummariesResponse response =
+                    (GetJobSummariesResponse) 
messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            final String[] summaries = response.getSummaries();
+            return new JobSummariesReader(summaries);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e); // TODO(tillw) Does this 
make sense?
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
new file mode 100644
index 0000000..d5a3272
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+
+public class JobSummariesReader extends FunctionReader {
+
+    private final String[] summaries;
+    private int pos = 0;
+
+    public JobSummariesReader(String[] summaries) {
+        this.summaries = summaries;
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return pos < summaries.length;
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        CharArrayRecord record = new CharArrayRecord();
+        record.set(summaries[pos++]);
+        record.endRecord();
+        return record;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
new file mode 100644
index 0000000..7b535f1
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class JobSummariesRewriter extends FunctionRewriter {
+
+    public static final FunctionIdentifier JOBSUMMARIES =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "jobs", 0);
+    public static final JobSummariesRewriter INSTANCE = new 
JobSummariesRewriter(JOBSUMMARIES);
+
+    private JobSummariesRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    protected FunctionDataSource toDatasource(IOptimizationContext context, 
AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        return new JobSummariesDatasource(context.getComputationNodeDomain());
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
new file mode 100644
index 0000000..57dc514
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.work.GetJobSummariesJSONWork;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+public class GetJobSummariesRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+
+    public GetJobSummariesRequest(String nodeId, long reqId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
+        GetJobSummariesJSONWork gjse = new 
GetJobSummariesJSONWork(ccs.getJobManager());
+        try {
+            ccs.getWorkQueue().scheduleAndSync(gjse);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure getting job summaries", e);
+            return; // TODO(tillw) return or throw?
+        }
+        final ArrayNode gjseSummaries = gjse.getSummaries();
+        final int size = gjseSummaries.size();
+        String[] summaries = new String[size];
+        for (int i = 0; i < size; ++i) {
+            summaries[i] = gjseSummaries.get(i).toString();
+        }
+        GetJobSummariesResponse response = new GetJobSummariesResponse(reqId, 
summaries);
+        CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
new file mode 100644
index 0000000..00345a6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GetJobSummariesResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String[] summaries;
+
+    public GetJobSummariesResponse(long reqId, String[] summaries) {
+        this.reqId = reqId;
+        this.summaries = summaries;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public String[] getSummaries() {
+        return summaries;
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 83ceec7..937d72e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.app.function.DatasetResourcesRewriter;
 import org.apache.asterix.app.function.DatasetRewriter;
 import org.apache.asterix.app.function.FeedRewriter;
+import org.apache.asterix.app.function.JobSummariesRewriter;
 import org.apache.asterix.app.function.PingRewriter;
 import org.apache.asterix.app.function.StorageComponentsRewriter;
 import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -54,6 +55,11 @@
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, 
true);
         BuiltinFunctions.addUnnestFun(PingRewriter.PING, true);
         BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, 
PingRewriter.INSTANCE);
+        // job-summaries function
+        BuiltinFunctions.addPrivateFunction(JobSummariesRewriter.JOBSUMMARIES,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, 
true);
+        BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true);
+        
BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, 
JobSummariesRewriter.INSTANCE);
     }
 
     private MetadataBuiltinFunctions() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3115
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>

Reply via email to