Michael Blow has submitted this change and it was merged. Change subject: [NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC connections ......................................................................
[NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC connections Change-Id: I2430510b22f936b89879df98322ef51ec87c6da6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2284 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java D hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java R hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java C hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java A hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java 14 files changed, 189 insertions(+), 182 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 8a8342e..83a40f0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -45,7 +45,6 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.dataset.ResultSetId; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.ipc.exceptions.IPCException; @@ -154,7 +153,7 @@ @Override protected void handleExecuteStatementException(Throwable t, RequestExecutionState execution) { if (t instanceof TimeoutException - || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) { + || ExceptionUtils.matchingCause(t, candidate -> candidate instanceof IPCException)) { GlobalConfig.ASTERIX_LOGGER.log(Level.WARN, t.toString(), t); execution.setStatus(ResultStatus.FAILED, HttpResponseStatus.SERVICE_UNAVAILABLE); } else { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java index 3105b3f..256ce08 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.common.exceptions; +import java.util.function.Predicate; + public class ExceptionUtils { public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n"; public static final String PARAMETER_NAME = "Parameter name: "; @@ -49,4 +51,21 @@ } return current; } + + /** + * Determines whether supplied exception contains a matching cause in its hierarchy, or is itself a match + */ + public static boolean matchingCause(Throwable e, Predicate<Throwable> test) { + Throwable current = e; + Throwable cause = e.getCause(); + while (cause != null && cause != current) { + if (test.test(cause)) { + return true; + } + Throwable nextCause = current.getCause(); + current = cause; + cause = nextCause; + } + return test.test(e); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 85ef927..80b61f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -83,7 +83,8 @@ RPCInterface rpci = new RPCInterface(); ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); ipc.start(); - hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci); + hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)), + rpci); ccInfo = hci.getClusterControllerInfo(); } diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java index 075747d..63139d9 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java @@ -40,7 +40,7 @@ RPCInterface rpci = new RPCInterface(); ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer()); ipc.start(); - IIPCHandle ddsIpchandle = ipc.getHandle(new InetSocketAddress(ddsHost, ddsPort)); + IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(ddsHost, ddsPort)); this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 590a0f3..712d2ec 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -101,6 +101,7 @@ removeDeadNode(nodeId); } else { try { + // TODO(mblow): it seems we should close IPC handles when we're done with them (like here) IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress()); ncIPCHandle.send(-1, new AbortCCJobsFunction(), null); } catch (IPCException e) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index 07b0f04..3a38287 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -50,13 +50,15 @@ @Override protected void doRun() throws Exception { String id = reg.getNodeId(); + // TODO(mblow): it seems we should close IPC handles when we're done with them (like here) IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress()); CCNCFunctions.NodeRegistrationResult result; Map<IOption, Object> ncConfiguration = new HashMap<>(); try { LOGGER.log(Level.WARN, "Registering INodeController: id = " + id); NodeControllerRemoteProxy nc = - new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress()); + new NodeControllerRemoteProxy( + ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress())); NodeControllerState state = new NodeControllerState(nc, reg); INodeManager nodeManager = ccs.getNodeManager(); nodeManager.addNode(id, state); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 447d678..f2e7d87 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -18,7 +18,6 @@ */ package org.apache.hyracks.control.common.ipc; -import java.net.InetSocketAddress; import java.util.List; import org.apache.hyracks.api.comm.NetworkAddress; @@ -53,42 +52,26 @@ import org.apache.hyracks.control.common.job.PartitionRequest; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; -import org.apache.hyracks.ipc.impl.IPCSystem; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.hyracks.ipc.api.IIPCHandle; -public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implements IClusterController { - private static final Logger LOGGER = LogManager.getLogger(); +public class ClusterControllerRemoteProxy implements IClusterController { - private final int clusterConnectRetries; + private IIPCHandle ipcHandle; - public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries, - IControllerRemoteProxyIPCEventListener eventListener) { - super(ipc, inetSocketAddress, eventListener); - this.clusterConnectRetries = clusterConnectRetries; - } - - @Override - protected int getMaxRetries(boolean first) { - // -1 == retry forever - return first ? clusterConnectRetries : 0; - } - - @Override - protected Logger getLogger() { - return LOGGER; + public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) { + this.ipcHandle = ipcHandle; } @Override public void registerNode(NodeRegistration reg) throws Exception { RegisterNodeFunction fn = new RegisterNodeFunction(reg); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void unregisterNode(String nodeId) throws Exception { UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override @@ -96,7 +79,7 @@ throws Exception { NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, taskId, nodeId, statistics); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override @@ -104,53 +87,53 @@ throws Exception { NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, taskId, nodeId, exceptions); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception { NotifyJobletCleanupFunction fn = new NotifyJobletCleanupFunction(jobId, nodeId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception { NotifyDeployBinaryFunction fn = new NotifyDeployBinaryFunction(deploymentId, nodeId, status); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception { NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData); - ensureIpcHandle(0).send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void reportProfile(String id, List<JobProfile> profiles) throws Exception { ReportProfileFunction fn = new ReportProfileFunction(id, profiles); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception { RegisterPartitionProviderFunction fn = new RegisterPartitionProviderFunction( partitionDescriptor); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception { RegisterPartitionRequestFunction fn = new RegisterPartitionRequestFunction( partitionRequest); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception { SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override @@ -158,44 +141,44 @@ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception { RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction( jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception { ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction( jobId, rsId, partition); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception { ReportDeployedJobSpecFailureFunction fn = new ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void getNodeControllerInfos() throws Exception { - ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null); + ipcHandle.send(-1, new GetNodeControllersInfoFunction(), null); } @Override public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception { StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, stateDumpId, state); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void notifyShutdown(String nodeId) throws Exception { ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId); - ensureIpcHandle().send(-1, sdrf, null); + ipcHandle.send(-1, sdrf, null); } @Override public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception { ThreadDumpResponseFunction tdrf = new ThreadDumpResponseFunction(nodeId, requestId, threadDumpJSON); - ensureIpcHandle().send(-1, tdrf, null); + ipcHandle.send(-1, tdrf, null); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java deleted file mode 100644 index fe9e85a..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.common.ipc; - -import java.net.InetSocketAddress; - -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.ipc.api.IIPCHandle; -import org.apache.hyracks.ipc.exceptions.IPCException; -import org.apache.hyracks.ipc.impl.IPCSystem; -import org.apache.logging.log4j.Logger; - -public abstract class ControllerRemoteProxy { - protected final IPCSystem ipc; - private final InetSocketAddress inetSocketAddress; - private final IControllerRemoteProxyIPCEventListener eventListener; - private IIPCHandle ipcHandle; - - protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) { - this(ipc, inetSocketAddress, null); - } - - protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, - IControllerRemoteProxyIPCEventListener eventListener) { - this.ipc = ipc; - this.inetSocketAddress = inetSocketAddress; - this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() { - } : eventListener; - } - - protected IIPCHandle ensureIpcHandle() throws HyracksDataException { - return ensureIpcHandle(getMaxRetries(ipcHandle == null)); - } - - protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException { - if (ipcHandle != null && ipcHandle.isConnected()) { - return ipcHandle; - } - try { - final boolean first = ipcHandle == null; - if (!first) { - getLogger().warn("ipcHandle " + ipcHandle + " disconnected; retrying connection"); - eventListener.ipcHandleDisconnected(ipcHandle); - } - ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries); - if (first) { - eventListener.ipcHandleConnected(ipcHandle); - } else { - getLogger().warn("ipcHandle " + ipcHandle + " restored"); - eventListener.ipcHandleRestored(ipcHandle); - } - } catch (IPCException e) { - throw HyracksDataException.create(e); - } - return ipcHandle; - } - - /** - * Maximum number of times to retry a failed connection attempt - * @param first true if the initial connection attempt (i.e. server start) - * @return the maximum number of retries, if any. <0 means retry forever - */ - protected abstract int getMaxRetries(boolean first); - - protected abstract Logger getLogger(); - - public InetSocketAddress getAddress() { - return inetSocketAddress; - } -} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index b4aaf45..b6b9b4b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -48,26 +48,13 @@ import org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction; import org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; -import org.apache.hyracks.ipc.impl.IPCSystem; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.hyracks.ipc.api.IIPCHandle; -public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements INodeController { - private static final Logger LOGGER = LogManager.getLogger(); +public class NodeControllerRemoteProxy implements INodeController { + private final IIPCHandle ipcHandle; - public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) { - super(ipc, inetSocketAddress); - } - - @Override - protected int getMaxRetries(boolean first) { - // -1 == retry forever - return 0; - } - - @Override - protected Logger getLogger() { - return LOGGER; + public NodeControllerRemoteProxy(IIPCHandle ipcHandle) { + this.ipcHandle = ipcHandle; } @Override @@ -77,74 +64,78 @@ throws Exception { StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags, jobParameters, deployedJobSpecId); - ensureIpcHandle().send(-1, stf, null); + ipcHandle.send(-1, stf, null); } @Override public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception { AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks); - ensureIpcHandle().send(-1, atf, null); + ipcHandle.send(-1, atf, null); } @Override public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception { CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status); - ensureIpcHandle().send(-1, cjf, null); + ipcHandle.send(-1, cjf, null); } @Override public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception { ReportPartitionAvailabilityFunction rpaf = new ReportPartitionAvailabilityFunction( pid, networkAddress); - ensureIpcHandle().send(-1, rpaf, null); + ipcHandle.send(-1, rpaf, null); } @Override public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception { DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs); - ensureIpcHandle().send(-1, rpaf, null); + ipcHandle.send(-1, rpaf, null); } @Override public void undeployBinary(DeploymentId deploymentId) throws Exception { UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId); - ensureIpcHandle().send(-1, rpaf, null); + ipcHandle.send(-1, rpaf, null); } @Override public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception { DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception { UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void dumpState(String stateDumpId) throws Exception { StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId); - ensureIpcHandle().send(-1, dsf, null); + ipcHandle.send(-1, dsf, null); } @Override public void shutdown(boolean terminateNCService) throws Exception { ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService); - ensureIpcHandle().send(-1, sdrf, null); + ipcHandle.send(-1, sdrf, null); } @Override public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception { SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); } @Override public void takeThreadDump(String requestId) throws Exception { ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId); - ensureIpcHandle().send(-1, fn, null); + ipcHandle.send(-1, fn, null); + } + + public InetSocketAddress getAddress() { + return ipcHandle.getRemoteAddress(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 01e34c1..18a6b20 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -68,7 +68,6 @@ import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema; import org.apache.hyracks.control.common.ipc.CCNCFunctions; import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy; -import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.work.FutureValue; import org.apache.hyracks.control.common.work.WorkQueue; @@ -83,6 +82,7 @@ import org.apache.hyracks.control.nc.partitions.PartitionManager; import org.apache.hyracks.control.nc.resources.memory.MemoryManager; import org.apache.hyracks.control.nc.work.BuildJobProfilesWork; +import org.apache.hyracks.ipc.api.IIPCEventListener; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IPCPerformanceCounters; import org.apache.hyracks.ipc.exceptions.IPCException; @@ -297,9 +297,10 @@ if (messagingNetManager != null) { messagingNetManager.start(); } - this.ccs = new ClusterControllerRemoteProxy(ipc, + this.ccs = new ClusterControllerRemoteProxy( + ipc.getHandle( new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()), - ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener() { + ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() { @Override public void ipcHandleRestored(IIPCHandle handle) throws IPCException { // we need to re-register in case of NC -> CC connection reset @@ -310,7 +311,7 @@ throw new IPCException(e); } } - }); + })); registerNode(); workQueue.start(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java similarity index 88% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java rename to hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java index ec4f9e4..a6ba545 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java @@ -16,12 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.control.common.ipc; +package org.apache.hyracks.ipc.api; -import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.exceptions.IPCException; -public interface IControllerRemoteProxyIPCEventListener { +public interface IIPCEventListener { default void ipcHandleConnected(IIPCHandle handle) throws IPCException { // no-op diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index 8e38651..b36e645 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hyracks.ipc.api.IIPCEventListener; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; import org.apache.hyracks.ipc.api.IPCPerformanceCounters; @@ -70,6 +71,28 @@ } public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException { + return getHandle(remoteAddress, maxRetries, 0); + } + + public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress) throws IPCException { + return getReconnectingHandle(remoteAddress, 1); + } + + public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, int reconnectAttempts) + throws IPCException { + return getHandle(remoteAddress, 0, reconnectAttempts, NoOpIPCEventListener.INSTANCE); + } + + public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts) + throws IPCException { + return getHandle(remoteAddress, maxRetries, reconnectAttempts, NoOpIPCEventListener.INSTANCE); + } + + public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts, + IIPCEventListener eventListener) throws IPCException { + if (reconnectAttempts > 0) { + return new ReconnectingIPCHandle(this, eventListener, remoteAddress, maxRetries, reconnectAttempts); + } try { return cMgr.getIPCHandle(remoteAddress, maxRetries); } catch (IOException e) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java similarity index 61% copy from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java copy to hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java index ec4f9e4..156641f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java @@ -16,22 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.control.common.ipc; +package org.apache.hyracks.ipc.impl; -import org.apache.hyracks.ipc.api.IIPCHandle; -import org.apache.hyracks.ipc.exceptions.IPCException; +import org.apache.hyracks.ipc.api.IIPCEventListener; -public interface IControllerRemoteProxyIPCEventListener { +public class NoOpIPCEventListener implements IIPCEventListener { + public static final IIPCEventListener INSTANCE = new NoOpIPCEventListener(); - default void ipcHandleConnected(IIPCHandle handle) throws IPCException { - // no-op - } - - default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException { - // no-op - } - - default void ipcHandleRestored(IIPCHandle handle) throws IPCException { - // no-op + private NoOpIPCEventListener() { } } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java new file mode 100644 index 0000000..f0da860 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.ipc.impl; + +import java.net.InetSocketAddress; + +import org.apache.hyracks.ipc.api.IIPCEventListener; +import org.apache.hyracks.ipc.api.IIPCHandle; +import org.apache.hyracks.ipc.exceptions.IPCException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +class ReconnectingIPCHandle implements IIPCHandle { + + private static final Logger LOGGER = LogManager.getLogger(); + private final IPCSystem ipc; + private final int reconnectAttempts; + private final IIPCEventListener listener; + private IIPCHandle delegate; + + ReconnectingIPCHandle(IPCSystem ipc, IIPCEventListener listener, InetSocketAddress remoteAddress, int maxRetries, + int reconnectAttempts) throws IPCException { + this.ipc = ipc; + this.listener = listener; + this.reconnectAttempts = reconnectAttempts; + this.delegate = ipc.getHandle(remoteAddress, maxRetries); + listener.ipcHandleConnected(delegate); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return delegate.getRemoteAddress(); + } + + @Override + public long send(long requestId, Object payload, Exception exception) throws IPCException { + return ensureConnected().send(requestId, payload, exception); + } + + @Override + public void setAttachment(Object attachment) { + delegate.setAttachment(attachment); + } + + @Override + public Object getAttachment() { + return delegate.getAttachment(); + } + + @Override + public boolean isConnected() { + return delegate.isConnected(); + } + + private IIPCHandle ensureConnected() throws IPCException { + if (delegate.isConnected()) { + return delegate; + } + listener.ipcHandleDisconnected(delegate); + delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts); + LOGGER.warn("ipcHandle " + delegate + " restored"); + listener.ipcHandleRestored(delegate); + + return delegate; + } + +} -- To view, visit https://asterix-gerrit.ics.uci.edu/2284 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I2430510b22f936b89879df98322ef51ec87c6da6 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
