HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)
(cherry picked from commit cf4b7f506dd338ecf2ed4c643b6a6a334e070fca) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/005e1df5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/005e1df5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/005e1df5 Branch: refs/heads/branch-2 Commit: 005e1df540aeccbeb371f3bab834b140d85f0ec5 Parents: 91a5d92 Author: Vinayakumar B <vinayakum...@apache.org> Authored: Tue Feb 17 14:55:56 2015 +0530 Committer: Vinayakumar B <vinayakum...@apache.org> Committed: Tue Feb 17 15:19:44 2015 +0530 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/ha/HealthMonitor.java | 35 ++++++---- .../org/apache/hadoop/ha/DummyHAService.java | 73 +++++++++++++++++--- .../org/apache/hadoop/ha/TestHealthMonitor.java | 4 +- 4 files changed, 94 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/005e1df5/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ff71162..f3ab6f7 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -537,6 +537,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11467. KerberosAuthenticator can connect to a non-secure cluster. (yzhangal via rkanter) + HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned + to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/005e1df5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index 0d14444..8c87629 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -30,6 +30,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.*; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.util.Daemon; @@ -201,18 +202,20 @@ public class HealthMonitor { status = proxy.getServiceStatus(); proxy.monitorHealth(); healthy = true; - } catch (HealthCheckFailedException e) { - LOG.warn("Service health check failed for " + targetToMonitor - + ": " + e.getMessage()); - enterState(State.SERVICE_UNHEALTHY); } catch (Throwable t) { - LOG.warn("Transport-level exception trying to monitor health of " + - targetToMonitor + ": " + t.getLocalizedMessage()); - RPC.stopProxy(proxy); - proxy = null; - enterState(State.SERVICE_NOT_RESPONDING); - Thread.sleep(sleepAfterDisconnectMillis); - return; + if (isHealthCheckFailedException(t)) { + LOG.warn("Service health check failed for " + targetToMonitor + + ": " + t.getMessage()); + enterState(State.SERVICE_UNHEALTHY); + } else { + LOG.warn("Transport-level exception trying to monitor health of " + + targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage()); + RPC.stopProxy(proxy); + proxy = null; + enterState(State.SERVICE_NOT_RESPONDING); + Thread.sleep(sleepAfterDisconnectMillis); + return; + } } if (status != null) { @@ -225,7 +228,15 @@ public class HealthMonitor { Thread.sleep(checkIntervalMillis); } } - + + private boolean isHealthCheckFailedException(Throwable t) { + return ((t instanceof HealthCheckFailedException) || + (t instanceof RemoteException && + ((RemoteException)t).unwrapRemoteException( + HealthCheckFailedException.class) instanceof + HealthCheckFailedException)); + } + private synchronized void setLastServiceStatus(HAServiceStatus status) { this.lastServiceState = status; for (ServiceStateCallback cb : serviceStateCallbacks) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/005e1df5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index e9189e2..aef6c4d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -22,15 +22,25 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.mockito.Mockito; import com.google.common.collect.Lists; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT; + /** * Test-only implementation of {@link HAServiceTarget}, which returns * a mock implementation. @@ -50,22 +60,33 @@ class DummyHAService extends HAServiceTarget { DummySharedResource sharedResource; public int fenceCount = 0; public int activeTransitionCount = 0; + boolean testWithProtoBufRPC = false; static ArrayList<DummyHAService> instances = Lists.newArrayList(); int index; DummyHAService(HAServiceState state, InetSocketAddress address) { + this(state, address, false); + } + + DummyHAService(HAServiceState state, InetSocketAddress address, + boolean testWithProtoBufRPC) { this.state = state; - this.proxy = makeMock(); + this.testWithProtoBufRPC = testWithProtoBufRPC; + if (testWithProtoBufRPC) { + this.address = startAndGetRPCServerAddress(address); + } else { + this.address = address; + } + Configuration conf = new Configuration(); + this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); try { - Configuration conf = new Configuration(); - conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); + conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); this.fencer = Mockito.spy( NodeFencer.create(conf, DUMMY_FENCE_KEY)); } catch (BadFencingConfigurationException e) { throw new RuntimeException(e); } - this.address = address; synchronized (instances) { instances.add(this); this.index = instances.size(); @@ -75,9 +96,42 @@ class DummyHAService extends HAServiceTarget { public void setSharedResource(DummySharedResource rsrc) { this.sharedResource = rsrc; } - - private HAServiceProtocol makeMock() { - return Mockito.spy(new MockHAProtocolImpl()); + + private InetSocketAddress startAndGetRPCServerAddress(InetSocketAddress serverAddress) { + Configuration conf = new Configuration(); + + try { + RPC.setProtocolEngine(conf, + HAServiceProtocolPB.class, ProtobufRpcEngine.class); + HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = + new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl()); + BlockingService haPbService = HAServiceProtocolService + .newReflectiveBlockingService(haServiceProtocolXlator); + + Server server = new RPC.Builder(conf) + .setProtocol(HAServiceProtocolPB.class) + .setInstance(haPbService) + .setBindAddress(serverAddress.getHostName()) + .setPort(serverAddress.getPort()).build(); + server.start(); + return NetUtils.getConnectAddress(server); + } catch (IOException e) { + return null; + } + } + + private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) { + HAServiceProtocol service; + if (!testWithProtoBufRPC) { + service = new MockHAProtocolImpl(); + } else { + try { + service = super.getProxy(conf, timeoutMs); + } catch (IOException e) { + return null; + } + } + return Mockito.spy(service); } @Override @@ -93,6 +147,9 @@ class DummyHAService extends HAServiceTarget { @Override public HAServiceProtocol getProxy(Configuration conf, int timeout) throws IOException { + if (testWithProtoBufRPC) { + proxy = makeMock(conf, timeout); + } return proxy; } @@ -168,7 +225,7 @@ class DummyHAService extends HAServiceTarget { public HAServiceStatus getServiceStatus() throws IOException { checkUnreachable(); HAServiceStatus ret = new HAServiceStatus(state); - if (state == HAServiceState.STANDBY) { + if (state == HAServiceState.STANDBY || state == HAServiceState.ACTIVE) { ret.setReadyToBecomeActive(); } return ret; http://git-wip-us.apache.org/repos/asf/hadoop/blob/005e1df5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java index db534de..b58793f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java @@ -20,6 +20,7 @@ package org.apache.hadoop.ha; import static org.junit.Assert.*; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -54,7 +55,8 @@ public class TestHealthMonitor { conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); - svc = new DummyHAService(HAServiceState.ACTIVE, null); + svc = new DummyHAService(HAServiceState.ACTIVE, + new InetSocketAddress("0.0.0.0", 0), true); hm = new HealthMonitor(conf, svc) { @Override protected HAServiceProtocol createProxy() throws IOException {