Till Westmann has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/3115
Change subject: WIP ...................................................................... WIP Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365 --- A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java 7 files changed, 323 insertions(+), 0 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/15/3115/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java new file mode 100644 index 0000000..897ddda --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; + +public class JobSummariesDatasource extends FunctionDataSource { + + private static final DataSourceId JOB_SUMMARIES_DATASOURCE_ID = new DataSourceId( + JobSummariesRewriter.JOBSUMMARIES.getNamespace(), JobSummariesRewriter.JOBSUMMARIES.getName()); + + public JobSummariesDatasource(INodeDomain domain) throws AlgebricksException { + super(JOB_SUMMARIES_DATASOURCE_ID, domain); + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + AlgebricksAbsolutePartitionConstraint locations) { + return new JobSummariesFunction(locations); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java new file mode 100644 index 0000000..3ab2f7e --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; + +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.app.message.GetJobSummariesRequest; +import org.apache.asterix.app.message.GetJobSummariesResponse; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.metadata.declared.AbstractDatasourceFunction; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class JobSummariesFunction extends AbstractDatasourceFunction { + + private static final long serialVersionUID = 1L; + + public JobSummariesFunction(AlgebricksAbsolutePartitionConstraint locations) { + super(locations); + } + + @Override + public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); + INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker(); + MessageFuture messageFuture = messageBroker.registerMessageFuture(); + GetJobSummariesRequest request = + new GetJobSummariesRequest(serviceCtx.getNodeId(), messageFuture.getFutureId()); + try { + messageBroker.sendMessageToPrimaryCC(request); + GetJobSummariesResponse response = + (GetJobSummariesResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + final String[] summaries = response.getSummaries(); + return new JobSummariesReader(summaries); + } catch (Exception e) { + throw HyracksDataException.create(e); // TODO(tillw) Does this make sense? + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java new file mode 100644 index 0000000..d5a3272 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import java.io.IOException; + +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.input.record.CharArrayRecord; + +public class JobSummariesReader extends FunctionReader { + + private final String[] summaries; + private int pos = 0; + + public JobSummariesReader(String[] summaries) { + this.summaries = summaries; + } + + @Override + public boolean hasNext() throws Exception { + return pos < summaries.length; + } + + @Override + public IRawRecord<char[]> next() throws IOException, InterruptedException { + CharArrayRecord record = new CharArrayRecord(); + record.set(summaries[pos++]); + record.endRecord(); + return record; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java new file mode 100644 index 0000000..7b535f1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class JobSummariesRewriter extends FunctionRewriter { + + public static final FunctionIdentifier JOBSUMMARIES = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "jobs", 0); + public static final JobSummariesRewriter INSTANCE = new JobSummariesRewriter(JOBSUMMARIES); + + private JobSummariesRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f) + throws AlgebricksException { + return new JobSummariesDatasource(context.getComputationNodeDomain()); + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java new file mode 100644 index 0000000..57dc514 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.work.GetJobSummariesJSONWork; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.node.ArrayNode; + +public class GetJobSummariesRequest implements ICcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + + public GetJobSummariesRequest(String nodeId, long reqId) { + this.nodeId = nodeId; + this.reqId = reqId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager()); + try { + ccs.getWorkQueue().scheduleAndSync(gjse); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure getting job summaries", e); + return; // TODO(tillw) return or throw? + } + final ArrayNode gjseSummaries = gjse.getSummaries(); + final int size = gjseSummaries.size(); + String[] summaries = new String[size]; + for (int i = 0; i < size; ++i) { + summaries[i] = gjseSummaries.get(i).toString(); + } + GetJobSummariesResponse response = new GetJobSummariesResponse(reqId, summaries); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure sending response to nc", e); + } + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java new file mode 100644 index 0000000..00345a6 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class GetJobSummariesResponse implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final String[] summaries; + + public GetJobSummariesResponse(long reqId, String[] summaries) { + this.reqId = reqId; + this.summaries = summaries; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } + + public String[] getSummaries() { + return summaries; + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index 83ceec7..937d72e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -21,6 +21,7 @@ import org.apache.asterix.app.function.DatasetResourcesRewriter; import org.apache.asterix.app.function.DatasetRewriter; import org.apache.asterix.app.function.FeedRewriter; +import org.apache.asterix.app.function.JobSummariesRewriter; import org.apache.asterix.app.function.PingRewriter; import org.apache.asterix.app.function.StorageComponentsRewriter; import org.apache.asterix.om.functions.BuiltinFunctions; @@ -54,6 +55,11 @@ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); BuiltinFunctions.addUnnestFun(PingRewriter.PING, true); BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, PingRewriter.INSTANCE); + // job-summaries function + BuiltinFunctions.addPrivateFunction(JobSummariesRewriter.JOBSUMMARIES, + (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); + BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true); + BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, JobSummariesRewriter.INSTANCE); } private MetadataBuiltinFunctions() { -- To view, visit https://asterix-gerrit.ics.uci.edu/3115 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <[email protected]>
