abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1324
Change subject: Extract NodeControllerIPCI out of NodeControllerService ...................................................................... Extract NodeControllerIPCI out of NodeControllerService Change-Id: I9e7f160dfa7418cd29693a990fa17d80a14e5841 --- A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java 3 files changed, 140 insertions(+), 97 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/1324/1 diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java new file mode 100644 index 0000000..93ccaa4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc; + +import org.apache.hyracks.control.common.ipc.CCNCFunctions; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction; +import org.apache.hyracks.control.nc.task.ShutdownTask; +import org.apache.hyracks.control.nc.task.ThreadDumpTask; +import org.apache.hyracks.control.nc.work.AbortTasksWork; +import org.apache.hyracks.control.nc.work.ApplicationMessageWork; +import org.apache.hyracks.control.nc.work.CleanupJobletWork; +import org.apache.hyracks.control.nc.work.DeployBinaryWork; +import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; +import org.apache.hyracks.control.nc.work.StartTasksWork; +import org.apache.hyracks.control.nc.work.StateDumpWork; +import org.apache.hyracks.control.nc.work.UnDeployBinaryWork; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.api.IIPCI; + +/** + * Interprocess communication in a node controller + * This class must be refactored with each function carrying its own implementation + */ +final class NodeControllerIPCI implements IIPCI { + private final NodeControllerService ncs; + + /** + * @param nodeControllerService + */ + NodeControllerIPCI(NodeControllerService nodeControllerService) { + ncs = nodeControllerService; + } + + @Override + public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload, + Exception exception) { + CCNCFunctions.Function fn = (CCNCFunctions.Function) payload; + switch (fn.getFunctionId()) { + case SEND_APPLICATION_MESSAGE: + CCNCFunctions.SendApplicationMessageFunction amf = + (CCNCFunctions.SendApplicationMessageFunction) fn; + ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(), + amf.getDeploymentId(), amf.getNodeId())); + return; + case START_TASKS: + CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn; + ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), + stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags())); + return; + case ABORT_TASKS: + CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; + ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks())); + return; + case CLEANUP_JOBLET: + CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn; + ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus())); + return; + case REPORT_PARTITION_AVAILABILITY: + CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = + (CCNCFunctions.ReportPartitionAvailabilityFunction) fn; + ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs, + rpaf.getPartitionId(), rpaf.getNetworkAddress())); + return; + case NODE_REGISTRATION_RESULT: + CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn; + ncs.setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException()); + return; + + case GET_NODE_CONTROLLERS_INFO_RESPONSE: + CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = + (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn; + ncs.setNodeControllersInfo(gncirf.getNodeControllerInfos()); + return; + + case DEPLOY_BINARY: + CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; + ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), + dbf.getBinaryURLs())); + return; + + case UNDEPLOY_BINARY: + CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn; + ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId())); + return; + + case STATE_DUMP_REQUEST: + final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; + ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId())); + return; + + case SHUTDOWN_REQUEST: + final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn; + ncs.getExecutorService().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService())); + return; + + case THREAD_DUMP_REQUEST: + final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn; + ncs.getExecutorService().submit(new ThreadDumpTask(ncs, tdrf.getRequestId())); + return; + + default: + throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId()); + } + + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index ed46b53..09ee09a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -62,7 +62,6 @@ import org.apache.hyracks.control.common.heartbeat.HeartbeatData; import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema; import org.apache.hyracks.control.common.ipc.CCNCFunctions; -import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction; import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.utils.PidHelper; @@ -79,19 +78,8 @@ import org.apache.hyracks.control.nc.partitions.PartitionManager; import org.apache.hyracks.control.nc.resources.memory.MemoryManager; import org.apache.hyracks.control.nc.runtime.RootHyracksContext; -import org.apache.hyracks.control.nc.work.AbortTasksWork; -import org.apache.hyracks.control.nc.work.ApplicationMessageWork; import org.apache.hyracks.control.nc.work.BuildJobProfilesWork; -import org.apache.hyracks.control.nc.work.CleanupJobletWork; -import org.apache.hyracks.control.nc.work.DeployBinaryWork; -import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; -import org.apache.hyracks.control.nc.task.ShutdownTask; -import org.apache.hyracks.control.nc.work.StartTasksWork; -import org.apache.hyracks.control.nc.work.StateDumpWork; -import org.apache.hyracks.control.nc.task.ThreadDumpTask; -import org.apache.hyracks.control.nc.work.UnDeployBinaryWork; import org.apache.hyracks.ipc.api.IIPCHandle; -import org.apache.hyracks.ipc.api.IIPCI; import org.apache.hyracks.ipc.api.IPCPerformanceCounters; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; @@ -167,8 +155,8 @@ public NodeControllerService(NCConfig ncConfig) throws Exception { this.ncConfig = ncConfig; id = ncConfig.nodeId; - NodeControllerIPCI ipci = new NodeControllerIPCI(); - ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci, + ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), + new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer()); this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices))); @@ -219,7 +207,7 @@ return devices; } - private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { + synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) { this.nodeParameters = parameters; this.registrationException = exception; this.registrationPending = false; @@ -238,7 +226,7 @@ return fv.get(); } - private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) { + void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) { FutureValue<Map<String, NodeControllerInfo>> fv; synchronized (getNodeControllerInfosAcceptor) { fv = getNodeControllerInfosAcceptor.getValue(); @@ -505,86 +493,6 @@ } catch (Exception e) { LOGGER.log(Level.WARNING, "Exception reporting profile", e); } - } - } - - private final class NodeControllerIPCI implements IIPCI { - @Override - public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload, - Exception exception) { - CCNCFunctions.Function fn = (CCNCFunctions.Function) payload; - switch (fn.getFunctionId()) { - case SEND_APPLICATION_MESSAGE: - CCNCFunctions.SendApplicationMessageFunction amf = - (CCNCFunctions.SendApplicationMessageFunction) fn; - queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), - amf.getDeploymentId(), amf.getNodeId())); - return; - - case START_TASKS: - CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn; - queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), - stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags())); - return; - - case ABORT_TASKS: - CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; - queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks())); - return; - - case CLEANUP_JOBLET: - CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn; - queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus())); - return; - - case REPORT_PARTITION_AVAILABILITY: - CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = - (CCNCFunctions.ReportPartitionAvailabilityFunction) fn; - queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, - rpaf.getPartitionId(), rpaf.getNetworkAddress())); - return; - - case NODE_REGISTRATION_RESULT: - CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn; - setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException()); - return; - - case GET_NODE_CONTROLLERS_INFO_RESPONSE: - CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = - (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn; - setNodeControllersInfo(gncirf.getNodeControllerInfos()); - return; - - case DEPLOY_BINARY: - CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn; - queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(), - dbf.getBinaryURLs())); - return; - - case UNDEPLOY_BINARY: - CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn; - queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId())); - return; - - case STATE_DUMP_REQUEST: - final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn; - queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId())); - return; - - case SHUTDOWN_REQUEST: - final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn; - executor.submit(new ShutdownTask(NodeControllerService.this, sdrf.isTerminateNCService())); - return; - - case THREAD_DUMP_REQUEST: - final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn; - executor.submit(new ThreadDumpTask(NodeControllerService.this, tdrf.getRequestId())); - return; - - default: - throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId()); - } - } } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java index bfc46df..087f06e 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java @@ -18,6 +18,19 @@ */ package org.apache.hyracks.ipc.api; +/** + * The interprocess communication interface that handles communication between different processes across the cluster + */ +@FunctionalInterface public interface IIPCI { - public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception); + + /** + * handles the incoming message + * @param handle the message IPC handle + * @param mid the message id + * @param rmid the request message id (if the message is a response to a request) + * @param payload the message payload + * @param exception an exception if the message was an error message + */ + void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1324 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I9e7f160dfa7418cd29693a990fa17d80a14e5841 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]>
