Michael Blow has submitted this change and it was merged.

Change subject: Attempt to reconnect IPCHandle on connection failure
......................................................................


Attempt to reconnect IPCHandle on connection failure

IPCHandles can become invalid due to network interruption or node
crash/restart.  Automatically retry the connection when an attempt is
made to use a disconnected handle.

Change-Id: I069dcd59898021054462c8213fb623df2deec598
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1828
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
BAD: Jenkins <[email protected]>
Reviewed-by: Yingyi Bu <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
7 files changed, 175 insertions(+), 91 deletions(-)

Approvals:
  Yingyi Bu: Looks good to me, approved
  Jenkins: Verified; No violations found; No violations found; Verified



diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 79033d8..dc7bad0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -54,8 +54,8 @@
         CCNCFunctions.NodeRegistrationResult result;
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
-            INodeController nodeController = new 
NodeControllerRemoteProxy(ncIPCHandle);
-            NodeControllerState state = new 
NodeControllerState(nodeController, reg);
+            INodeController nc = new 
NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+            NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
             IApplicationConfig cfg = 
state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4eb1732..620033c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1396,7 +1396,8 @@
         int cdid = dis.readInt();
         int senderIndex = dis.readInt();
         int receiverIndex = dis.readInt();
-        PartitionId pid = new PartitionId(new JobId(jobId), new 
ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+        PartitionId pid = new PartitionId(new JobId(jobId), new 
ConnectorDescriptorId(cdid), senderIndex,
+                receiverIndex);
         return pid;
     }
 
@@ -1412,8 +1413,8 @@
         int aid = dis.readInt();
         int partition = dis.readInt();
         int attempt = dis.readInt();
-        TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new 
OperatorDescriptorId(odid), aid),
-                partition), attempt);
+        TaskAttemptId taId = new TaskAttemptId(
+                new TaskId(new ActivityId(new OperatorDescriptorId(odid), 
aid), partition), attempt);
         return taId;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 83ef32b..98d258f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,11 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
+
+import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -33,141 +37,153 @@
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
-import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
-public class ClusterControllerRemoteProxy implements IClusterController {
-    private final IIPCHandle ipcHandle;
+public class ClusterControllerRemoteProxy extends ControllerRemoteProxy 
implements IClusterController {
+    private static final Logger LOGGER = 
Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
 
-    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
-        this.ipcHandle = ipcHandle;
+    private final int clusterConnectRetries;
+
+    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress, int clusterConnectRetries) {
+        super(ipc, inetSocketAddress);
+        this.clusterConnectRetries = clusterConnectRetries;
+    }
+
+    @Override
+    protected int getRetries(boolean first) {
+        return first ? clusterConnectRetries : 0;
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return LOGGER;
     }
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
-        CCNCFunctions.RegisterNodeFunction fn = new 
CCNCFunctions.RegisterNodeFunction(reg);
-        ipcHandle.send(-1, fn, null);
+        RegisterNodeFunction fn = new RegisterNodeFunction(reg);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        CCNCFunctions.UnregisterNodeFunction fn = new 
CCNCFunctions.UnregisterNodeFunction(nodeId);
-        ipcHandle.send(-1, fn, null);
+        UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String 
nodeId, TaskProfile statistics)
             throws Exception {
-        CCNCFunctions.NotifyTaskCompleteFunction fn = new 
CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+        NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, 
taskId,
                 nodeId, statistics);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String 
nodeId, List<Exception> exceptions)
             throws Exception {
-        CCNCFunctions.NotifyTaskFailureFunction fn = new 
CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+        NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, 
taskId, nodeId,
                 exceptions);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws 
