abdullah alamoudi has submitted this change and it was merged. Change subject: Extract IPCIs out of ClusterControllerService ......................................................................
Extract IPCIs out of ClusterControllerService moving the two IPCIs out of cluster controller service is a good start to cleanup the class. In addition, this change renames queue to workQueue in NodeControllerService for consistency. Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1325 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java 5 files changed, 354 insertions(+), 350 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index a7c2a36..ef0af26 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -33,13 +33,11 @@ public enum FunctionId { GET_CLUSTER_CONTROLLER_INFO, GET_CLUSTER_TOPOLOGY, - CREATE_JOB, GET_JOB_STATUS, GET_JOB_INFO, START_JOB, GET_DATASET_DIRECTORY_SERIVICE_INFO, GET_DATASET_RESULT_STATUS, - GET_DATASET_RECORD_DESCRIPTOR, GET_DATASET_RESULT_LOCATIONS, WAIT_FOR_COMPLETION, GET_NODE_CONTROLLERS_INFO, diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java new file mode 100644 index 0000000..01c3bf5 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; +import org.apache.hyracks.api.comm.NetworkAddress; +import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobIdFactory; +import org.apache.hyracks.api.job.JobInfo; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.cc.work.CliDeployBinaryWork; +import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork; +import org.apache.hyracks.control.cc.work.ClusterShutdownWork; +import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork; +import org.apache.hyracks.control.cc.work.GetJobInfoWork; +import org.apache.hyracks.control.cc.work.GetJobStatusWork; +import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; +import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork; +import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork; +import org.apache.hyracks.control.cc.work.GetResultStatusWork; +import org.apache.hyracks.control.cc.work.GetThreadDumpWork; +import org.apache.hyracks.control.cc.work.JobStartWork; +import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork; +import org.apache.hyracks.control.common.work.IPCResponder; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.api.IIPCI; +import org.apache.hyracks.ipc.exceptions.IPCException; + +class ClientInterfaceIPCI implements IIPCI { + + private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName()); + private final ClusterControllerService ccs; + private final JobIdFactory jobIdFactory; + + ClientInterfaceIPCI(ClusterControllerService ccs) { + this.ccs = ccs; + jobIdFactory = new JobIdFactory(); + } + + @Override + public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, + Exception exception) { + HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload; + switch (fn.getFunctionId()) { + case GET_CLUSTER_CONTROLLER_INFO: + try { + handle.send(mid, ccs.getClusterControllerInfo(), null); + } catch (IPCException e) { + LOGGER.log(Level.WARNING, "Error sending response to GET_CLUSTER_CONTROLLER_INFO request", e); + } + break; + case GET_JOB_STATUS: + HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = + (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn; + ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs, gjsf.getJobId(), + new IPCResponder<JobStatus>(handle, mid))); + break; + case GET_JOB_INFO: + HyracksClientInterfaceFunctions.GetJobInfoFunction gjif = + (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn; + ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, gjif.getJobId(), + new IPCResponder<JobInfo>(handle, mid))); + break; + case START_JOB: + HyracksClientInterfaceFunctions.StartJobFunction sjf = + (HyracksClientInterfaceFunctions.StartJobFunction) fn; + JobId jobId = jobIdFactory.create(); + ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), + sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid))); + break; + case GET_DATASET_DIRECTORY_SERIVICE_INFO: + ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs, + new IPCResponder<NetworkAddress>(handle, mid))); + break; + case GET_DATASET_RESULT_STATUS: + HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf = + (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn; + ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), + gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid))); + break; + case GET_DATASET_RESULT_LOCATIONS: + HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = + (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn; + ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, + gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(), + new IPCResponder<>(handle, mid))); + break; + case WAIT_FOR_COMPLETION: + HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = + (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn; + ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(), + new IPCResponder<>(handle, mid))); + break; + case GET_NODE_CONTROLLERS_INFO: + ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs, + new IPCResponder<>(handle, mid))); + break; + case GET_CLUSTER_TOPOLOGY: + try { + handle.send(mid, ccs.getCCContext().getClusterTopology(), null); + } catch (IPCException e) { + LOGGER.log(Level.WARNING, "Error sending response to GET_CLUSTER_TOPOLOGY request", e); + } + break; + case CLI_DEPLOY_BINARY: + HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = + (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn; + ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), + dbf.getDeploymentId(), new IPCResponder<>(handle, mid))); + break; + case CLI_UNDEPLOY_BINARY: + HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = + (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn; + ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(), + new IPCResponder<>(handle, mid))); + break; + case CLUSTER_SHUTDOWN: + HyracksClientInterfaceFunctions.ClusterShutdownFunction csf = + (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn; + ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs, + csf.isTerminateNCService(), new IPCResponder<>(handle, mid))); + break; + case GET_NODE_DETAILS_JSON: + HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf = + (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn; + ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs, gndjf.getNodeId(), + gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid))); + break; + case THREAD_DUMP: + HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = + (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn; + ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, tdf.getNode(), + new IPCResponder<String>(handle, mid))); + break; + default: + try { + handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId())); + } catch (IPCException e) { + LOGGER.log(Level.WARNING, "Error sending Unknown function response", e); + } + } + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java new file mode 100644 index 0000000..b6c9a08 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc; + +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.control.cc.work.ApplicationMessageWork; +import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; +import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork; +import org.apache.hyracks.control.cc.work.NodeHeartbeatWork; +import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork; +import org.apache.hyracks.control.cc.work.NotifyShutdownWork; +import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse; +import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse; +import org.apache.hyracks.control.cc.work.RegisterNodeWork; +import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork; +import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork; +import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork; +import org.apache.hyracks.control.cc.work.ReportProfilesWork; +import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork; +import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork; +import org.apache.hyracks.control.cc.work.TaskCompleteWork; +import org.apache.hyracks.control.cc.work.TaskFailureWork; +import org.apache.hyracks.control.cc.work.UnregisterNodeWork; +import org.apache.hyracks.control.common.ipc.CCNCFunctions; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction; +import org.apache.hyracks.control.common.work.IPCResponder; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.api.IIPCI; + +class ClusterControllerIPCI implements IIPCI { + private static final Logger LOGGER = Logger.getLogger(ClusterControllerIPCI.class.getName()); + private final ClusterControllerService ccs; + + ClusterControllerIPCI(ClusterControllerService ccs) { + this.ccs = ccs; + } + + @Override + public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload, + Exception exception) { + CCNCFunctions.Function fn = (Function) payload; + switch (fn.getFunctionId()) { + case REGISTER_NODE: + CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn; + ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs, rnf.getNodeRegistration())); + break; + case UNREGISTER_NODE: + CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn; + ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs, unf.getNodeId())); + break; + case NODE_HEARTBEAT: + CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn; + ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs, nhf.getNodeId(), + nhf.getHeartbeatData())); + break; + case NOTIFY_JOBLET_CLEANUP: + CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn; + ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(), + njcf.getNodeId())); + break; + case NOTIFY_DEPLOY_BINARY: + CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn; + ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(), + ndbf.getNodeId(), ndbf.getDeploymentStatus())); + break; + case REPORT_PROFILE: + CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn; + ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs, rpf.getProfiles())); + break; + case NOTIFY_TASK_COMPLETE: + CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn; + ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(), + ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics())); + break; + case NOTIFY_TASK_FAILURE: + CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn; + ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(), + ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions())); + break; + case REGISTER_PARTITION_PROVIDER: + CCNCFunctions.RegisterPartitionProviderFunction rppf = + (CCNCFunctions.RegisterPartitionProviderFunction) fn; + ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs, + rppf.getPartitionDescriptor())); + break; + case REGISTER_PARTITION_REQUEST: + CCNCFunctions.RegisterPartitionRequestFunction rprf = + (CCNCFunctions.RegisterPartitionRequestFunction) fn; + ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs, + rprf.getPartitionRequest())); + break; + case REGISTER_RESULT_PARTITION_LOCATION: + CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = + (CCNCFunctions.RegisterResultPartitionLocationFunction) fn; + ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs, + rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(), + rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress())); + break; + case REPORT_RESULT_PARTITION_WRITE_COMPLETION: + CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc = + (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn; + ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, + rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition())); + break; + case REPORT_RESULT_PARTITION_FAILURE: + CCNCFunctions.ReportResultPartitionFailureFunction rrpf = + (CCNCFunctions.ReportResultPartitionFailureFunction) fn; + ccs.getWorkQueue().schedule(new ReportResultPartitionFailureWork(ccs, + rrpf.getJobId(), rrpf.getResultSetId(), rrpf.getPartition())); + break; + case SEND_APPLICATION_MESSAGE: + CCNCFunctions.SendApplicationMessageFunction rsf = + (CCNCFunctions.SendApplicationMessageFunction) fn; + ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(), + rsf.getDeploymentId(), rsf.getNodeId())); + break; + case GET_NODE_CONTROLLERS_INFO: + ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs, + new IResultCallback<Map<String, NodeControllerInfo>>() { + @Override + public void setValue(Map<String, NodeControllerInfo> result) { + new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1) + .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result)); + } + + @Override + public void setException(Exception e) { + } + })); + break; + case STATE_DUMP_RESPONSE: + CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn; + ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(), + dsrf.getStateDumpId(), dsrf.getState())); + break; + case SHUTDOWN_RESPONSE: + CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn; + ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId())); + break; + case THREAD_DUMP_RESPONSE: + CCNCFunctions.ThreadDumpResponseFunction tdrf = + (CCNCFunctions.ThreadDumpResponseFunction)fn; + ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs, + tdrf.getRequestId(), tdrf.getThreadDumpJSON())); + break; + default: + LOGGER.warning("Unknown function: " + fn.getFunctionId()); + } + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index ac38523..c76534b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -39,17 +39,11 @@ import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.ClusterControllerInfo; -import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; -import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.context.ICCContext; -import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobIdFactory; -import org.apache.hyracks.api.job.JobInfo; -import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.api.topology.ClusterTopology; import org.apache.hyracks.api.topology.TopologyDefinitionParser; @@ -58,58 +52,21 @@ import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.web.WebServer; -import org.apache.hyracks.control.cc.work.ApplicationMessageWork; -import org.apache.hyracks.control.cc.work.CliDeployBinaryWork; -import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork; -import org.apache.hyracks.control.cc.work.ClusterShutdownWork; import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun; -import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork; import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork; -import org.apache.hyracks.control.cc.work.GetJobInfoWork; -import org.apache.hyracks.control.cc.work.GetJobStatusWork; -import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork; -import org.apache.hyracks.control.cc.work.GetThreadDumpWork; import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun; -import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork; -import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork; -import org.apache.hyracks.control.cc.work.GetResultStatusWork; -import org.apache.hyracks.control.cc.work.JobStartWork; -import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork; -import org.apache.hyracks.control.cc.work.NodeHeartbeatWork; -import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork; -import org.apache.hyracks.control.cc.work.NotifyShutdownWork; -import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse; -import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse; -import org.apache.hyracks.control.cc.work.RegisterNodeWork; -import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork; -import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork; -import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork; import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork; -import org.apache.hyracks.control.cc.work.ReportProfilesWork; -import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork; -import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork; import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork; -import org.apache.hyracks.control.cc.work.TaskCompleteWork; -import org.apache.hyracks.control.cc.work.TaskFailureWork; import org.apache.hyracks.control.cc.work.TriggerNCWork; -import org.apache.hyracks.control.cc.work.UnregisterNodeWork; -import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork; import org.apache.hyracks.control.common.context.ServerContext; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.IniUtils; import org.apache.hyracks.control.common.deployment.DeploymentRun; import org.apache.hyracks.control.common.ipc.CCNCFunctions; -import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function; -import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction; -import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction; import org.apache.hyracks.control.common.logs.LogFile; import org.apache.hyracks.control.common.shutdown.ShutdownRun; -import org.apache.hyracks.control.common.work.IPCResponder; -import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.WorkQueue; -import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; -import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; import org.ini4j.Ini; @@ -156,8 +113,6 @@ private final IDatasetDirectoryService datasetDirectoryService; - private final JobIdFactory jobIdFactory; - private final Map<DeploymentId, DeploymentRun> deploymentRunMap; private final Map<String, StateDumpRun> stateDumpRunMap; @@ -175,10 +130,10 @@ nodeRegistry = new LinkedHashMap<>(); ipAddressNodeNameMap = new HashMap<>(); serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot)); - IIPCI ccIPCI = new ClusterControllerIPCI(); + IIPCI ccIPCI = new ClusterControllerIPCI(this); clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, new CCNCFunctions.SerializerDeserializer()); - IIPCI ciIPCI = new HyracksClientInterfaceIPCI(); + IIPCI ciIPCI = new ClientInterfaceIPCI(this); clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer()); webServer = new WebServer(this); @@ -208,7 +163,6 @@ ccContext = new ClusterControllerContext(topology); sweeper = new DeadNodeSweeper(); datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold); - jobIdFactory = new JobIdFactory(); deploymentRunMap = new HashMap<>(); stateDumpRunMap = new HashMap<>(); @@ -440,291 +394,6 @@ public IDatasetDirectoryService getDatasetDirectoryService() { return datasetDirectoryService; - } - - private class HyracksClientInterfaceIPCI implements IIPCI { - - @Override - public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, - Exception exception) { - HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload; - switch (fn.getFunctionId()) { - case GET_CLUSTER_CONTROLLER_INFO: { - try { - handle.send(mid, info, null); - } catch (IPCException e) { - e.printStackTrace(); - } - return; - } - - case CREATE_JOB: - break; - case GET_JOB_STATUS: { - HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = - (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn; - workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(), - new IPCResponder<JobStatus>(handle, mid))); - return; - } - - case GET_JOB_INFO: { - HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = - (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn; - workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(), - new IPCResponder<JobInfo>(handle, mid))); - return; - } - - case START_JOB: { - HyracksClientInterfaceFunctions.StartJobFunction sjf = - (HyracksClientInterfaceFunctions.StartJobFunction) fn; - JobId jobId = jobIdFactory.create(); - workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(), - sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid))); - return; - } - - case GET_DATASET_DIRECTORY_SERIVICE_INFO: { - workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this, - new IPCResponder<NetworkAddress>(handle, mid))); - return; - } - - case GET_DATASET_RESULT_STATUS: { - HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = - (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn; - workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), - gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid))); - return; - } - - case GET_DATASET_RECORD_DESCRIPTOR: - break; - case GET_DATASET_RESULT_LOCATIONS: { - HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = - (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn; - workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, - gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(), - new IPCResponder<>(handle, mid))); - return; - } - - case WAIT_FOR_COMPLETION: { - HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = - (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn; - workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(), - new IPCResponder<Object>(handle, mid))); - return; - } - - case GET_NODE_CONTROLLERS_INFO: { - workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this, - new IPCResponder<>(handle, mid))); - return; - } - - case GET_CLUSTER_TOPOLOGY: { - try { - handle.send(mid, ccContext.getClusterTopology(), null); - } catch (IPCException e) { - e.printStackTrace(); - } - return; - } - - case CLI_DEPLOY_BINARY: { - HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = - (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn; - workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(), - dbf.getDeploymentId(), new IPCResponder<>(handle, mid))); - return; - } - - case CLI_UNDEPLOY_BINARY: { - HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = - (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn; - workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(), - new IPCResponder<>(handle, mid))); - return; - } - case CLUSTER_SHUTDOWN: { - HyracksClientInterfaceFunctions.ClusterShutdownFunction csf = - (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn; - workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this, - csf.isTerminateNCService(), new IPCResponder<>(handle, mid))); - return; - } - - case GET_NODE_DETAILS_JSON: - HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf = - (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn; - workQueue.schedule(new GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(), - gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid))); - return; - - case THREAD_DUMP: - HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = - (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn; - workQueue.schedule(new GetThreadDumpWork(ClusterControllerService.this, tdf.getNode(), - new IPCResponder<String>(handle, mid))); - return; - } - try { - handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId())); - } catch (IPCException e) { - e.printStackTrace(); - } - } - } - - private class ClusterControllerIPCI implements IIPCI { - @Override - public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload, - Exception exception) { - CCNCFunctions.Function fn = (Function) payload; - switch (fn.getFunctionId()) { - case REGISTER_NODE: { - CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn; - workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration())); - return; - } - - case UNREGISTER_NODE: { - CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn; - workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId())); - return; - } - - case NODE_HEARTBEAT: { - CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn; - workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), - nhf.getHeartbeatData())); - return; - } - - case NOTIFY_JOBLET_CLEANUP: { - CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn; - workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(), - njcf.getNodeId())); - return; - } - - case NOTIFY_DEPLOY_BINARY: { - CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn; - workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(), - ndbf.getNodeId(), ndbf.getDeploymentStatus())); - return; - } - - case REPORT_PROFILE: { - CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn; - workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles())); - return; - } - - case NOTIFY_TASK_COMPLETE: { - CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn; - workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), - ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics())); - return; - } - case NOTIFY_TASK_FAILURE: { - CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn; - workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), - ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions())); - return; - } - - case REGISTER_PARTITION_PROVIDER: { - CCNCFunctions.RegisterPartitionProviderFunction rppf = - (CCNCFunctions.RegisterPartitionProviderFunction) fn; - workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, - rppf.getPartitionDescriptor())); - return; - } - - case REGISTER_PARTITION_REQUEST: { - CCNCFunctions.RegisterPartitionRequestFunction rprf = - (CCNCFunctions.RegisterPartitionRequestFunction) fn; - workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, - rprf.getPartitionRequest())); - return; - } - - case REGISTER_RESULT_PARTITION_LOCATION: { - CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = - (CCNCFunctions.RegisterResultPartitionLocationFunction) fn; - workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, - rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(), - rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress())); - return; - } - - case REPORT_RESULT_PARTITION_WRITE_COMPLETION: { - CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = - (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn; - workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this, - rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition())); - return; - } - - case REPORT_RESULT_PARTITION_FAILURE: { - CCNCFunctions.ReportResultPartitionFailureFunction rrplf = - (CCNCFunctions.ReportResultPartitionFailureFunction) fn; - workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, - rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition())); - return; - } - - case SEND_APPLICATION_MESSAGE: { - CCNCFunctions.SendApplicationMessageFunction rsf = - (CCNCFunctions.SendApplicationMessageFunction) fn; - workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), - rsf.getDeploymentId(), rsf.getNodeId())); - return; - } - - case GET_NODE_CONTROLLERS_INFO: { - workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this, - new IResultCallback<Map<String, NodeControllerInfo>>() { - @Override - public void setValue(Map<String, NodeControllerInfo> result) { - new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1) - .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result)); - } - - @Override - public void setException(Exception e) { - } - })); - return; - } - - case STATE_DUMP_RESPONSE: { - CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn; - workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(), - dsrf.getStateDumpId(), dsrf.getState())); - return; - } - - case SHUTDOWN_RESPONSE: { - CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn; - workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId())); - return; - } - - case THREAD_DUMP_RESPONSE: { - CCNCFunctions.ThreadDumpResponseFunction tdrf = - (CCNCFunctions.ThreadDumpResponseFunction)fn; - workQueue.schedule(new NotifyThreadDumpResponse(ClusterControllerService.this, - tdrf.getRequestId(), tdrf.getThreadDumpJSON())); - return; - - } - } - LOGGER.warning("Unknown function: " + fn.getFunctionId()); - } } public synchronized void addStateDumpRun(String id, StateDumpRun sdr) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 21bf9c2..c6b415b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -116,7 +116,7 @@ private DatasetNetworkManager datasetNetworkManager; - private final WorkQueue queue; + private final WorkQueue workQueue; private final Timer timer; @@ -179,7 +179,7 @@ FullFrameChannelInterfaceFactory.INSTANCE); lccm = new LifeCycleComponentManager(); - queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. + workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, @@ -303,7 +303,7 @@ } appCtx.setDistributedState(nodeParameters.getDistributedState()); - queue.start(); + workQueue.start(); heartbeatTask = new HeartbeatTask(ccs); @@ -354,7 +354,7 @@ if (messagingNetManager != null) { messagingNetManager.stop(); } - queue.stop(); + workQueue.stop(); if (ncAppEntryPoint != null) { ncAppEntryPoint.stop(); } @@ -409,7 +409,7 @@ } public WorkQueue getWorkQueue() { - return queue; + return workQueue; } public ThreadMXBean getThreadMXBean() { @@ -492,7 +492,7 @@ try { FutureValue<List<JobProfile>> fv = new FutureValue<>(); BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv); - queue.scheduleAndSync(bjpw); + workQueue.scheduleAndSync(bjpw); List<JobProfile> profiles = fv.get(); if (!profiles.isEmpty()) { cc.reportProfile(id, profiles); @@ -512,30 +512,32 @@ case SEND_APPLICATION_MESSAGE: CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn; - queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), + workQueue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId())); return; case START_TASKS: CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn; - queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), - stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags())); + workQueue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), + stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), + stf.getFlags())); return; case ABORT_TASKS: CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; - queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks())); + workQueue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks())); return; case CLEANUP_JOBLET: CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn; - queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus())); + workQueue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), + cjf.getStatus())); return; case REPORT_PARTITION_AVAILABILITY: CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn; - queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, + workQueue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf.getPartitionId(), rpaf.getNetworkAddress())); return; @@ -552,18 +554,18 @@ case DEPLOY_BINARY: CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; - queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(), + workQueue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(), dbf.getBinaryURLs())); return; case UNDEPLOY_BINARY: CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn; - queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId())); + workQueue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId())); return; case STATE_DUMP_REQUEST: final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; - queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId())); + workQueue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId())); return; case SHUTDOWN_REQUEST: -- To view, visit https://asterix-gerrit.ics.uci.edu/1325 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
