Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2284

Change subject: [NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC 
connections
......................................................................

[NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC connections

Change-Id: I2430510b22f936b89879df98322ef51ec87c6da6
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
D 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
R 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
C 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
A 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
12 files changed, 168 insertions(+), 182 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/84/2284/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 85ef927..80b61f4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -83,7 +83,8 @@
         RPCInterface rpci = new RPCInterface();
         ipc = new IPCSystem(new InetSocketAddress(0), rpci, new 
JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
-        hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new 
InetSocketAddress(ccHost, ccPort)), rpci);
+        hci = new 
HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new 
InetSocketAddress(ccHost, ccPort)),
+                rpci);
         ccInfo = hci.getClusterControllerInfo();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 075747d..63139d9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -40,7 +40,7 @@
         RPCInterface rpci = new RPCInterface();
         ipc = new IPCSystem(new InetSocketAddress(0), rpci, new 
JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
-        IIPCHandle ddsIpchandle = ipc.getHandle(new InetSocketAddress(ddsHost, 
ddsPort));
+        IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new 
InetSocketAddress(ddsHost, ddsPort));
         this.ddsi = new 
HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 590a0f3..712d2ec 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -101,6 +101,7 @@
             removeDeadNode(nodeId);
         } else {
             try {
+                // TODO(mblow): it seems we should close IPC handles when 
we're done with them (like here)
                 IIPCHandle ncIPCHandle = 
ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
                 ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
             } catch (IPCException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 07b0f04..3a38287 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -50,13 +50,15 @@
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
+        // TODO(mblow): it seems we should close IPC handles when we're done 
with them (like here)
         IIPCHandle ncIPCHandle = 
ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result;
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
             NodeControllerRemoteProxy nc =
-                    new NodeControllerRemoteProxy(ccs.getClusterIPC(), 
reg.getNodeControllerAddress());
+                    new NodeControllerRemoteProxy(
+                            
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
             NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 447d678..f2e7d87 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -53,42 +52,26 @@
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.ipc.api.IIPCHandle;
 
-public class ClusterControllerRemoteProxy extends ControllerRemoteProxy 
implements IClusterController {
-    private static final Logger LOGGER = LogManager.getLogger();
+public class ClusterControllerRemoteProxy implements IClusterController {
 
-    private final int clusterConnectRetries;
+    private IIPCHandle ipcHandle;
 
-    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress, int clusterConnectRetries,
-                                        IControllerRemoteProxyIPCEventListener 
eventListener) {
-        super(ipc, inetSocketAddress, eventListener);
-        this.clusterConnectRetries = clusterConnectRetries;
-    }
-
-    @Override
-    protected int getMaxRetries(boolean first) {
-        // -1 == retry forever
-        return first ? clusterConnectRetries : 0;
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOGGER;
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
     }
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
         RegisterNodeFunction fn = new RegisterNodeFunction(reg);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
         UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
@@ -96,7 +79,7 @@
             throws Exception {
         NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, 
taskId,
                 nodeId, statistics);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
@@ -104,53 +87,53 @@
             throws Exception {
         NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, 
taskId, nodeId,
                 exceptions);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws 
Exception {
         NotifyJobletCleanupFunction fn = new 
NotifyJobletCleanupFunction(jobId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, 
DeploymentStatus status) throws Exception {
         NotifyDeployBinaryFunction fn = new 
NotifyDeployBinaryFunction(deploymentId, nodeId,
                 status);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws 
Exception {
         NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
-        ensureIpcHandle(0).send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws 
Exception {
         ReportProfileFunction fn = new ReportProfileFunction(id, profiles);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor 
partitionDescriptor) throws Exception {
         RegisterPartitionProviderFunction fn = new 
RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) 
throws Exception {
         RegisterPartitionRequestFunction fn = new 
RegisterPartitionRequestFunction(
                 partitionRequest);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
         SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
@@ -158,44 +141,44 @@
             boolean emptyResult, int partition, int nPartitions, 
NetworkAddress networkAddress) throws Exception {
         RegisterResultPartitionLocationFunction fn = new 
RegisterResultPartitionLocationFunction(
                 jobId, rsId, orderedResult, emptyResult, partition, 
nPartitions, networkAddress);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition) throws Exception {
         ReportResultPartitionWriteCompletionFunction fn = new 
ReportResultPartitionWriteCompletionFunction(
                 jobId, rsId, partition);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployedJobSpecFailure(DeployedJobSpecId 
deployedJobSpecId, String nodeId) throws Exception {
         ReportDeployedJobSpecFailureFunction fn = new 
ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void getNodeControllerInfos() throws Exception {
-        ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null);
+        ipcHandle.send(-1, new GetNodeControllersInfoFunction(), null);
     }
 
     @Override
     public void notifyStateDump(String nodeId, String stateDumpId, String 
state) throws Exception {
         StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, 
stateDumpId,
                 state);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyShutdown(String nodeId) throws Exception {
         ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId);
-        ensureIpcHandle().send(-1, sdrf, null);
+        ipcHandle.send(-1, sdrf, null);
     }
 
     @Override
     public void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception {
         ThreadDumpResponseFunction tdrf = new 
ThreadDumpResponseFunction(nodeId, requestId,
                 threadDumpJSON);
-        ensureIpcHandle().send(-1, tdrf, null);
+        ipcHandle.send(-1, tdrf, null);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
deleted file mode 100644
index fe9e85a..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.ipc;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.Logger;
-
-public abstract class ControllerRemoteProxy {
-    protected final IPCSystem ipc;
-    private final InetSocketAddress inetSocketAddress;
-    private final IControllerRemoteProxyIPCEventListener eventListener;
-    private IIPCHandle ipcHandle;
-
-    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress) {
-        this(ipc, inetSocketAddress, null);
-    }
-
-    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress,
-            IControllerRemoteProxyIPCEventListener eventListener) {
-        this.ipc = ipc;
-        this.inetSocketAddress = inetSocketAddress;
-        this.eventListener = eventListener == null ? new 
IControllerRemoteProxyIPCEventListener() {
-        } : eventListener;
-    }
-
-    protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
-        return ensureIpcHandle(getMaxRetries(ipcHandle == null));
-    }
-
-    protected IIPCHandle ensureIpcHandle(int maxRetries) throws 
HyracksDataException {
-        if (ipcHandle != null && ipcHandle.isConnected()) {
-            return ipcHandle;
-        }
-        try {
-            final boolean first = ipcHandle == null;
-            if (!first) {
-                getLogger().warn("ipcHandle " + ipcHandle + " disconnected; 
retrying connection");
-                eventListener.ipcHandleDisconnected(ipcHandle);
-            }
-            ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
-            if (first) {
-                eventListener.ipcHandleConnected(ipcHandle);
-            } else {
-                getLogger().warn("ipcHandle " + ipcHandle + " restored");
-                eventListener.ipcHandleRestored(ipcHandle);
-            }
-        } catch (IPCException e) {
-            throw HyracksDataException.create(e);
-        }
-        return ipcHandle;
-    }
-
-    /**
-     * Maximum number of times to retry a failed connection attempt
-     * @param first true if the initial connection attempt (i.e. server start)
-     * @return the maximum number of retries, if any.  <0 means retry forever
-     */
-    protected abstract int getMaxRetries(boolean first);
-
-    protected abstract Logger getLogger();
-
-    public InetSocketAddress getAddress() {
-        return inetSocketAddress;
-    }
-}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b4aaf45..b6b9b4b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,26 +48,13 @@
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.ipc.api.IIPCHandle;
 
-public class NodeControllerRemoteProxy extends ControllerRemoteProxy 
implements INodeController {
-    private static final Logger LOGGER = LogManager.getLogger();
+public class NodeControllerRemoteProxy implements INodeController {
+    private final IIPCHandle ipcHandle;
 
-    public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress 
inetSocketAddress) {
-        super(ipc, inetSocketAddress);
-    }
-
-    @Override
-    protected int getMaxRetries(boolean first) {
-        // -1 == retry forever
-        return 0;
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOGGER;
+    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
     }
 
     @Override
@@ -77,74 +64,78 @@
             throws Exception {
         StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, 
planBytes,
                 taskDescriptors, connectorPolicies, flags, jobParameters, 
deployedJobSpecId);
-        ensureIpcHandle().send(-1, stf, null);
+        ipcHandle.send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws 
Exception {
         AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks);
-        ensureIpcHandle().send(-1, atf, null);
+        ipcHandle.send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
         CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status);
-        ensureIpcHandle().send(-1, cjf, null);
+        ipcHandle.send(-1, cjf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress 
networkAddress) throws Exception {
         ReportPartitionAvailabilityFunction rpaf = new 
ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
-        ensureIpcHandle().send(-1, rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) 
throws Exception {
         DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, 
binaryURLs);
-        ensureIpcHandle().send(-1, rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void undeployBinary(DeploymentId deploymentId) throws Exception {
         UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
-        ensureIpcHandle().send(-1, rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] 
planBytes) throws Exception {
         DeployJobSpecFunction fn = new 
DeployJobSpecFunction(deployedJobSpecId, planBytes);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws 
Exception {
         UndeployJobSpecFunction fn = new 
UndeployJobSpecFunction(deployedJobSpecId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void dumpState(String stateDumpId) throws Exception {
         StateDumpRequestFunction dsf = new 
StateDumpRequestFunction(stateDumpId);
-        ensureIpcHandle().send(-1, dsf, null);
+        ipcHandle.send(-1, dsf, null);
     }
 
     @Override
     public void shutdown(boolean terminateNCService) throws Exception {
         ShutdownRequestFunction sdrf = new 
ShutdownRequestFunction(terminateNCService);
-        ensureIpcHandle().send(-1, sdrf, null);
+        ipcHandle.send(-1, sdrf, null);
     }
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
         SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void takeThreadDump(String requestId) throws Exception {
         ThreadDumpRequestFunction fn = new 
ThreadDumpRequestFunction(requestId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    public InetSocketAddress getAddress() {
+        return ipcHandle.getRemoteAddress();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 01e34c1..18a6b20 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -68,7 +68,6 @@
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import 
org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.work.FutureValue;
 import org.apache.hyracks.control.common.work.WorkQueue;
@@ -83,6 +82,7 @@
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
+import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -297,9 +297,10 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        this.ccs = new ClusterControllerRemoteProxy(ipc,
+        this.ccs = new ClusterControllerRemoteProxy(
+                ipc.getHandle(
                 new InetSocketAddress(ncConfig.getClusterAddress(), 
ncConfig.getClusterPort()),
-                ncConfig.getClusterConnectRetries(), new 
IControllerRemoteProxyIPCEventListener() {
+                ncConfig.getClusterConnectRetries(), 1, new 
IIPCEventListener() {
                     @Override
                     public void ipcHandleRestored(IIPCHandle handle) throws 
IPCException {
                         // we need to re-register in case of NC -> CC 
connection reset
@@ -310,7 +311,7 @@
                             throw new IPCException(e);
                         }
                     }
-                });
+                }));
         registerNode();
 
         workQueue.start();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
similarity index 88%
rename from 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
rename to 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
index ec4f9e4..a6ba545 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.common.ipc;
+package org.apache.hyracks.ipc.api;
 
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
-public interface IControllerRemoteProxyIPCEventListener {
+public interface IIPCEventListener {
 
     default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
         // no-op
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 8e38651..b36e645 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
@@ -70,6 +71,28 @@
     }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries) throws IPCException {
+        return getHandle(remoteAddress, maxRetries, 0);
+    }
+
+    public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress) 
throws IPCException {
+        return getReconnectingHandle(remoteAddress, 1);
+    }
+
+    public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, 
int reconnectAttempts)
+            throws IPCException {
+        return getHandle(remoteAddress, 0, reconnectAttempts, 
NoOpIPCEventListener.INSTANCE);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries, int reconnectAttempts)
+            throws IPCException {
+        return getHandle(remoteAddress, maxRetries, reconnectAttempts, 
NoOpIPCEventListener.INSTANCE);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int 
maxRetries, int reconnectAttempts,
+            IIPCEventListener eventListener) throws IPCException {
+        if (reconnectAttempts > 0) {
+            return new ReconnectingIPCHandle(this, eventListener, 
remoteAddress, maxRetries, reconnectAttempts);
+        }
         try {
             return cMgr.getIPCHandle(remoteAddress, maxRetries);
         } catch (IOException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
similarity index 61%
copy from 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
copy to 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
index ec4f9e4..dfefd31 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
@@ -16,22 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.common.ipc;
+package org.apache.hyracks.ipc.impl;
 
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.api.IIPCEventListener;
 
-public interface IControllerRemoteProxyIPCEventListener {
-
-    default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
-        // no-op
-    }
-
-    default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException {
-        // no-op
-    }
-
-    default void ipcHandleRestored(IIPCHandle handle) throws IPCException {
-        // no-op
-    }
+public class NoOpIPCEventListener implements IIPCEventListener {
+    public static final IIPCEventListener INSTANCE = new 
NoOpIPCEventListener();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
new file mode 100644
index 0000000..f0da860
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.ipc.api.IIPCEventListener;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+class ReconnectingIPCHandle implements IIPCHandle {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IPCSystem ipc;
+    private final int reconnectAttempts;
+    private final IIPCEventListener listener;
+    private IIPCHandle delegate;
+
+    ReconnectingIPCHandle(IPCSystem ipc, IIPCEventListener listener, 
InetSocketAddress remoteAddress, int maxRetries,
+            int reconnectAttempts) throws IPCException {
+        this.ipc = ipc;
+        this.listener = listener;
+        this.reconnectAttempts = reconnectAttempts;
+        this.delegate = ipc.getHandle(remoteAddress, maxRetries);
+        listener.ipcHandleConnected(delegate);
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return delegate.getRemoteAddress();
+    }
+
+    @Override
+    public long send(long requestId, Object payload, Exception exception) 
throws IPCException {
+        return ensureConnected().send(requestId, payload, exception);
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        delegate.setAttachment(attachment);
+    }
+
+    @Override
+    public Object getAttachment() {
+        return delegate.getAttachment();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return delegate.isConnected();
+    }
+
+    private IIPCHandle ensureConnected() throws IPCException {
+        if (delegate.isConnected()) {
+            return delegate;
+        }
+        listener.ipcHandleDisconnected(delegate);
+        delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
+        LOGGER.warn("ipcHandle " + delegate + " restored");
+        listener.ipcHandleRestored(delegate);
+
+        return delegate;
+    }
+
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2284
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2430510b22f936b89879df98322ef51ec87c6da6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to