abdullah alamoudi has submitted this change and it was merged.

Change subject: Extract IPCIs out of ClusterControllerService
......................................................................


Extract IPCIs out of ClusterControllerService

Moving the two IPCIs out of ClusterControllerService is a good
start toward cleaning up the class. In addition, this change renames
queue to workQueue in NodeControllerService for consistency.

Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1325
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
5 files changed, 354 insertions(+), 350 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; Verified

Objections:
  Jenkins: Violations found
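
For orientation before the full diff, here is a minimal sketch of the extraction
pattern: each handler becomes a package-private top-level class that holds a
reference to the service and turns incoming functions into work items on the
service's work queue. This is a simplified illustration of the classes added in
the diff below, not the merged code: imports are trimmed to the ones used here
and all but one function id is elided.

    package org.apache.hyracks.control.cc;

    import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
    import org.apache.hyracks.control.common.ipc.CCNCFunctions;
    import org.apache.hyracks.ipc.api.IIPCHandle;
    import org.apache.hyracks.ipc.api.IIPCI;

    // Simplified sketch of the extracted handler; see ClusterControllerIPCI.java
    // below for the merged version with all function ids handled.
    class ClusterControllerIPCI implements IIPCI {
        private final ClusterControllerService ccs;

        ClusterControllerIPCI(ClusterControllerService ccs) {
            this.ccs = ccs;
        }

        @Override
        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
                Exception exception) {
            CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
            switch (fn.getFunctionId()) {
                case NODE_HEARTBEAT:
                    // Each message becomes a work item scheduled on the service's work queue.
                    CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
                    ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData()));
                    break;
                default:
                    // Remaining cases elided in this sketch.
                    break;
            }
        }
    }

ClusterControllerService then constructs the handler with new ClusterControllerIPCI(this)
and hands it to the IPCSystem, as shown in the ClusterControllerService.java hunk below.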



diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index a7c2a36..ef0af26 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -33,13 +33,11 @@
     public enum FunctionId {
         GET_CLUSTER_CONTROLLER_INFO,
         GET_CLUSTER_TOPOLOGY,
-        CREATE_JOB,
         GET_JOB_STATUS,
         GET_JOB_INFO,
         START_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
-        GET_DATASET_RECORD_DESCRIPTOR,
         GET_DATASET_RESULT_LOCATIONS,
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
new file mode 100644
index 0000000..01c3bf5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobStatusWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
+import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import org.apache.hyracks.control.cc.work.GetResultStatusWork;
+import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
+import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+class ClientInterfaceIPCI implements IIPCI {
+
+    private static final Logger LOGGER = 
Logger.getLogger(ClientInterfaceIPCI.class.getName());
+    private final ClusterControllerService ccs;
+    private final JobIdFactory jobIdFactory;
+
+    ClientInterfaceIPCI(ClusterControllerService ccs) {
+        this.ccs = ccs;
+        jobIdFactory = new JobIdFactory();
+    }
+
+    @Override
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, 
Object payload,
+            Exception exception) {
+        HyracksClientInterfaceFunctions.Function fn = 
(HyracksClientInterfaceFunctions.Function) payload;
+        switch (fn.getFunctionId()) {
+            case GET_CLUSTER_CONTROLLER_INFO:
+                try {
+                    handle.send(mid, ccs.getClusterControllerInfo(), null);
+                } catch (IPCException e) {
+                    LOGGER.log(Level.WARNING, "Error sending response to 
GET_CLUSTER_CONTROLLER_INFO request", e);
+                }
+                break;
+            case GET_JOB_STATUS:
+                HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                        (HyracksClientInterfaceFunctions.GetJobStatusFunction) 
fn;
+                ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs, 
gjsf.getJobId(),
+                        new IPCResponder<JobStatus>(handle, mid)));
+                break;
+            case GET_JOB_INFO:
+                HyracksClientInterfaceFunctions.GetJobInfoFunction gjif =
+                        (HyracksClientInterfaceFunctions.GetJobInfoFunction) 
fn;
+                ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, 
gjif.getJobId(),
+                        new IPCResponder<JobInfo>(handle, mid)));
+                break;
+            case START_JOB:
+                HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                        (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                JobId jobId = jobIdFactory.create();
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(),
+                        sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new 
IPCResponder<JobId>(handle, mid)));
+                break;
+            case GET_DATASET_DIRECTORY_SERIVICE_INFO:
+                ccs.getWorkQueue().schedule(new 
GetDatasetDirectoryServiceInfoWork(ccs,
+                        new IPCResponder<NetworkAddress>(handle, mid)));
+                break;
+            case GET_DATASET_RESULT_STATUS:
+                HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction 
gdrsf =
+                        
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+                ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, 
gdrsf.getJobId(),
+                        gdrsf.getResultSetId(), new 
IPCResponder<Status>(handle, mid)));
+                break;
+            case GET_DATASET_RESULT_LOCATIONS:
+                
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+                        
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                ccs.getWorkQueue().schedule(new 
GetResultPartitionLocationsWork(ccs,
+                        gdrlf.getJobId(), gdrlf.getResultSetId(), 
gdrlf.getKnownRecords(),
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case WAIT_FOR_COMPLETION:
+                HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf 
=
+                        
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, 
wfcf.getJobId(),
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case GET_NODE_CONTROLLERS_INFO:
+                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case GET_CLUSTER_TOPOLOGY:
+                try {
+                    handle.send(mid, ccs.getCCContext().getClusterTopology(), 
null);
+                } catch (IPCException e) {
+                    LOGGER.log(Level.WARNING, "Error sending response to 
GET_CLUSTER_TOPOLOGY request", e);
+                }
+                break;
+            case CLI_DEPLOY_BINARY:
+                HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                        
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+                ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, 
dbf.getBinaryURLs(),
+                        dbf.getDeploymentId(), new IPCResponder<>(handle, 
mid)));
+                break;
+            case CLI_UNDEPLOY_BINARY:
+                HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf 
=
+                        
(HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+                ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, 
udbf.getDeploymentId(),
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case CLUSTER_SHUTDOWN:
+                HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
+                        
(HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
+                ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs,
+                        csf.isTerminateNCService(), new IPCResponder<>(handle, 
mid)));
+                break;
+            case GET_NODE_DETAILS_JSON:
+                HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction 
gndjf =
+                        
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
+                ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs, 
gndjf.getNodeId(),
+                        gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new 
IPCResponder<>(handle, mid)));
+                break;
+            case THREAD_DUMP:
+                HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
+                        (HyracksClientInterfaceFunctions.ThreadDumpFunction) 
fn;
+                ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, 
tdf.getNode(),
+                        new IPCResponder<String>(handle, mid)));
+                break;
+            default:
+                try {
+                    handle.send(mid, null, new 
IllegalArgumentException("Unknown function " + fn.getFunctionId()));
+                } catch (IPCException e) {
+                    LOGGER.log(Level.WARNING, "Error sending Unknown function 
response", e);
+                }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
new file mode 100644
index 0000000..b6c9a08
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
+import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
+import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
+import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
+import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
+import org.apache.hyracks.control.cc.work.RegisterNodeWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
+import org.apache.hyracks.control.cc.work.ReportProfilesWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
+import 
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
+import org.apache.hyracks.control.cc.work.TaskCompleteWork;
+import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+
+class ClusterControllerIPCI implements IIPCI {
+    private static final Logger LOGGER = 
Logger.getLogger(ClusterControllerIPCI.class.getName());
+    private final ClusterControllerService ccs;
+
+    ClusterControllerIPCI(ClusterControllerService ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long 
rmid, Object payload,
+            Exception exception) {
+        CCNCFunctions.Function fn = (Function) payload;
+        switch (fn.getFunctionId()) {
+            case REGISTER_NODE:
+                CCNCFunctions.RegisterNodeFunction rnf = 
(CCNCFunctions.RegisterNodeFunction) fn;
+                ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs, 
rnf.getNodeRegistration()));
+                break;
+            case UNREGISTER_NODE:
+                CCNCFunctions.UnregisterNodeFunction unf = 
(CCNCFunctions.UnregisterNodeFunction) fn;
+                ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs, 
unf.getNodeId()));
+                break;
+            case NODE_HEARTBEAT:
+                CCNCFunctions.NodeHeartbeatFunction nhf = 
(CCNCFunctions.NodeHeartbeatFunction) fn;
+                ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs, 
nhf.getNodeId(),
+                        nhf.getHeartbeatData()));
+                break;
+            case NOTIFY_JOBLET_CLEANUP:
+                CCNCFunctions.NotifyJobletCleanupFunction njcf = 
(CCNCFunctions.NotifyJobletCleanupFunction) fn;
+                ccs.getWorkQueue().schedule(new 
JobletCleanupNotificationWork(ccs, njcf.getJobId(),
+                        njcf.getNodeId()));
+                break;
+            case NOTIFY_DEPLOY_BINARY:
+                CCNCFunctions.NotifyDeployBinaryFunction ndbf = 
(CCNCFunctions.NotifyDeployBinaryFunction) fn;
+                ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, 
ndbf.getDeploymentId(),
+                        ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+                break;
+            case REPORT_PROFILE:
+                CCNCFunctions.ReportProfileFunction rpf = 
(CCNCFunctions.ReportProfileFunction) fn;
+                ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs, 
rpf.getProfiles()));
+                break;
+            case NOTIFY_TASK_COMPLETE:
+                CCNCFunctions.NotifyTaskCompleteFunction ntcf = 
(CCNCFunctions.NotifyTaskCompleteFunction) fn;
+                ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, 
ntcf.getJobId(),
+                        ntcf.getTaskId(), ntcf.getNodeId(), 
ntcf.getStatistics()));
+                break;
+            case NOTIFY_TASK_FAILURE:
+                CCNCFunctions.NotifyTaskFailureFunction ntff = 
(CCNCFunctions.NotifyTaskFailureFunction) fn;
+                ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, 
ntff.getJobId(),
+                        ntff.getTaskId(), ntff.getNodeId(), 
ntff.getExceptions()));
+                break;
+            case REGISTER_PARTITION_PROVIDER:
+                CCNCFunctions.RegisterPartitionProviderFunction rppf =
+                        (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+                ccs.getWorkQueue().schedule(new 
RegisterPartitionAvailibilityWork(ccs,
+                        rppf.getPartitionDescriptor()));
+                break;
+            case REGISTER_PARTITION_REQUEST:
+                CCNCFunctions.RegisterPartitionRequestFunction rprf =
+                        (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+                ccs.getWorkQueue().schedule(new 
RegisterPartitionRequestWork(ccs,
+                        rprf.getPartitionRequest()));
+                break;
+            case REGISTER_RESULT_PARTITION_LOCATION:
+                CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+                        
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+                ccs.getWorkQueue().schedule(new 
RegisterResultPartitionLocationWork(ccs,
+                        rrplf.getJobId(), rrplf.getResultSetId(), 
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+                        rrplf.getPartition(), rrplf.getNPartitions(), 
rrplf.getNetworkAddress()));
+                break;
+            case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
+                CCNCFunctions.ReportResultPartitionWriteCompletionFunction 
rrpwc =
+                        
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+                ccs.getWorkQueue().schedule(new 
ReportResultPartitionWriteCompletionWork(ccs,
+                        rrpwc.getJobId(), rrpwc.getResultSetId(), 
rrpwc.getPartition()));
+                break;
+            case REPORT_RESULT_PARTITION_FAILURE:
+                CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
+                        (CCNCFunctions.ReportResultPartitionFailureFunction) 
fn;
+                ccs.getWorkQueue().schedule(new 
ReportResultPartitionFailureWork(ccs,
+                        rrpf.getJobId(), rrpf.getResultSetId(), 
rrpf.getPartition()));
+                break;
+            case SEND_APPLICATION_MESSAGE:
+                CCNCFunctions.SendApplicationMessageFunction rsf =
+                        (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, 
rsf.getMessage(),
+                        rsf.getDeploymentId(), rsf.getNodeId()));
+                break;
+            case GET_NODE_CONTROLLERS_INFO:
+                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+                        new IResultCallback<Map<String, NodeControllerInfo>>() 
{
+                            @Override
+                            public void setValue(Map<String, 
NodeControllerInfo> result) {
+                                new 
IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
+                                        .setValue(new 
CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
+                            }
+
+                            @Override
+                            public void setException(Exception e) {
+                            }
+                        }));
+                break;
+            case STATE_DUMP_RESPONSE:
+                CCNCFunctions.StateDumpResponseFunction dsrf = 
(StateDumpResponseFunction) fn;
+                ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, 
dsrf.getNodeId(),
+                        dsrf.getStateDumpId(), dsrf.getState()));
+                break;
+            case SHUTDOWN_RESPONSE:
+                CCNCFunctions.ShutdownResponseFunction sdrf = 
(ShutdownResponseFunction) fn;
+                ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, 
sdrf.getNodeId()));
+                break;
+            case THREAD_DUMP_RESPONSE:
+                CCNCFunctions.ThreadDumpResponseFunction tdrf =
+                        (CCNCFunctions.ThreadDumpResponseFunction)fn;
+                ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
+                        tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+                break;
+            default:
+                LOGGER.warning("Unknown function: " + fn.getFunctionId());
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ac38523..c76534b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -39,17 +39,11 @@
 
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
-import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -58,58 +52,21 @@
 import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
 import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
-import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
-import org.apache.hyracks.control.cc.work.GetJobInfoWork;
-import org.apache.hyracks.control.cc.work.GetJobStatusWork;
-import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
-import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
-import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import org.apache.hyracks.control.cc.work.GetResultStatusWork;
-import org.apache.hyracks.control.cc.work.JobStartWork;
-import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
-import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
-import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
-import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
-import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
-import org.apache.hyracks.control.cc.work.RegisterNodeWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
-import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import 
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
-import org.apache.hyracks.control.cc.work.TaskCompleteWork;
-import org.apache.hyracks.control.cc.work.TaskFailureWork;
 import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
-import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
-import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
-import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.apache.hyracks.control.common.shutdown.ShutdownRun;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.WorkQueue;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
-import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 import org.ini4j.Ini;
@@ -156,8 +113,6 @@
 
     private final IDatasetDirectoryService datasetDirectoryService;
 
-    private final JobIdFactory jobIdFactory;
-
     private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
 
     private final Map<String, StateDumpRun> stateDumpRunMap;
@@ -175,10 +130,10 @@
         nodeRegistry = new LinkedHashMap<>();
         ipAddressNodeNameMap = new HashMap<>();
         serverCtx = new 
ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new 
File(ccConfig.ccRoot));
-        IIPCI ccIPCI = new ClusterControllerIPCI();
+        IIPCI ccIPCI = new ClusterControllerIPCI(this);
         clusterIPC = new IPCSystem(new 
InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
-        IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
+        IIPCI ciIPCI = new ClientInterfaceIPCI(this);
         clientIPC = new IPCSystem(new 
InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
@@ -208,7 +163,6 @@
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
         datasetDirectoryService = new 
DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
-        jobIdFactory = new JobIdFactory();
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -440,291 +394,6 @@
 
     public IDatasetDirectoryService getDatasetDirectoryService() {
         return datasetDirectoryService;
-    }
-
-    private class HyracksClientInterfaceIPCI implements IIPCI {
-
-        @Override
-        public void deliverIncomingMessage(IIPCHandle handle, long mid, long 
rmid, Object payload,
-                Exception exception) {
-            HyracksClientInterfaceFunctions.Function fn = 
(HyracksClientInterfaceFunctions.Function) payload;
-            switch (fn.getFunctionId()) {
-                case GET_CLUSTER_CONTROLLER_INFO: {
-                    try {
-                        handle.send(mid, info, null);
-                    } catch (IPCException e) {
-                        e.printStackTrace();
-                    }
-                    return;
-                }
-
-                case CREATE_JOB:
-                    break;
-                case GET_JOB_STATUS: {
-                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
-                            
(HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
-                    workQueue.schedule(new 
GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
-                            new IPCResponder<JobStatus>(handle, mid)));
-                    return;
-                }
-
-                case GET_JOB_INFO: {
-                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
-                            
(HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
-                    workQueue.schedule(new 
GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
-                            new IPCResponder<JobInfo>(handle, mid)));
-                    return;
-                }
-
-                case START_JOB: {
-                    HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                            (HyracksClientInterfaceFunctions.StartJobFunction) 
fn;
-                    JobId jobId = jobIdFactory.create();
-                    workQueue.schedule(new 
JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
-                            sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new 
IPCResponder<JobId>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
-                    workQueue.schedule(new 
GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
-                            new IPCResponder<NetworkAddress>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_RESULT_STATUS: {
-                    
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
-                            
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
-                    workQueue.schedule(new 
GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
-                            gdrlf.getResultSetId(), new 
IPCResponder<Status>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_RECORD_DESCRIPTOR:
-                    break;
-                case GET_DATASET_RESULT_LOCATIONS: {
-                    
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-                            
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
-                    workQueue.schedule(new 
GetResultPartitionLocationsWork(ClusterControllerService.this,
-                            gdrlf.getJobId(), gdrlf.getResultSetId(), 
gdrlf.getKnownRecords(),
-                            new IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case WAIT_FOR_COMPLETION: {
-                    HyracksClientInterfaceFunctions.WaitForCompletionFunction 
wfcf =
-                            
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
-                    workQueue.schedule(new 
WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
-                            new IPCResponder<Object>(handle, mid)));
-                    return;
-                }
-
-                case GET_NODE_CONTROLLERS_INFO: {
-                    workQueue.schedule(new 
GetNodeControllersInfoWork(ClusterControllerService.this,
-                            new IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case GET_CLUSTER_TOPOLOGY: {
-                    try {
-                        handle.send(mid, ccContext.getClusterTopology(), null);
-                    } catch (IPCException e) {
-                        e.printStackTrace();
-                    }
-                    return;
-                }
-
-                case CLI_DEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction 
dbf =
-                            
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
-                    workQueue.schedule(new 
CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
-                            dbf.getDeploymentId(), new IPCResponder<>(handle, 
mid)));
-                    return;
-                }
-
-                case CLI_UNDEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction 
udbf =
-                            
(HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
-                    workQueue.schedule(new 
CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
-                            new IPCResponder<>(handle, mid)));
-                    return;
-                }
-                case CLUSTER_SHUTDOWN: {
-                    HyracksClientInterfaceFunctions.ClusterShutdownFunction 
csf =
-                            
(HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
-                    workQueue.schedule(new 
ClusterShutdownWork(ClusterControllerService.this,
-                            csf.isTerminateNCService(), new 
IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case GET_NODE_DETAILS_JSON:
-                    HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction 
gndjf =
-                            
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
-                    workQueue.schedule(new 
GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(),
-                            gndjf.isIncludeStats(), gndjf.isIncludeConfig(), 
new IPCResponder<>(handle, mid)));
-                    return;
-
-                case THREAD_DUMP:
-                    HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
-                            
(HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
-                    workQueue.schedule(new 
GetThreadDumpWork(ClusterControllerService.this, tdf.getNode(),
-                            new IPCResponder<String>(handle, mid)));
-                    return;
-            }
-            try {
-                handle.send(mid, null, new IllegalArgumentException("Unknown 
function " + fn.getFunctionId()));
-            } catch (IPCException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private class ClusterControllerIPCI implements IIPCI {
-        @Override
-        public void deliverIncomingMessage(final IIPCHandle handle, long mid, 
long rmid, Object payload,
-                Exception exception) {
-            CCNCFunctions.Function fn = (Function) payload;
-            switch (fn.getFunctionId()) {
-                case REGISTER_NODE: {
-                    CCNCFunctions.RegisterNodeFunction rnf = 
(CCNCFunctions.RegisterNodeFunction) fn;
-                    workQueue.schedule(new 
RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
-                    return;
-                }
-
-                case UNREGISTER_NODE: {
-                    CCNCFunctions.UnregisterNodeFunction unf = 
(CCNCFunctions.UnregisterNodeFunction) fn;
-                    workQueue.schedule(new 
UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
-                    return;
-                }
-
-                case NODE_HEARTBEAT: {
-                    CCNCFunctions.NodeHeartbeatFunction nhf = 
(CCNCFunctions.NodeHeartbeatFunction) fn;
-                    workQueue.schedule(new 
NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
-                            nhf.getHeartbeatData()));
-                    return;
-                }
-
-                case NOTIFY_JOBLET_CLEANUP: {
-                    CCNCFunctions.NotifyJobletCleanupFunction njcf = 
(CCNCFunctions.NotifyJobletCleanupFunction) fn;
-                    workQueue.schedule(new 
JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
-                            njcf.getNodeId()));
-                    return;
-                }
-
-                case NOTIFY_DEPLOY_BINARY: {
-                    CCNCFunctions.NotifyDeployBinaryFunction ndbf = 
(CCNCFunctions.NotifyDeployBinaryFunction) fn;
-                    workQueue.schedule(new 
NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
-                            ndbf.getNodeId(), ndbf.getDeploymentStatus()));
-                    return;
-                }
-
-                case REPORT_PROFILE: {
-                    CCNCFunctions.ReportProfileFunction rpf = 
(CCNCFunctions.ReportProfileFunction) fn;
-                    workQueue.schedule(new 
ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
-                    return;
-                }
-
-                case NOTIFY_TASK_COMPLETE: {
-                    CCNCFunctions.NotifyTaskCompleteFunction ntcf = 
(CCNCFunctions.NotifyTaskCompleteFunction) fn;
-                    workQueue.schedule(new 
TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
-                            ntcf.getTaskId(), ntcf.getNodeId(), 
ntcf.getStatistics()));
-                    return;
-                }
-                case NOTIFY_TASK_FAILURE: {
-                    CCNCFunctions.NotifyTaskFailureFunction ntff = 
(CCNCFunctions.NotifyTaskFailureFunction) fn;
-                    workQueue.schedule(new 
TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
-                            ntff.getTaskId(), ntff.getNodeId(), 
ntff.getExceptions()));
-                    return;
-                }
-
-                case REGISTER_PARTITION_PROVIDER: {
-                    CCNCFunctions.RegisterPartitionProviderFunction rppf =
-                            (CCNCFunctions.RegisterPartitionProviderFunction) 
fn;
-                    workQueue.schedule(new 
RegisterPartitionAvailibilityWork(ClusterControllerService.this,
-                            rppf.getPartitionDescriptor()));
-                    return;
-                }
-
-                case REGISTER_PARTITION_REQUEST: {
-                    CCNCFunctions.RegisterPartitionRequestFunction rprf =
-                            (CCNCFunctions.RegisterPartitionRequestFunction) 
fn;
-                    workQueue.schedule(new 
RegisterPartitionRequestWork(ClusterControllerService.this,
-                            rprf.getPartitionRequest()));
-                    return;
-                }
-
-                case REGISTER_RESULT_PARTITION_LOCATION: {
-                    CCNCFunctions.RegisterResultPartitionLocationFunction 
rrplf =
-                            
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
-                    workQueue.schedule(new 
RegisterResultPartitionLocationWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), 
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
-                            rrplf.getPartition(), rrplf.getNPartitions(), 
rrplf.getNetworkAddress()));
-                    return;
-                }
-
-                case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
-                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction 
rrplf =
-                            
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
-                    workQueue.schedule(new 
ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), 
rrplf.getPartition()));
-                    return;
-                }
-
-                case REPORT_RESULT_PARTITION_FAILURE: {
-                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
-                            
(CCNCFunctions.ReportResultPartitionFailureFunction) fn;
-                    workQueue.schedule(new 
ReportResultPartitionFailureWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), 
rrplf.getPartition()));
-                    return;
-                }
-
-                case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction rsf =
-                            (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    workQueue.schedule(new 
ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
-                            rsf.getDeploymentId(), rsf.getNodeId()));
-                    return;
-                }
-
-                case GET_NODE_CONTROLLERS_INFO: {
-                    workQueue.schedule(new 
GetNodeControllersInfoWork(ClusterControllerService.this,
-                            new IResultCallback<Map<String, 
NodeControllerInfo>>() {
-                                @Override
-                                public void setValue(Map<String, 
NodeControllerInfo> result) {
-                                    new 
IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
-                                            .setValue(new 
CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
-                                }
-
-                                @Override
-                                public void setException(Exception e) {
-                                }
-                            }));
-                    return;
-                }
-
-                case STATE_DUMP_RESPONSE: {
-                    CCNCFunctions.StateDumpResponseFunction dsrf = 
(StateDumpResponseFunction) fn;
-                    workQueue.schedule(new 
NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
-                            dsrf.getStateDumpId(), dsrf.getState()));
-                    return;
-                }
-
-                case SHUTDOWN_RESPONSE: {
-                    CCNCFunctions.ShutdownResponseFunction sdrf = 
(ShutdownResponseFunction) fn;
-                    workQueue.schedule(new 
NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
-                    return;
-                }
-
-                case THREAD_DUMP_RESPONSE: {
-                    CCNCFunctions.ThreadDumpResponseFunction tdrf =
-                            (CCNCFunctions.ThreadDumpResponseFunction)fn;
-                    workQueue.schedule(new 
NotifyThreadDumpResponse(ClusterControllerService.this,
-                            tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
-                    return;
-
-                }
-            }
-            LOGGER.warning("Unknown function: " + fn.getFunctionId());
-        }
     }
 
     public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 21bf9c2..c6b415b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -116,7 +116,7 @@
 
     private DatasetNetworkManager datasetNetworkManager;
 
-    private final WorkQueue queue;
+    private final WorkQueue workQueue;
 
     private final Timer timer;
 
@@ -179,7 +179,7 @@
                 FullFrameChannelInterfaceFactory.INSTANCE);
 
         lccm = new LifeCycleComponentManager();
-        queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
+        workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -303,7 +303,7 @@
         }
         appCtx.setDistributedState(nodeParameters.getDistributedState());
 
-        queue.start();
+        workQueue.start();
 
         heartbeatTask = new HeartbeatTask(ccs);
 
@@ -354,7 +354,7 @@
             if (messagingNetManager != null) {
                 messagingNetManager.stop();
             }
-            queue.stop();
+            workQueue.stop();
             if (ncAppEntryPoint != null) {
                 ncAppEntryPoint.stop();
             }
@@ -409,7 +409,7 @@
     }
 
     public WorkQueue getWorkQueue() {
-        return queue;
+        return workQueue;
     }
 
     public ThreadMXBean getThreadMXBean() {
@@ -492,7 +492,7 @@
             try {
                 FutureValue<List<JobProfile>> fv = new FutureValue<>();
                 BuildJobProfilesWork bjpw = new 
BuildJobProfilesWork(NodeControllerService.this, fv);
-                queue.scheduleAndSync(bjpw);
+                workQueue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
                     cc.reportProfile(id, profiles);
@@ -512,30 +512,32 @@
                 case SEND_APPLICATION_MESSAGE:
                     CCNCFunctions.SendApplicationMessageFunction amf =
                             (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    queue.schedule(new 
ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+                    workQueue.schedule(new 
ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
 
                 case START_TASKS:
                     CCNCFunctions.StartTasksFunction stf = 
(CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new 
StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), 
stf.getJobId(),
-                            stf.getPlanBytes(), stf.getTaskDescriptors(), 
stf.getConnectorPolicies(), stf.getFlags()));
+                    workQueue.schedule(new 
StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
+                            stf.getJobId(), stf.getPlanBytes(), 
stf.getTaskDescriptors(), stf.getConnectorPolicies(),
+                            stf.getFlags()));
                     return;
 
                 case ABORT_TASKS:
                     CCNCFunctions.AbortTasksFunction atf = 
(CCNCFunctions.AbortTasksFunction) fn;
-                    queue.schedule(new 
AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+                    workQueue.schedule(new 
AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
                     return;
 
                 case CLEANUP_JOBLET:
                     CCNCFunctions.CleanupJobletFunction cjf = 
(CCNCFunctions.CleanupJobletFunction) fn;
-                    queue.schedule(new 
CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+                    workQueue.schedule(new 
CleanupJobletWork(NodeControllerService.this, cjf.getJobId(),
+                            cjf.getStatus()));
                     return;
 
                 case REPORT_PARTITION_AVAILABILITY:
                     CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
                             
(CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                    queue.schedule(new 
ReportPartitionAvailabilityWork(NodeControllerService.this,
+                    workQueue.schedule(new 
ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
 
@@ -552,18 +554,18 @@
 
                 case DEPLOY_BINARY:
                     CCNCFunctions.DeployBinaryFunction dbf = 
(CCNCFunctions.DeployBinaryFunction) fn;
-                    queue.schedule(new 
DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
+                    workQueue.schedule(new 
DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
                             dbf.getBinaryURLs()));
                     return;
 
                 case UNDEPLOY_BINARY:
                     CCNCFunctions.UnDeployBinaryFunction ndbf = 
(CCNCFunctions.UnDeployBinaryFunction) fn;
-                    queue.schedule(new 
UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+                    workQueue.schedule(new 
UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
                     return;
 
                 case STATE_DUMP_REQUEST:
                     final CCNCFunctions.StateDumpRequestFunction dsrf = 
(StateDumpRequestFunction) fn;
-                    queue.schedule(new 
StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+                    workQueue.schedule(new 
StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
                     return;
 
                 case SHUTDOWN_REQUEST:

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1325
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36
Gerrit-PatchSet: 7
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
