Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1556688&r1=1556687&r2=1556688&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Thu Jan 9 00:53:04 2014 @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -98,6 +99,7 @@ public class MiniYARNCluster extends Com private boolean useFixedPorts; private boolean useRpc = false; + private int failoverTimeout; private ConcurrentMap<ApplicationAttemptId, Long> appMasters = new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2); @@ -189,12 +191,15 @@ public class MiniYARNCluster extends Com YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); + failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, + YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); if (useRpc && !useFixedPorts) { throw new YarnRuntimeException("Invalid configuration!" + " Minicluster can use rpc only when configured to use fixed ports"); } + conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true); if (resourceManagers.length > 1) { conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); if (conf.get(YarnConfiguration.RM_HA_IDS) == null) { @@ -218,6 +223,13 @@ public class MiniYARNCluster extends Com // Don't try to login using keytab in the testcases. } }; + if (!useFixedPorts) { + if (HAUtil.isHAEnabled(conf)) { + setHARMConfiguration(i, conf); + } else { + setNonHARMConfiguration(conf); + } + } addService(new ResourceManagerWrapper(i)); } for(int index = 0; index < nodeManagers.length; index++) { @@ -230,18 +242,103 @@ public class MiniYARNCluster extends Com conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } + private void setNonHARMConfiguration(Configuration conf) { + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); + WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); + } + + private void setHARMConfiguration(final int index, Configuration conf) { + String hostname = MiniYARNCluster.getHostname(); + for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0"); + } + } + + private synchronized void initResourceManager(int index, Configuration conf) { + if (HAUtil.isHAEnabled(conf)) { + conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); + } + resourceManagers[index].init(conf); + resourceManagers[index].getRMContext().getDispatcher().register( + RMAppAttemptEventType.class, + new EventHandler<RMAppAttemptEvent>() { + public void handle(RMAppAttemptEvent event) { + if (event instanceof RMAppAttemptRegistrationEvent) { + appMasters.put(event.getApplicationAttemptId(), + event.getTimestamp()); + } else if (event instanceof RMAppAttemptUnregistrationEvent) { + appMasters.remove(event.getApplicationAttemptId()); + } + } + }); + } + + private synchronized void startResourceManager(final int index) { + try { + Thread rmThread = new Thread() { + public void run() { + resourceManagers[index].start(); + } + }; + rmThread.setName("RM-" + index); + rmThread.start(); + int waitCount = 0; + while (resourceManagers[index].getServiceState() == STATE.INITED + && waitCount++ < 60) { + LOG.info("Waiting for RM to start..."); + Thread.sleep(1500); + } + if (resourceManagers[index].getServiceState() != STATE.STARTED) { + // RM could have failed. + throw new IOException( + "ResourceManager failed to start. Final state is " + + resourceManagers[index].getServiceState()); + } + } catch (Throwable t) { + throw new YarnRuntimeException(t); + } + LOG.info("MiniYARN ResourceManager address: " + + getConfig().get(YarnConfiguration.RM_ADDRESS)); + LOG.info("MiniYARN ResourceManager web address: " + + WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); + } + + @InterfaceAudience.Private + @VisibleForTesting + public synchronized void stopResourceManager(int index) { + if (resourceManagers[index] != null) { + resourceManagers[index].stop(); + resourceManagers[index] = null; + } + } + + @InterfaceAudience.Private + @VisibleForTesting + public synchronized void restartResourceManager(int index) + throws InterruptedException { + if (resourceManagers[index] != null) { + resourceManagers[index].stop(); + resourceManagers[index] = null; + } + Configuration conf = getConfig(); + resourceManagers[index] = new ResourceManager(); + initResourceManager(index, getConfig()); + startResourceManager(index); + } + public File getTestWorkDir() { return testWorkDir; } /** - * In a HA cluster, go through all the RMs and find the Active RM. If none - * of them are active, wait upto 5 seconds for them to transition to Active. - * - * In an non-HA cluster, return the index of the only RM. + * In a HA cluster, go through all the RMs and find the Active RM. In a + * non-HA cluster, return the index of the only RM. * - * @return index of the active RM or -1 if none of them transition to - * active even after 5 seconds of waiting + * @return index of the active RM or -1 if none of them turn active */ @InterfaceAudience.Private @VisibleForTesting @@ -250,9 +347,12 @@ public class MiniYARNCluster extends Com return 0; } - int numRetriesForRMBecomingActive = 5; + int numRetriesForRMBecomingActive = failoverTimeout / 100; while (numRetriesForRMBecomingActive-- > 0) { for (int i = 0; i < resourceManagers.length; i++) { + if (resourceManagers[i] == null) { + continue; + } try { if (HAServiceProtocol.HAServiceState.ACTIVE == resourceManagers[i].getRMContext().getRMAdminService() @@ -265,7 +365,7 @@ public class MiniYARNCluster extends Com } } try { - Thread.sleep(1000); + Thread.sleep(100); } catch (InterruptedException e) { throw new YarnRuntimeException("Interrupted while waiting for one " + "of the ResourceManagers to become active"); @@ -282,7 +382,7 @@ public class MiniYARNCluster extends Com int activeRMIndex = getActiveRMIndex(); return activeRMIndex == -1 ? null - : this.resourceManagers[getActiveRMIndex()]; + : this.resourceManagers[activeRMIndex]; } public ResourceManager getResourceManager(int i) { @@ -310,82 +410,21 @@ public class MiniYARNCluster extends Com index = i; } - private void setNonHARMConfiguration(Configuration conf) { - String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0"); - WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0); - } - - private void setHARMConfiguration(Configuration conf) { - String hostname = MiniYARNCluster.getHostname(); - for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { - for (String id : HAUtil.getRMHAIds(conf)) { - conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0"); - } - } - } - @Override protected synchronized void serviceInit(Configuration conf) throws Exception { - conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true); - - if (!useFixedPorts) { - if (HAUtil.isHAEnabled(conf)) { - setHARMConfiguration(conf); - } else { - setNonHARMConfiguration(conf); - } - } - if (HAUtil.isHAEnabled(conf)) { - conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); - } - resourceManagers[index].init(conf); - resourceManagers[index].getRMContext().getDispatcher().register - (RMAppAttemptEventType.class, - new EventHandler<RMAppAttemptEvent>() { - public void handle(RMAppAttemptEvent event) { - if (event instanceof RMAppAttemptRegistrationEvent) { - appMasters.put(event.getApplicationAttemptId(), event.getTimestamp()); - } else if (event instanceof RMAppAttemptUnregistrationEvent) { - appMasters.remove(event.getApplicationAttemptId()); - } - } - }); + initResourceManager(index, conf); super.serviceInit(conf); } @Override protected synchronized void serviceStart() throws Exception { - try { - new Thread() { - public void run() { - resourceManagers[index].start(); - } - }.start(); - int waitCount = 0; - while (resourceManagers[index].getServiceState() == STATE.INITED - && waitCount++ < 60) { - LOG.info("Waiting for RM to start..."); - Thread.sleep(1500); - } - if (resourceManagers[index].getServiceState() != STATE.STARTED) { - // RM could have failed. - throw new IOException( - "ResourceManager failed to start. Final state is " - + resourceManagers[index].getServiceState()); - } - super.serviceStart(); - } catch (Throwable t) { - throw new YarnRuntimeException(t); - } + startResourceManager(index); LOG.info("MiniYARN ResourceManager address: " + getConfig().get(YarnConfiguration.RM_ADDRESS)); LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); + super.serviceStart(); } private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException { @@ -406,7 +445,6 @@ public class MiniYARNCluster extends Com waitForAppMastersToFinish(5000); resourceManagers[index].stop(); } - super.serviceStop(); if (Shell.WINDOWS) { // On Windows, clean up the short temporary symlink that was created to @@ -420,6 +458,7 @@ public class MiniYARNCluster extends Com testWorkDir.getAbsolutePath()); } } + super.serviceStop(); } }
Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1556688&r1=1556687&r2=1556688&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Thu Jan 9 00:53:04 2014 @@ -19,21 +19,20 @@ package org.apache.hadoop.yarn.server.webproxy; import java.io.IOException; -import java.net.InetSocketAddress; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; /** * This class abstracts away how ApplicationReports are fetched. @@ -50,16 +49,12 @@ public class AppReportFetcher { */ public AppReportFetcher(Configuration conf) { this.conf = conf; - YarnRPC rpc = YarnRPC.create(this.conf); - InetSocketAddress rmAddress = conf.getSocketAddr( - YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); - LOG.info("Connecting to ResourceManager at " + rmAddress); - applicationsManager = - (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class, - rmAddress, this.conf); - LOG.info("Connected to ResourceManager at " + rmAddress); + try { + applicationsManager = ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } } /** @@ -91,4 +86,10 @@ public class AppReportFetcher { .getApplicationReport(request); return response.getApplicationReport(); } + + public void stop() { + if (this.applicationsManager != null) { + RPC.stopProxy(this.applicationsManager); + } + } } Modified: hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java?rev=1556688&r1=1556687&r2=1556688&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java (original) +++ hadoop/common/branches/HDFS-5698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java Thu Jan 9 00:53:04 2014 @@ -117,6 +117,9 @@ public class WebAppProxy extends Abstrac throw new YarnRuntimeException("Error stopping proxy web server",e); } } + if(this.fetcher != null) { + this.fetcher.stop(); + } super.serviceStop(); }