Exception {
-        CCNCFunctions.NotifyJobletCleanupFunction fn = new 
CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        NotifyJobletCleanupFunction fn = new 
NotifyJobletCleanupFunction(jobId, nodeId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, 
DeploymentStatus status) throws Exception {
-        CCNCFunctions.NotifyDeployBinaryFunction fn = new 
CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
-                nodeId, status);
-        ipcHandle.send(-1, fn, null);
+        NotifyDeployBinaryFunction fn = new 
NotifyDeployBinaryFunction(deploymentId, nodeId,
+                status);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws 
Exception {
-        CCNCFunctions.NodeHeartbeatFunction fn = new 
CCNCFunctions.NodeHeartbeatFunction(id, hbData);
-        ipcHandle.send(-1, fn, null);
+        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws 
Exception {
-        CCNCFunctions.ReportProfileFunction fn = new 
CCNCFunctions.ReportProfileFunction(id, profiles);
-        ipcHandle.send(-1, fn, null);
+        ReportProfileFunction fn = new ReportProfileFunction(id, profiles);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor 
partitionDescriptor) throws Exception {
-        CCNCFunctions.RegisterPartitionProviderFunction fn = new 
CCNCFunctions.RegisterPartitionProviderFunction(
+        RegisterPartitionProviderFunction fn = new 
RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) 
throws Exception {
-        CCNCFunctions.RegisterPartitionRequestFunction fn = new 
CCNCFunctions.RegisterPartitionRequestFunction(
+        RegisterPartitionRequestFunction fn = new 
RegisterPartitionRequestFunction(
                 partitionRequest);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
-        CCNCFunctions.SendApplicationMessageFunction fn = new 
CCNCFunctions.SendApplicationMessageFunction(data,
+        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
boolean orderedResult,
-                                                boolean emptyResult, int 
partition, int nPartitions,
-                                                NetworkAddress networkAddress) 
throws Exception {
-        CCNCFunctions.RegisterResultPartitionLocationFunction fn =
-                new 
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, 
orderedResult, emptyResult,
-                        partition, nPartitions, networkAddress);
-        ipcHandle.send(-1, fn, null);
+            boolean emptyResult, int partition, int nPartitions, 
NetworkAddress networkAddress) throws Exception {
+        RegisterResultPartitionLocationFunction fn = new 
RegisterResultPartitionLocationFunction(
+                jobId, rsId, orderedResult, emptyResult, partition, 
nPartitions, networkAddress);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition) throws Exception {
-        CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn =
-                new 
CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId, 
partition);
-        ipcHandle.send(-1, fn, null);
+        ReportResultPartitionWriteCompletionFunction fn = new 
ReportResultPartitionWriteCompletionFunction(
+                jobId, rsId, partition);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, 
int partition) throws Exception {
-        CCNCFunctions.ReportResultPartitionFailureFunction fn =
-                new CCNCFunctions.ReportResultPartitionFailureFunction(jobId, 
rsId, partition);
-        ipcHandle.send(-1, fn, null);
+        ReportResultPartitionFailureFunction fn = new 
ReportResultPartitionFailureFunction(
+                jobId, rsId, partition);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws 
Exception {
-        CCNCFunctions.ReportDistributedJobFailureFunction fn =
-                new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, 
nodeId);
-        ipcHandle.send(-1, fn, null);
+        ReportDistributedJobFailureFunction fn = new 
ReportDistributedJobFailureFunction(
+                jobId, nodeId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void getNodeControllerInfos() throws Exception {
-        ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), 
null);
+        ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null);
     }
 
     @Override
     public void notifyStateDump(String nodeId, String stateDumpId, String 
state) throws Exception {
-        CCNCFunctions.StateDumpResponseFunction fn =
-                new CCNCFunctions.StateDumpResponseFunction(nodeId, 
stateDumpId, state);
-        ipcHandle.send(-1, fn, null);
+        StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, 
stateDumpId,
+                state);
+        ensureIpcHandle().send(-1, fn, null);
     }
+
     @Override
-    public void notifyShutdown(String nodeId) throws Exception{
-        CCNCFunctions.ShutdownResponseFunction sdrf = new 
CCNCFunctions.ShutdownResponseFunction(nodeId);
-        ipcHandle.send(-1, sdrf, null);
+    public void notifyShutdown(String nodeId) throws Exception {
+        ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId);
+        ensureIpcHandle().send(-1, sdrf, null);
     }
 
     @Override
     public void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception {
-        CCNCFunctions.ThreadDumpResponseFunction tdrf =
-                new CCNCFunctions.ThreadDumpResponseFunction(nodeId, 
requestId, threadDumpJSON);
-        ipcHandle.send(-1, tdrf, null);
+        ThreadDumpResponseFunction tdrf = new 
ThreadDumpResponseFunction(nodeId, requestId,
+                threadDumpJSON);
+        ensureIpcHandle().send(-1, tdrf, null);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
new file mode 100644
index 0000000..44b0e4a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.ipc;
+
+import java.net.InetSocketAddress;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+public abstract class ControllerRemoteProxy {
+    protected final IPCSystem ipc;
+    protected final InetSocketAddress inetSocketAddress;
+    private IIPCHandle ipcHandle;
+
+    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress) {
+        this.ipc = ipc;
+        this.inetSocketAddress = inetSocketAddress;
+    }
+
+    protected IIPCHandle ensureIpcHandle() throws IPCException {
+        final boolean first = ipcHandle == null;
+        if (first || !ipcHandle.isConnected()) {
+            if (!first) {
+                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; 
retrying connection");
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
+            if (!first && ipcHandle.isConnected()) {
+                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+            }
+        }
+        return ipcHandle;
+    }
+
+    protected abstract int getRetries(boolean first);
+
+    protected abstract Logger getLogger();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..68a5b76 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
+
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -34,89 +38,99 @@
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
-public class NodeControllerRemoteProxy implements INodeController {
-    private final IIPCHandle ipcHandle;
+public class NodeControllerRemoteProxy extends ControllerRemoteProxy 
implements INodeController {
+    private static final Logger LOGGER = 
Logger.getLogger(NodeControllerRemoteProxy.class.getName());
 
-    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
-        this.ipcHandle = ipcHandle;
+    public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress) {
+        super(ipc, inetSocketAddress);
+    }
+
+    @Override
+    protected int getRetries(boolean first) {
+        return 0;
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return LOGGER;
     }
 
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] 
planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, 
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
             Set<JobFlag> flags) throws Exception {
-        CCNCFunctions.StartTasksFunction stf = new 
CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
+        StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, 
planBytes,
                 taskDescriptors, connectorPolicies, flags);
-        ipcHandle.send(-1, stf, null);
+        ensureIpcHandle().send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws 
Exception {
-        CCNCFunctions.AbortTasksFunction atf = new 
CCNCFunctions.AbortTasksFunction(jobId, tasks);
-        ipcHandle.send(-1, atf, null);
+        AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks);
+        ensureIpcHandle().send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        CCNCFunctions.CleanupJobletFunction cjf = new 
CCNCFunctions.CleanupJobletFunction(jobId, status);
-        ipcHandle.send(-1, cjf, null);
+        CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status);
+        ensureIpcHandle().send(-1, cjf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress 
networkAddress) throws Exception {
-        CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new 
CCNCFunctions.ReportPartitionAvailabilityFunction(
+        ReportPartitionAvailabilityFunction rpaf = new 
ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
-        ipcHandle.send(-1, rpaf, null);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) 
throws Exception {
-        CCNCFunctions.DeployBinaryFunction rpaf = new 
CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
-        ipcHandle.send(-1, rpaf, null);
+        DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, 
binaryURLs);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void undeployBinary(DeploymentId deploymentId) throws Exception {
-        CCNCFunctions.UnDeployBinaryFunction rpaf = new 
CCNCFunctions.UnDeployBinaryFunction(deploymentId);
-        ipcHandle.send(-1, rpaf, null);
+        UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
-        CCNCFunctions.DistributeJobFunction fn = new 
CCNCFunctions.DistributeJobFunction(jobId, planBytes);
-        ipcHandle.send(-1, fn, null);
+        DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void destroyJob(JobId jobId) throws Exception {
-        CCNCFunctions.DestroyJobFunction fn = new 
CCNCFunctions.DestroyJobFunction(jobId);
-        ipcHandle.send(-1, fn, null);
+        DestroyJobFunction fn = new DestroyJobFunction(jobId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void dumpState(String stateDumpId) throws Exception {
-        CCNCFunctions.StateDumpRequestFunction dsf = new 
CCNCFunctions.StateDumpRequestFunction(stateDumpId);
-        ipcHandle.send(-1, dsf, null);
+        StateDumpRequestFunction dsf = new 
StateDumpRequestFunction(stateDumpId);
+        ensureIpcHandle().send(-1, dsf, null);
     }
 
     @Override
     public void shutdown(boolean terminateNCService) throws Exception {
-        CCNCFunctions.ShutdownRequestFunction sdrf = new 
CCNCFunctions.ShutdownRequestFunction(terminateNCService);
-        ipcHandle.send(-1, sdrf, null);
+        ShutdownRequestFunction sdrf = new 
ShutdownRequestFunction(terminateNCService);
+        ensureIpcHandle().send(-1, sdrf, null);
     }
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
-        CCNCFunctions.SendApplicationMessageFunction fn = new 
CCNCFunctions.SendApplicationMessageFunction(data,
+        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void takeThreadDump(String requestId) throws Exception {
-        CCNCFunctions.ThreadDumpRequestFunction fn = new 
CCNCFunctions.ThreadDumpRequestFunction(requestId);
-        ipcHandle.send(-1, fn, null);
+        ThreadDumpRequestFunction fn = new 
ThreadDumpRequestFunction(requestId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2fe0e27..0587a55 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -80,7 +80,6 @@
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -279,10 +278,9 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        IIPCHandle ccIPCHandle = ipc.getHandle(
+        this.ccs = new ClusterControllerRemoteProxy(ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), 
ncConfig.getClusterPort()),
                 ncConfig.getClusterConnectRetries());
-        this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new 
HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index fe2bcae..9efd70e 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -99,7 +99,7 @@
         while (true) {
             synchronized (this) {
                 handle = ipcHandleMap.get(remoteAddress);
-                if (handle == null) {
+                if (handle == null || !handle.isConnected()) {
                     handle = new IPCHandle(system, remoteAddress);
                     pendingConnections.add(handle);
                     networkThread.selector.wakeup();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1828
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I069dcd59898021054462c8213fb623df2deec598
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>

Reply via email to