Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java Wed Aug 20 01:34:29 2014 @@ -150,6 +150,15 @@ public abstract class ResourceCalculator Resource clusterResource, Resource numerator, Resource denominator); /** + * Determine if a resource is not suitable for use as a divisor + * (will result in divide by 0, etc) + * + * @param r resource + * @return true if divisor is invalid (should not be used), false else + */ + public abstract boolean isInvalidDivisor(Resource r); + + /** * Ratio of resource <code>a</code> to resource <code>b</code>. * * @param a resource
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java Wed Aug 20 01:34:29 2014 @@ -184,6 +184,11 @@ public class Resources { return calculator.roundDown(lhs, factor); } + public static boolean isInvalidDivisor( + ResourceCalculator resourceCalculator, Resource divisor) { + return resourceCalculator.isInvalidDivisor(divisor); + } + public static float ratio( ResourceCalculator resourceCalculator, Resource lhs, Resource rhs) { return resourceCalculator.ratio(lhs, rhs); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java Wed Aug 
20 01:34:29 2014 @@ -19,12 +19,12 @@ package org.apache.hadoop.yarn.webapp.ut import static org.apache.hadoop.yarn.util.StringHelper.PATH_JOINER; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -34,11 +34,18 @@ import org.apache.hadoop.http.HttpServer import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.RMHAUtils; @Private @Evolving public class WebAppUtils { + public static final String WEB_APP_TRUSTSTORE_PASSWORD_KEY = + "ssl.server.truststore.password"; + public static final String WEB_APP_KEYSTORE_PASSWORD_KEY = + "ssl.server.keystore.password"; + public static final String WEB_APP_KEY_PASSWORD_KEY = + "ssl.server.keystore.keypassword"; public static final String HTTPS_PREFIX = "https://"; public static final String HTTP_PREFIX = "http://"; @@ -170,6 +177,37 @@ public class WebAppUtils { return sb.toString(); } + /** + * Get the URL to use for binding where bind hostname can be specified + * to override the hostname in the webAppURLWithoutScheme. Port specified in the + * webAppURLWithoutScheme will be used. 
+ * + * @param conf the configuration + * @param hostProperty bind host property name + * @param webAppURLWithoutScheme web app URL without scheme String + * @return String representing bind URL + */ + public static String getWebAppBindURL( + Configuration conf, + String hostProperty, + String webAppURLWithoutScheme) { + + // If the bind-host setting exists then it overrides the hostname + // portion of the corresponding webAppURLWithoutScheme + String host = conf.getTrimmed(hostProperty); + if (host != null && !host.isEmpty()) { + if (webAppURLWithoutScheme.contains(":")) { + webAppURLWithoutScheme = host + ":" + webAppURLWithoutScheme.split(":")[1]; + } + else { + throw new YarnRuntimeException("webAppURLWithoutScheme must include port specification but doesn't: " + + webAppURLWithoutScheme); + } + } + + return webAppURLWithoutScheme; + } + public static String getNMWebAppURLWithoutScheme(Configuration conf) { if (YarnConfiguration.useHttps(conf)) { return conf.get(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS, @@ -242,21 +280,56 @@ public class WebAppUtils { /** * Load the SSL keystore / truststore into the HttpServer builder. + * @param builder the HttpServer2.Builder to populate with ssl config */ public static HttpServer2.Builder loadSslConfiguration( HttpServer2.Builder builder) { - Configuration sslConf = new Configuration(false); + return loadSslConfiguration(builder, null); + } + + /** + * Load the SSL keystore / truststore into the HttpServer builder. 
+ * @param builder the HttpServer2.Builder to populate with ssl config + * @param sslConf the Configuration instance to use during loading of SSL conf + */ + public static HttpServer2.Builder loadSslConfiguration( + HttpServer2.Builder builder, Configuration sslConf) { + if (sslConf == null) { + sslConf = new Configuration(false); + } boolean needsClientAuth = YarnConfiguration.YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT; sslConf.addResource(YarnConfiguration.YARN_SSL_SERVER_RESOURCE_DEFAULT); return builder .needsClientAuth(needsClientAuth) - .keyPassword(sslConf.get("ssl.server.keystore.keypassword")) + .keyPassword(getPassword(sslConf, WEB_APP_KEY_PASSWORD_KEY)) .keyStore(sslConf.get("ssl.server.keystore.location"), - sslConf.get("ssl.server.keystore.password"), + getPassword(sslConf, WEB_APP_KEYSTORE_PASSWORD_KEY), sslConf.get("ssl.server.keystore.type", "jks")) .trustStore(sslConf.get("ssl.server.truststore.location"), - sslConf.get("ssl.server.truststore.password"), + getPassword(sslConf, WEB_APP_TRUSTSTORE_PASSWORD_KEY), sslConf.get("ssl.server.truststore.type", "jks")); } + + /** + * Leverages the Configuration.getPassword method to attempt to get + * passwords from the CredentialProvider API before falling back to + * clear text in config - if falling back is allowed. 
+ * @param conf Configuration instance + * @param alias name of the credential to retrieve + * @return String credential value or null + */ + static String getPassword(Configuration conf, String alias) { + String password = null; + try { + char[] passchars = conf.getPassword(alias); + if (passchars != null) { + password = new String(passchars); + } + } + catch (IOException ioe) { + password = null; + } + return password; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Aug 20 01:34:29 2014 @@ -71,6 +71,17 @@ </property> <property> + <description> + The actual address the server will bind to. If this optional address is + set, the RPC and webapp servers will bind to this address and the port specified in + yarn.resourcemanager.address and yarn.resourcemanager.webapp.address, respectively. This + is most useful for making RM listen to all interfaces by setting to 0.0.0.0. 
+ </description> + <name>yarn.resourcemanager.bind-host</name> + <value></value> + </property> + + <property> <description>The number of threads used to handle applications manager requests.</description> <name>yarn.resourcemanager.client.thread-count</name> <value>50</value> @@ -195,6 +206,15 @@ </property> <property> + <description>Flag to enable override of the default kerberos authentication + filter with the RM authentication filter to allow authentication using + delegation tokens(fallback to kerberos if the tokens are missing). Only + applicable when the http authentication type is kerberos.</description> + <name>yarn.resourcemanager.webapp.delegation-token-auth-filter.enabled</name> + <value>true</value> + </property> + + <property> <description>How long to wait until a node manager is considered dead.</description> <name>yarn.nm.liveness-monitor.expiry-interval-ms</name> <value>600000</value> @@ -454,7 +474,7 @@ <property> <description>Name of the cluster. In a HA setting, this is used to ensure the RM participates in leader - election fo this cluster and ensures it does not affect + election for this cluster and ensures it does not affect other clusters</description> <name>yarn.resourcemanager.cluster-id</name> <!--value>yarn-cluster</value--> @@ -627,6 +647,17 @@ </property> <property> + <description> + The actual address the server will bind to. If this optional address is + set, the RPC and webapp servers will bind to this address and the port specified in + yarn.nodemanager.address and yarn.nodemanager.webapp.address, respectively. This is + most useful for making NM listen to all interfaces by setting to 0.0.0.0. 
+ </description> + <name>yarn.nodemanager.bind-host</name> + <value></value> + </property> + + <property> <description>Environment variables that should be forwarded from the NodeManager's environment to the container's.</description> <name>yarn.nodemanager.admin-env</name> <value>MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX</value> @@ -1164,6 +1195,18 @@ </property> <property> + <description> + The actual address the server will bind to. If this optional address is + set, the RPC and webapp servers will bind to this address and the port specified in + yarn.timeline-service.address and yarn.timeline-service.webapp.address, respectively. + This is most useful for making the service listen to all interfaces by setting to + 0.0.0.0. + </description> + <name>yarn.timeline-service.bind-host</name> + <value></value> + </property> + + <property> <description>Store class name for timeline store.</description> <name>yarn.timeline-service.store-class</name> <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value> Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java Wed Aug 20 01:34:29 2014 @@ -28,6 +28,7 @@ import java.net.SocketAddress; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import 
static org.junit.Assert.assertFalse; public class TestYarnConfiguration { @@ -75,4 +76,131 @@ public class TestYarnConfiguration { YarnConfiguration.DEFAULT_NM_PORT); assertEquals(1234, addr.getPort()); } + + @Test + public void testGetSocketAddr() throws Exception { + + YarnConfiguration conf; + InetSocketAddress resourceTrackerAddress; + + //all default + conf = new YarnConfiguration(); + resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + assertEquals( + new InetSocketAddress( + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0], + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + resourceTrackerAddress); + + //with address + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.1"); + resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + assertEquals( + new InetSocketAddress( + "10.0.0.1", + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + resourceTrackerAddress); + + //address and socket + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5001"); + resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + assertEquals( + new InetSocketAddress( + "10.0.0.2", + 5001), + resourceTrackerAddress); + + //bind host only + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_BIND_HOST, "10.0.0.3"); + resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + 
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + assertEquals( + new InetSocketAddress( + "10.0.0.3", + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + resourceTrackerAddress); + + //bind host and address no port + conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0"); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2"); + resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + assertEquals( + new InetSocketAddress( + "0.0.0.0", + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT), + resourceTrackerAddress); + + //bind host and address with port + conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0"); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5003"); + resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + assertEquals( + new InetSocketAddress( + "0.0.0.0", + 5003), + resourceTrackerAddress); + + } + + @Test + public void testUpdateConnectAddr() throws Exception { + YarnConfiguration conf; + InetSocketAddress resourceTrackerConnectAddress; + InetSocketAddress serverAddress; + + //no override, old behavior. 
Won't work on a host named "yo.yo.yo" + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo"); + serverAddress = new InetSocketAddress( + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0], + Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1])); + + resourceTrackerConnectAddress = conf.updateConnectAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + serverAddress); + + assertFalse(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo")); + + //cause override with address + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo"); + conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0"); + serverAddress = new InetSocketAddress( + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0], + Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1])); + + resourceTrackerConnectAddress = conf.updateConnectAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + serverAddress); + + assertTrue(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo")); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java Wed Aug 20 01:34:29 2014 @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; public class ApplicationHistoryClientService extends AbstractService { @@ -75,10 +76,11 @@ public class ApplicationHistoryClientSer protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); - InetSocketAddress address = - conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT); + InetSocketAddress address = conf.getSocketAddr( + YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT); server = rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler, @@ -88,8 +90,10 @@ public class ApplicationHistoryClientSer server.start(); this.bindAddress = - conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, - server.getListenerAddress()); + conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS, + 
server.getListenerAddress()); LOG.info("Instantiated ApplicationHistoryClientService at " + this.bindAddress); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Wed Aug 20 01:34:29 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ap import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +28,7 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; @@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer; @@ -59,12 +62,12 @@ public class ApplicationHistoryServer ex private static final Log LOG = LogFactory .getLog(ApplicationHistoryServer.class); - protected ApplicationHistoryClientService ahsClientService; - protected ApplicationHistoryManager historyManager; - protected TimelineStore timelineStore; - protected TimelineDelegationTokenSecretManagerService secretManagerService; - protected TimelineACLsManager timelineACLsManager; - protected WebApp webApp; + private ApplicationHistoryClientService ahsClientService; + private ApplicationHistoryManager historyManager; + private TimelineStore timelineStore; + private TimelineDelegationTokenSecretManagerService secretManagerService; + private TimelineDataManager timelineDataManager; + private WebApp webApp; public ApplicationHistoryServer() { super(ApplicationHistoryServer.class.getName()); @@ -72,15 +75,18 @@ public class ApplicationHistoryServer ex @Override protected void serviceInit(Configuration conf) throws Exception { - historyManager = createApplicationHistory(); - ahsClientService = 
createApplicationHistoryClientService(historyManager); - addService(ahsClientService); - addService((Service) historyManager); + // init timeline services first timelineStore = createTimelineStore(conf); addIfService(timelineStore); secretManagerService = createTimelineDelegationTokenSecretManagerService(conf); addService(secretManagerService); - timelineACLsManager = createTimelineACLsManager(conf); + timelineDataManager = createTimelineDataManager(conf); + + // init generic history service afterwards + historyManager = createApplicationHistoryManager(conf); + ahsClientService = createApplicationHistoryClientService(historyManager); + addService(ahsClientService); + addService((Service) historyManager); DefaultMetricsSystem.initialize("ApplicationHistoryServer"); JvmMetrics.initSingleton("ApplicationHistoryServer", null); @@ -111,21 +117,22 @@ public class ApplicationHistoryServer ex @Private @VisibleForTesting - public ApplicationHistoryClientService getClientService() { + ApplicationHistoryClientService getClientService() { return this.ahsClientService; } - protected ApplicationHistoryClientService - createApplicationHistoryClientService( - ApplicationHistoryManager historyManager) { - return new ApplicationHistoryClientService(historyManager); - } - - protected ApplicationHistoryManager createApplicationHistory() { - return new ApplicationHistoryManagerImpl(); + /** + * @return ApplicationTimelineStore + */ + @Private + @VisibleForTesting + public TimelineStore getTimelineStore() { + return timelineStore; } - protected ApplicationHistoryManager getApplicationHistory() { + @Private + @VisibleForTesting + ApplicationHistoryManager getApplicationHistoryManager() { return this.historyManager; } @@ -154,28 +161,35 @@ public class ApplicationHistoryServer ex launchAppHistoryServer(args); } - protected ApplicationHistoryManager createApplicationHistoryManager( + private ApplicationHistoryClientService + createApplicationHistoryClientService( + 
ApplicationHistoryManager historyManager) { + return new ApplicationHistoryClientService(historyManager); + } + + private ApplicationHistoryManager createApplicationHistoryManager( Configuration conf) { return new ApplicationHistoryManagerImpl(); } - protected TimelineStore createTimelineStore( + private TimelineStore createTimelineStore( Configuration conf) { return ReflectionUtils.newInstance(conf.getClass( YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class, TimelineStore.class), conf); } - protected TimelineDelegationTokenSecretManagerService + private TimelineDelegationTokenSecretManagerService createTimelineDelegationTokenSecretManagerService(Configuration conf) { return new TimelineDelegationTokenSecretManagerService(); } - protected TimelineACLsManager createTimelineACLsManager(Configuration conf) { - return new TimelineACLsManager(conf); + private TimelineDataManager createTimelineDataManager(Configuration conf) { + return new TimelineDataManager( + timelineStore, new TimelineACLsManager(conf)); } - protected void startWebApp() { + private void startWebApp() { Configuration conf = getConfig(); // Always load pseudo authentication filter to parse "user.name" in an URL // to identify a HTTP request's user in insecure mode. @@ -183,23 +197,41 @@ public class ApplicationHistoryServer ex // the customized filter will be loaded by the timeline server to do Kerberos // + DT authentication. String initializers = conf.get("hadoop.http.filter.initializers"); + initializers = - initializers == null || initializers.length() == 0 ? "" : "," - + initializers; - if (!initializers.contains( - TimelineAuthenticationFilterInitializer.class.getName())) { - conf.set("hadoop.http.filter.initializers", - TimelineAuthenticationFilterInitializer.class.getName() - + initializers); + initializers == null || initializers.length() == 0 ? 
"" : initializers; + + if (!initializers.contains(TimelineAuthenticationFilterInitializer.class + .getName())) { + initializers = + TimelineAuthenticationFilterInitializer.class.getName() + "," + + initializers; + } + + String[] parts = initializers.split(","); + ArrayList<String> target = new ArrayList<String>(); + for (String filterInitializer : parts) { + filterInitializer = filterInitializer.trim(); + if (filterInitializer.equals(AuthenticationFilterInitializer.class + .getName())) { + continue; + } + target.add(filterInitializer); + } + String actualInitializers = + org.apache.commons.lang.StringUtils.join(target, ","); + if (!actualInitializers.equals(initializers)) { + conf.set("hadoop.http.filter.initializers", actualInitializers); } - String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + String bindAddress = WebAppUtils.getWebAppBindURL(conf, + YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); LOG.info("Instantiating AHSWebApp at " + bindAddress); try { AHSWebApp ahsWebApp = AHSWebApp.getInstance(); ahsWebApp.setApplicationHistoryManager(historyManager); - ahsWebApp.setTimelineStore(timelineStore); ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService); - ahsWebApp.setTimelineACLsManager(timelineACLsManager); + ahsWebApp.setTimelineDataManager(timelineDataManager); webApp = WebApps .$for("applicationhistory", ApplicationHistoryClientService.class, @@ -211,14 +243,6 @@ public class ApplicationHistoryServer ex throw new YarnRuntimeException(msg, e); } } - /** - * @return ApplicationTimelineStore - */ - @Private - @VisibleForTesting - public TimelineStore getTimelineStore() { - return timelineStore; - } private void doSecureLogin(Configuration conf) throws IOException { InetSocketAddress socAddr = getBindAddress(conf); Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Wed Aug 20 01:34:29 2014 @@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.uti import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.server.api.ApplicationContext; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager; -import org.apache.hadoop.yarn.server.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService; import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; @@ -36,9 +35,8 @@ import com.google.common.annotations.Vis public class AHSWebApp extends WebApp implements YarnWebParams { private ApplicationHistoryManager applicationHistoryManager; - private 
TimelineStore timelineStore; private TimelineDelegationTokenSecretManagerService secretManagerService; - private TimelineACLsManager timelineACLsManager; + private TimelineDataManager timelineDataManager; private static AHSWebApp instance = null; @@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp im this.applicationHistoryManager = applicationHistoryManager; } - public TimelineStore getTimelineStore() { - return timelineStore; - } - - public void setTimelineStore(TimelineStore timelineStore) { - this.timelineStore = timelineStore; - } - public TimelineDelegationTokenSecretManagerService getTimelineDelegationTokenSecretManagerService() { return secretManagerService; @@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp im this.secretManagerService = secretManagerService; } - public TimelineACLsManager getTimelineACLsManager() { - return timelineACLsManager; + public TimelineDataManager getTimelineDataManager() { + return timelineDataManager; } - public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) { - this.timelineACLsManager = timelineACLsManager; + public void setTimelineDataManager(TimelineDataManager timelineDataManager) { + this.timelineDataManager = timelineDataManager; } @Override @@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp im bind(TimelineWebServices.class); bind(GenericExceptionHandler.class); bind(ApplicationContext.class).toInstance(applicationHistoryManager); - bind(TimelineStore.class).toInstance(timelineStore); bind(TimelineDelegationTokenSecretManagerService.class).toInstance( secretManagerService); - bind(TimelineACLsManager.class).toInstance(timelineACLsManager); + bind(TimelineDataManager.class).toInstance(timelineDataManager); route("/", AHSController.class); route(pajoin("/apps", APP_STATE), AHSController.class); route(pajoin("/app", APPLICATION_ID), AHSController.class, "app"); Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java Wed Aug 20 01:34:29 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.ti import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; +import static org.fusesource.leveldbjni.JniDBFactory.bytes; import java.io.ByteArrayOutputStream; import java.io.File; @@ -60,8 +61,12 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; import 
org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; import org.iq80.leveldb.ReadOptions; @@ -141,6 +146,11 @@ public class LeveldbTimelineStore extend "z".getBytes(); private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version"; + + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 0); @Private @VisibleForTesting @@ -193,6 +203,7 @@ public class LeveldbTimelineStore extend } LOG.info("Using leveldb path " + dbPath); db = factory.open(new File(dbPath.toString()), options); + checkVersion(); startTimeWriteCache = Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( conf))); @@ -1270,8 +1281,6 @@ public class LeveldbTimelineStore extend DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); } - // warning is suppressed to prevent eclipse from noting unclosed resource - @SuppressWarnings("resource") @VisibleForTesting List<String> getEntityTypes() throws IOException { DBIterator iterator = null; @@ -1489,4 +1498,65 @@ public class LeveldbTimelineStore extend readOptions.fillCache(fillCache); return db.iterator(readOptions); } + + Version loadVersion() throws IOException { + byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. 
+ if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } + + // Only used for test + @VisibleForTesting + void storeVersion(Version state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(Version state) throws IOException { + String key = TIMELINE_STORE_VERSION_KEY; + byte[] data = + ((VersionPBImpl) state).getProto().toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of TS-store is a major upgrade, and any + * compatible change of TS-store is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade timeline store or remove incompatible old state. 
+ */ + private void checkVersion() throws IOException { + Version loadedVersion = loadVersion(); + LOG.info("Loaded timeline store version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing timeline store version info " + getCurrentVersion()); + dbStoreVersion(CURRENT_VERSION_INFO); + } else { + String incompatibleMessage = + "Incompatible version for timeline store: expecting version " + + getCurrentVersion() + ", but loading version " + loadedVersion; + LOG.fatal(incompatibleMessage); + throw new IOException(incompatibleMessage); + } + } + } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java Wed Aug 20 01:34:29 2014 @@ -18,14 +18,10 @@ package org.apache.hadoop.yarn.server.timeline.webapp; -import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; - -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import 
java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.SortedSet; @@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.EntityIdentifier; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timeline.TimelineStore; -import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -80,14 +73,11 @@ public class TimelineWebServices { private static final Log LOG = LogFactory.getLog(TimelineWebServices.class); - private TimelineStore store; - private TimelineACLsManager timelineACLsManager; + private TimelineDataManager timelineDataManager; @Inject - public TimelineWebServices(TimelineStore store, - TimelineACLsManager timelineACLsManager) { - this.store = store; - this.timelineACLsManager = timelineACLsManager; + public TimelineWebServices(TimelineDataManager timelineDataManager) { + this.timelineDataManager = timelineDataManager; } @XmlRootElement(name = "about") @@ -148,61 +138,28 @@ public class TimelineWebServices { @QueryParam("limit") String limit, @QueryParam("fields") String fields) { init(res); - TimelineEntities entities = null; try { - EnumSet<Field> fieldEnums = parseFieldsStr(fields, ","); - boolean modified 
= extendFields(fieldEnums); - UserGroupInformation callerUGI = getUser(req); - entities = store.getEntities( + return timelineDataManager.getEntities( parseStr(entityType), - parseLongStr(limit), + parsePairStr(primaryFilter, ":"), + parsePairsStr(secondaryFilter, ",", ":"), parseLongStr(windowStart), parseLongStr(windowEnd), parseStr(fromId), parseLongStr(fromTs), - parsePairStr(primaryFilter, ":"), - parsePairsStr(secondaryFilter, ",", ":"), - fieldEnums); - if (entities != null) { - Iterator<TimelineEntity> entitiesItr = - entities.getEntities().iterator(); - while (entitiesItr.hasNext()) { - TimelineEntity entity = entitiesItr.next(); - try { - // check ACLs - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - entitiesItr.remove(); - } else { - // clean up system data - if (modified) { - entity.setPrimaryFilters(null); - } else { - cleanupOwnerInfo(entity); - } - } - } catch (YarnException e) { - LOG.error("Error when verifying access for user " + callerUGI - + " on the events of the timeline entity " - + new EntityIdentifier(entity.getEntityId(), - entity.getEntityType()), e); - entitiesItr.remove(); - } - } - } + parseLongStr(limit), + parseFieldsStr(fields, ","), + getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); } catch (IllegalArgumentException e) { throw new BadRequestException("requested invalid field."); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error getting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } - if (entities == null) { - return new TimelineEntities(); - } - return entities; } /** @@ -220,33 +177,15 @@ public class TimelineWebServices { init(res); TimelineEntity entity = null; try { - EnumSet<Field> fieldEnums = parseFieldsStr(fields, ","); - boolean modified = extendFields(fieldEnums); - entity = - store.getEntity(parseStr(entityId), parseStr(entityType), - fieldEnums); - 
if (entity != null) { - // check ACLs - UserGroupInformation callerUGI = getUser(req); - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - entity = null; - } else { - // clean up the system data - if (modified) { - entity.setPrimaryFilters(null); - } else { - cleanupOwnerInfo(entity); - } - } - } + entity = timelineDataManager.getEntity( + parseStr(entityType), + parseStr(entityId), + parseFieldsStr(fields, ","), + getUser(req)); } catch (IllegalArgumentException e) { throw new BadRequestException( "requested invalid field."); - } catch (IOException e) { - LOG.error("Error getting entity", e); - throw new WebApplicationException(e, - Response.Status.INTERNAL_SERVER_ERROR); - } catch (YarnException e) { + } catch (Exception e) { LOG.error("Error getting entity", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -275,51 +214,23 @@ public class TimelineWebServices { @QueryParam("windowEnd") String windowEnd, @QueryParam("limit") String limit) { init(res); - TimelineEvents events = null; try { - UserGroupInformation callerUGI = getUser(req); - events = store.getEntityTimelines( + return timelineDataManager.getEvents( parseStr(entityType), parseArrayStr(entityId, ","), - parseLongStr(limit), + parseArrayStr(eventType, ","), parseLongStr(windowStart), parseLongStr(windowEnd), - parseArrayStr(eventType, ",")); - if (events != null) { - Iterator<TimelineEvents.EventsOfOneEntity> eventsItr = - events.getAllEvents().iterator(); - while (eventsItr.hasNext()) { - TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); - try { - TimelineEntity entity = store.getEntity( - eventsOfOneEntity.getEntityId(), - eventsOfOneEntity.getEntityType(), - EnumSet.of(Field.PRIMARY_FILTERS)); - // check ACLs - if (!timelineACLsManager.checkAccess(callerUGI, entity)) { - eventsItr.remove(); - } - } catch (Exception e) { - LOG.error("Error when verifying access for user " + callerUGI - + " on the events of the timeline entity " - + new 
EntityIdentifier(eventsOfOneEntity.getEntityId(), - eventsOfOneEntity.getEntityType()), e); - eventsItr.remove(); - } - } - } + parseLongStr(limit), + getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error getting entity timelines", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } - if (events == null) { - return new TimelineEvents(); - } - return events; } /** @@ -333,9 +244,6 @@ public class TimelineWebServices { @Context HttpServletResponse res, TimelineEntities entities) { init(res); - if (entities == null) { - return new TimelinePutResponse(); - } UserGroupInformation callerUGI = getUser(req); if (callerUGI == null) { String msg = "The owner of the posted timeline entities is not set"; @@ -343,76 +251,8 @@ public class TimelineWebServices { throw new ForbiddenException(msg); } try { - List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>(); - TimelineEntities entitiesToPut = new TimelineEntities(); - List<TimelinePutResponse.TimelinePutError> errors = - new ArrayList<TimelinePutResponse.TimelinePutError>(); - for (TimelineEntity entity : entities.getEntities()) { - EntityIdentifier entityID = - new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); - - // check if there is existing entity - TimelineEntity existingEntity = null; - try { - existingEntity = - store.getEntity(entityID.getId(), entityID.getType(), - EnumSet.of(Field.PRIMARY_FILTERS)); - if (existingEntity != null - && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) { - throw new YarnException("The timeline entity " + entityID - + " was not put by " + callerUGI + " before"); - } - } catch (Exception e) { - // Skip the entity which already exists and was put by others - LOG.warn("Skip the timeline entity: " + entityID + ", because " - + e.getMessage()); - 
TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); - error.setErrorCode( - TimelinePutResponse.TimelinePutError.ACCESS_DENIED); - errors.add(error); - continue; - } - - // inject owner information for the access check if this is the first - // time to post the entity, in case it's the admin who is updating - // the timeline data. - try { - if (existingEntity == null) { - injectOwnerInfo(entity, callerUGI.getShortUserName()); - } - } catch (YarnException e) { - // Skip the entity which messes up the primary filter and record the - // error - LOG.warn("Skip the timeline entity: " + entityID + ", because " - + e.getMessage()); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); - error.setErrorCode( - TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT); - errors.add(error); - continue; - } - - entityIDs.add(entityID); - entitiesToPut.addEntity(entity); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing the entity " + entityID + ", JSON-style content: " - + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); - } - TimelinePutResponse response = store.put(entitiesToPut); - // add the errors of timeline system filter key conflict - response.addErrors(errors); - return response; - } catch (IOException e) { + return timelineDataManager.postEntities(entities, callerUGI); + } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); @@ -423,6 +263,15 @@ public class TimelineWebServices { response.setContentType(null); } + private static UserGroupInformation getUser(HttpServletRequest req) { + String remoteUser = req.getRemoteUser(); + 
UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + return callerUGI; + } + private static SortedSet<String> parseArrayStr(String str, String delimiter) { if (str == null) { return null; @@ -495,14 +344,6 @@ public class TimelineWebServices { } } - private static boolean extendFields(EnumSet<Field> fieldEnums) { - boolean modified = false; - if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) { - fieldEnums.add(Field.PRIMARY_FILTERS); - modified = true; - } - return modified; - } private static Long parseLongStr(String str) { return str == null ? null : Long.parseLong(str.trim()); } @@ -511,34 +352,4 @@ public class TimelineWebServices { return str == null ? null : str.trim(); } - private static UserGroupInformation getUser(HttpServletRequest req) { - String remoteUser = req.getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - return callerUGI; - } - - private static void injectOwnerInfo(TimelineEntity timelineEntity, - String owner) throws YarnException { - if (timelineEntity.getPrimaryFilters() != null && - timelineEntity.getPrimaryFilters().containsKey( - TimelineStore.SystemFilter.ENTITY_OWNER.toString())) { - throw new YarnException( - "User should not use the timeline system filter key: " - + TimelineStore.SystemFilter.ENTITY_OWNER); - } - timelineEntity.addPrimaryFilter( - TimelineStore.SystemFilter.ENTITY_OWNER - .toString(), owner); - } - - private static void cleanupOwnerInfo(TimelineEntity timelineEntity) { - if (timelineEntity.getPrimaryFilters() != null) { - timelineEntity.getPrimaryFilters().remove( - TimelineStore.SystemFilter.ENTITY_OWNER.toString()); - } - } - } Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java Wed Aug 20 01:34:29 2014 @@ -69,7 +69,7 @@ public class TestApplicationHistoryClien historyServer.init(config); historyServer.start(); store = - ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory()) + ((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager()) .getHistoryStore(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java Wed Aug 20 01:34:29 2014 @@ -23,11 +23,14 @@ import static org.junit.Assert.assertNot import static org.junit.Assert.fail; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; +import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer; import org.junit.After; +import org.junit.Assert; import org.junit.Test; public class TestApplicationHistoryServer { @@ -69,6 +72,31 @@ public class TestApplicationHistoryServe } } + @Test(timeout = 50000) + public void testFilteOverrides() throws Exception { + + String[] filterInitializers = + { + AuthenticationFilterInitializer.class.getName(), + TimelineAuthenticationFilterInitializer.class.getName(), + AuthenticationFilterInitializer.class.getName() + "," + + TimelineAuthenticationFilterInitializer.class.getName(), + 
AuthenticationFilterInitializer.class.getName() + ", " + + TimelineAuthenticationFilterInitializer.class.getName() }; + for (String filterInitializer : filterInitializers) { + historyServer = new ApplicationHistoryServer(); + Configuration config = new YarnConfiguration(); + config.set("hadoop.http.filter.initializers", filterInitializer); + historyServer.init(config); + historyServer.start(); + Configuration tmp = historyServer.getConfig(); + assertEquals(TimelineAuthenticationFilterInitializer.class.getName(), + tmp.get("hadoop.http.filter.initializers")); + historyServer.stop(); + AHSWebApp.resetInstance(); + } + } + @After public void stop() { if (historyServer != null) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java Wed Aug 20 01:34:29 2014 @@ -36,14 +36,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.service.ServiceStateException; import 
org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.iq80.leveldb.DBIterator; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -52,19 +55,19 @@ import org.junit.Test; public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { private FileContext fsContext; private File fsPath; + private Configuration config = new YarnConfiguration(); @Before public void setup() throws Exception { fsContext = FileContext.getLocalFSFileContext(); - Configuration conf = new YarnConfiguration(); fsPath = new File("target", this.getClass().getSimpleName() + "-tmpDir").getAbsoluteFile(); fsContext.delete(new Path(fsPath.getAbsolutePath()), true); - conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, fsPath.getAbsolutePath()); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); store = new LeveldbTimelineStore(); - store.init(conf); + store.init(config); store.start(); loadTestData(); loadVerificationData(); @@ -263,5 +266,47 @@ public class TestLeveldbTimelineStore ex assertEquals(1, getEntities("type_2").size()); assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); } + + @Test + public void testCheckVersion() throws IOException { + LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store; + // default version + Version defaultVersion = dbStore.getCurrentVersion(); + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // compatible 
version + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + dbStore.storeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, dbStore.loadVersion()); + restartTimelineStore(); + dbStore = (LeveldbTimelineStore) store; + // overwrite the compatible version + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // incompatible version + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + dbStore.storeVersion(incompatibleVersion); + try { + restartTimelineStore(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); + } + } + + private void restartTimelineStore() throws IOException { + // need to close so leveldb releases database lock + if (store != null) { + store.close(); + } + store = new LeveldbTimelineStore(); + store.init(config); + store.start(); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java (original) +++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java Wed Aug 20 01:34:29 2014 @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Set; import javax.servlet.FilterConfig; +import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.ws.rs.core.MediaType; @@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AdminACLsManager; import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter; @@ -88,14 +90,15 @@ public class TestTimelineWebServices ext } catch (Exception e) { Assert.fail(); } - bind(TimelineStore.class).toInstance(store); Configuration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false); timelineACLsManager = new TimelineACLsManager(conf); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); adminACLsManager = new AdminACLsManager(conf); - bind(TimelineACLsManager.class).toInstance(timelineACLsManager); + TimelineDataManager timelineDataManager = + new TimelineDataManager(store, timelineACLsManager); + bind(TimelineDataManager.class).toInstance(timelineDataManager); serve("/*").with(GuiceContainer.class); TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter(); FilterConfig filterConfig = mock(FilterConfig.class); @@ -105,6 +108,8 @@ public class TestTimelineWebServices ext .thenReturn("simple"); when(filterConfig.getInitParameter( 
PseudoAuthenticationHandler.ANONYMOUS_ALLOWED)).thenReturn("true"); + ServletContext context = mock(ServletContext.class); + when(filterConfig.getServletContext()).thenReturn(context); Enumeration<Object> names = mock(Enumeration.class); when(names.hasMoreElements()).thenReturn(true, true, false); when(names.nextElement()).thenReturn( Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Wed Aug 20 01:34:29 2014 @@ -47,4 +47,10 @@ message NodeHealthStatusProto { optional bool is_node_healthy = 1; optional string health_report = 2; optional int64 last_health_report_time = 3; -} \ No newline at end of file +} + +message VersionProto { + optional int32 major_version = 1; + optional int32 minor_version = 2; +} + Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff 
============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Wed Aug 20 01:34:29 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -29,17 +30,18 @@ import java.util.concurrent.locks.Reentr import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -126,9 +128,76 @@ public abstract class ContainerExecutor public abstract void deleteAsUser(String user, Path subDir, Path... 
basedirs) throws IOException, InterruptedException; + public abstract boolean isContainerProcessAlive(String user, String pid) + throws IOException; + + /** + * Recover an already existing container. This is a blocking call and returns + * only when the container exits. Note that the container must have been + * activated prior to this call. + * @param user the user of the container + * @param containerId The ID of the container to reacquire + * @return The exit code of the pre-existing container + * @throws IOException + */ + public int reacquireContainer(String user, ContainerId containerId) + throws IOException { + Path pidPath = getPidFilePath(containerId); + if (pidPath == null) { + LOG.warn(containerId + " is not active, returning terminated error"); + return ExitCode.TERMINATED.getExitCode(); + } + + String pid = null; + pid = ProcessIdFileReader.getProcessId(pidPath); + if (pid == null) { + throw new IOException("Unable to determine pid for " + containerId); + } + + LOG.info("Reacquiring " + containerId + " with pid " + pid); + try { + while(isContainerProcessAlive(user, pid)) { + Thread.sleep(1000); + } + } catch (InterruptedException e) { + throw new IOException("Interrupted while waiting for process " + pid + + " to exit", e); + } + + // wait for exit code file to appear + String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString()); + File file = new File(exitCodeFile); + final int sleepMsec = 100; + int msecLeft = 2000; + while (!file.exists() && msecLeft >= 0) { + if (!isContainerActive(containerId)) { + LOG.info(containerId + " was deactivated"); + return ExitCode.TERMINATED.getExitCode(); + } + try { + Thread.sleep(sleepMsec); + } catch (InterruptedException e) { + throw new IOException( + "Interrupted while waiting for exit code from " + containerId, e); + } + msecLeft -= sleepMsec; + } + if (msecLeft < 0) { + throw new IOException("Timeout while waiting for exit code from " + + containerId); + } + + try { + return 
Integer.parseInt(FileUtils.readFileToString(file).trim()); + } catch (NumberFormatException e) { + throw new IOException("Error parsing exit code from pid " + pid, e); + } + } + public enum ExitCode { FORCE_KILLED(137), - TERMINATED(143); + TERMINATED(143), + LOST(154); private final int code; private ExitCode(int exitCode) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Wed Aug 20 01:34:29 2014 @@ -273,25 +273,57 @@ public class DefaultContainerExecutor ex private final class UnixLocalWrapperScriptBuilder extends LocalWrapperScriptBuilder { + private final Path sessionScriptPath; public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { super(containerWorkDir); + this.sessionScriptPath = new Path(containerWorkDir, + Shell.appendScriptExtension("default_container_executor_session")); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile) + throws IOException { + writeSessionScript(launchDst, pidFile); + super.writeLocalWrapperScript(launchDst, pidFile); } @Override public void writeLocalWrapperScript(Path 
launchDst, Path pidFile, PrintStream pout) { - - // We need to do a move as writing to a file is not atomic - // Process reading a file being written to may get garbled data - // hence write pid to tmp file first followed by a mv + String exitCodeFile = ContainerLaunch.getExitCodeFile( + pidFile.toString()); + String tmpFile = exitCodeFile + ".tmp"; pout.println("#!/bin/bash"); - pout.println(); - pout.println("echo $$ > " + pidFile.toString() + ".tmp"); - pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); - String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; - pout.println(exec + " /bin/bash \"" + - launchDst.toUri().getPath().toString() + "\""); + pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\""); + pout.println("rc=$?"); + pout.println("echo $rc > \"" + tmpFile + "\""); + pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\""); + pout.println("exit $rc"); + } + + private void writeSessionScript(Path launchDst, Path pidFile) + throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + try { + out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + pout.println("#!/bin/bash"); + pout.println(); + pout.println("echo $$ > " + pidFile.toString() + ".tmp"); + pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); + String exec = Shell.isSetsidAvailable? 
"exec setsid" : "exec"; + pout.println(exec + " /bin/bash \"" + + launchDst.toUri().getPath().toString() + "\""); + } finally { + IOUtils.cleanup(LOG, pout, out); + } + lfs.setPermission(sessionScriptPath, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } } @@ -310,6 +342,7 @@ public class DefaultContainerExecutor ex @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { + // TODO: exit code script for Windows // On Windows, the pid is the container ID, so that it can also serve as // the name of the job object created by winutils for task management. @@ -342,6 +375,12 @@ public class DefaultContainerExecutor ex return true; } + @Override + public boolean isContainerProcessAlive(String user, String pid) + throws IOException { + return containerIsAlive(pid); + } + /** * Returns true if the process with the specified pid is alive. * Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Wed Aug 20 01:34:29 2014 @@ -403,6 +403,13 @@ public class LinuxContainerExecutor exte } } + @Override + public boolean 
isContainerProcessAlive(String user, String pid) + throws IOException { + // Send a test signal to the process as the user to see if it's alive + return signalContainer(user, pid, Signal.NULL); + } + public void mountCgroups(List<String> cgroupKVs, String hierarchy) throws IOException { List<String> command = new ArrayList<String>( Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Wed Aug 20 01:34:29 2014 @@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record public interface NodeStatusUpdater extends Service { + /** + * Schedule a heartbeat to the ResourceManager outside of the normal, + * periodic heartbeating process. This is typically called when the state + * of containers on the node has changed to notify the RM sooner. 
+ */ void sendOutofBandHeartBeat(); + /** + * Get the ResourceManager identifier received during registration + * @return the ResourceManager ID + */ long getRMIdentifier(); + /** + * Query if a container has recently completed + * @param containerId the container ID + * @return true if the container has recently completed + */ public boolean isContainerRecentlyStopped(ContainerId containerId); + /** + * Add a container to the list of containers that have recently completed + * @param containerId the ID of the completed container + */ + public void addCompletedContainer(ContainerId containerId); + + /** + * Clear the list of recently completed containers + */ public void clearFinishedContainersFromCache(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Aug 20 01:34:29 2014 @@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. 
- updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } if (LOG.isDebugEnabled()) { @@ -393,8 +392,7 @@ public class NodeStatusUpdaterImpl exten // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - updateStoppedContainersInCache(container.getContainerId()); - addCompletedContainer(container); + addCompletedContainer(container.getContainerId()); } } LOG.info("Sending out " + containerStatuses.size() @@ -402,9 +400,15 @@ public class NodeStatusUpdaterImpl exten return containerStatuses; } - private void addCompletedContainer(Container container) { + @Override + public void addCompletedContainer(ContainerId containerId) { synchronized (previousCompletedContainers) { - previousCompletedContainers.add(container.getContainerId()); + previousCompletedContainers.add(containerId); + } + synchronized (recentlyStoppedContainers) { + removeVeryOldStoppedContainersFromCache(); + recentlyStoppedContainers.put(containerId, + System.currentTimeMillis() + durationToTrackStoppedContainers); } } @@ -451,16 +455,6 @@ public class NodeStatusUpdaterImpl exten } } - @Private - @VisibleForTesting - public void updateStoppedContainersInCache(ContainerId containerId) { - synchronized (recentlyStoppedContainers) { - removeVeryOldStoppedContainersFromCache(); - recentlyStoppedContainers.put(containerId, - System.currentTimeMillis() + durationToTrackStoppedContainers); - } - } - @Override public void clearFinishedContainersFromCache() { synchronized (recentlyStoppedContainers) { @@ -476,8 +470,14 @@ public class NodeStatusUpdaterImpl exten Iterator<ContainerId> i = recentlyStoppedContainers.keySet().iterator(); while (i.hasNext()) { - if (recentlyStoppedContainers.get(i.next()) < currentTime) { + ContainerId cid = i.next(); + if 
(recentlyStoppedContainers.get(cid) < currentTime) { i.remove(); + try { + context.getNMStateStore().removeContainer(cid); + } catch (IOException e) { + LOG.error("Unable to remove container " + cid + " in store", e); + } } else { break; }
