Till Westmann has submitted this change and it was merged. Change subject: [NO ISSUE] Add internal function jobs() to retrieve job information ......................................................................
[NO ISSUE] Add internal function jobs() to retrieve job information - user model changes: added function jobs() - storage format changes: no - interface changes: no Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3115 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 10 files changed, 357 insertions(+), 0 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java new file mode 100644 index 0000000..b92dc55 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesDatasource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; + +public class JobSummariesDatasource extends FunctionDataSource { + + private static final DataSourceId JOB_SUMMARIES_DATASOURCE_ID = new DataSourceId( + JobSummariesRewriter.JOBSUMMARIES.getNamespace(), JobSummariesRewriter.JOBSUMMARIES.getName()); + + public JobSummariesDatasource(INodeDomain domain) throws AlgebricksException { + super(JOB_SUMMARIES_DATASOURCE_ID, domain); + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + AlgebricksAbsolutePartitionConstraint locations) { + return new JobSummariesFunction(AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations())); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java new file mode 100644 index 0000000..82062e2 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesFunction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; + +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.app.message.GetJobSummariesRequest; +import org.apache.asterix.app.message.GetJobSummariesResponse; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.metadata.declared.AbstractDatasourceFunction; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class JobSummariesFunction extends AbstractDatasourceFunction { + + private static final Logger LOGGER = LogManager.getLogger(); + + private static final long serialVersionUID = 1L; + + public JobSummariesFunction(AlgebricksAbsolutePartitionConstraint locations) { + super(locations); + } + + @Override + public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); + INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker(); + MessageFuture messageFuture = messageBroker.registerMessageFuture(); + final long futureId = messageFuture.getFutureId(); + GetJobSummariesRequest request = new GetJobSummariesRequest(serviceCtx.getNodeId(), futureId); + try { + messageBroker.sendMessageToPrimaryCC(request); + GetJobSummariesResponse response = + (GetJobSummariesResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + return new JobSummariesReader(response.getSummaries()); + } catch (Exception e) { + LOGGER.warn("Could no retrieve jobs info", e); + throw HyracksDataException.create(e); + } finally { + messageBroker.deregisterMessageFuture(futureId); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java new file mode 100644 index 0000000..d5a3272 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesReader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import java.io.IOException; + +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.input.record.CharArrayRecord; + +public class JobSummariesReader extends FunctionReader { + + private final String[] summaries; + private int pos = 0; + + public JobSummariesReader(String[] summaries) { + this.summaries = summaries; + } + + @Override + public boolean hasNext() throws Exception { + return pos < summaries.length; + } + + @Override + public IRawRecord<char[]> next() throws IOException, InterruptedException { + CharArrayRecord record = new CharArrayRecord(); + record.set(summaries[pos++]); + record.endRecord(); + return record; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java new file mode 100644 index 0000000..ef753b4 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/JobSummariesRewriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +public class JobSummariesRewriter extends FunctionRewriter { + + public static final FunctionIdentifier JOBSUMMARIES = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "jobs", 0); + public static final JobSummariesRewriter INSTANCE = new JobSummariesRewriter(JOBSUMMARIES); + + private JobSummariesRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f) + throws AlgebricksException { + return new JobSummariesDatasource(context.getComputationNodeDomain()); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java new file mode 100644 index 0000000..b885b13 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.work.GetJobSummariesJSONWork; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.node.ArrayNode; + +public class GetJobSummariesRequest implements ICcAddressedMessage { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + + public GetJobSummariesRequest(String nodeId, long reqId) { + this.nodeId = nodeId; + this.reqId = reqId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager()); + try { + ccs.getWorkQueue().scheduleAndSync(gjse); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure getting jobs", e); + throw HyracksDataException.create(e); + } + final ArrayNode gjseSummaries = gjse.getSummaries(); + final int size = gjseSummaries.size(); + String[] summaries = new String[size]; + for (int i = 0; i < size; ++i) { + summaries[i] = gjseSummaries.get(i).toString(); + } + GetJobSummariesResponse response = new GetJobSummariesResponse(reqId, summaries); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure sending response to nc", e); + throw HyracksDataException.create(e); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java new file mode 100644 index 0000000..7554d53 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/GetJobSummariesResponse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class GetJobSummariesResponse implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + private final String[] summaries; + + public GetJobSummariesResponse(long reqId, String[] summaries) { + this.reqId = reqId; + this.summaries = summaries; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } + + public String[] getSummaries() { + return summaries; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index 3407d59..a8314ec 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -22,6 +22,7 @@ import org.apache.asterix.app.function.DatasetResourcesRewriter; import org.apache.asterix.app.function.DatasetRewriter; import org.apache.asterix.app.function.FeedRewriter; +import org.apache.asterix.app.function.JobSummariesRewriter; import org.apache.asterix.app.function.PingRewriter; import org.apache.asterix.app.function.StorageComponentsRewriter; import org.apache.asterix.om.functions.BuiltinFunctions; @@ -60,6 +61,11 @@ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, true); BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, ActiveRequestsRewriter.INSTANCE); + // job-summaries function + BuiltinFunctions.addPrivateFunction(JobSummariesRewriter.JOBSUMMARIES, + (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); + BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true); + BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, JobSummariesRewriter.INSTANCE); } private MetadataBuiltinFunctions() { diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp new file mode 100644 index 0000000..e8e898f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/jobs/jobs.1.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +SET `import-private-functions` "true"; +select value j +from jobs() j +order by j.`start-time` desc +limit 1; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex new file mode 100644 index 0000000..914f6cb --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.1.regex @@ -0,0 +1 @@ +/\{ "type": "job-summary", "job-id": "JID:.*", "create-time": \d*, "start-time": \d*, "end-time": \d*, "status": "RUNNING" \}/ diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index ceee5f9..f753bd0 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -4987,6 +4987,11 @@ <output-dir compare="Text">active_requests</output-dir> </compilation-unit> </test-case> + <test-case FilePath="misc"> + <compilation-unit name="jobs"> + <output-dir compare="Text">jobs</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="index"> <test-group name="index/validations"> -- To view, visit https://asterix-gerrit.ics.uci.edu/3115 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I80d786899d6dd5970c8faf24041bc60da881a365 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org>