Author: zjshen
Date: Wed Mar 19 03:52:46 2014
New Revision: 1579126
URL: http://svn.apache.org/r1579126
Log:
YARN-1690. Made DistributedShell send timeline entities+events. Contributed by
Mayank Bansal.
svn merge --ignore-ancestry -c 1579123 ../../trunk/
Modified:
hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt?rev=1579126&r1=1579125&r2=1579126&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt Wed Mar
19 03:52:46 2014
@@ -119,6 +119,9 @@ Release 2.4.0 - UNRELEASED
YARN-1705. Reset cluster-metrics on transition to standby. (Rohith via
kasha)
+ YARN-1690. Made DistributedShell send timeline entities+events. (Mayank Bansal
+ via zjshen)
+
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal
via
Modified:
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1579126&r1=1579125&r2=1579126&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
(original)
+++
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
Wed Mar 19 03:52:46 2014
@@ -45,6 +45,7 @@ import org.apache.commons.cli.ParseExcep
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +56,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -80,7 +82,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -90,6 +95,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
@@ -160,6 +166,18 @@ public class ApplicationMaster {
private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+ /**
+  * Timeline event types published by the DistributedShell ApplicationMaster
+  * for its application attempt and for the containers it launches.
+  */
+ @VisibleForTesting
+ @Private
+ public static enum DSEvent {
+   DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START,
+   DS_CONTAINER_END
+ }
+
+ /**
+  * Timeline entity types under which the DistributedShell ApplicationMaster
+  * records its events: one for the application attempt, one for containers.
+  */
+ @Private
+ @VisibleForTesting
+ public static enum DSEntity {
+   DS_APP_ATTEMPT,
+   DS_CONTAINER
+ }
+
// Configuration
private Configuration conf;
@@ -242,6 +260,9 @@ public class ApplicationMaster {
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
+ // Timeline Client
+ private TimelineClient timelineClient;
+
private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c";
@@ -261,7 +282,8 @@ public class ApplicationMaster {
result = appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
- System.exit(1);
+ LogManager.shutdown();
+ ExitUtil.terminate(1, t);
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
@@ -316,7 +338,6 @@ public class ApplicationMaster {
* @throws IOException
*/
public boolean init(String[] args) throws ParseException, IOException {
-
Options opts = new Options();
opts.addOption("app_attempt_id", true,
"App Attempt ID. Not to be used unless for testing purposes");
@@ -464,6 +485,11 @@ public class ApplicationMaster {
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0"));
+ // Creating the Timeline Client
+ timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ timelineClient.start();
+
return true;
}
@@ -485,6 +511,13 @@ public class ApplicationMaster {
@SuppressWarnings({ "unchecked" })
public void run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
+ try {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_START);
+ } catch (Exception e) {
+ LOG.error("App Attempt start event coud not be pulished for "
+ + appAttemptID.toString(), e);
+ }
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
@@ -564,6 +597,13 @@ public class ApplicationMaster {
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainersToRequest);
+ try {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_END);
+ } catch (Exception e) {
+ LOG.error("App Attempt start event coud not be pulished for "
+ + appAttemptID.toString(), e);
+ }
}
@VisibleForTesting
@@ -668,6 +708,12 @@ public class ApplicationMaster {
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
+ try {
+ publishContainerEndEvent(timelineClient, containerStatus);
+ } catch (Exception e) {
+ LOG.error("Container start event could not be pulished for "
+ + containerStatus.getContainerId().toString(), e);
+ }
}
// ask for more containers if any failed
@@ -782,6 +828,13 @@ public class ApplicationMaster {
if (container != null) {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId,
container.getNodeId());
}
+ try {
+ ApplicationMaster.publishContainerStartEvent(
+ applicationMaster.timelineClient, container);
+ } catch (Exception e) {
+ LOG.error("Container start event coud not be pulished for "
+ + container.getId().toString(), e);
+ }
}
@Override
@@ -968,4 +1021,54 @@ public class ApplicationMaster {
org.apache.commons.io.IOUtils.closeQuietly(ds);
}
}
+
+ /**
+  * Publishes a DS_CONTAINER_START event for a launched container, recording
+  * the node it runs on and the resources allocated to it.
+  * Does nothing except log a warning when {@code container} is null.
+  */
+ private static void publishContainerStartEvent(TimelineClient timelineClient,
+     Container container) throws IOException, YarnException {
+   if (container == null) {
+     // The NM callback that calls this treats the container lookup as
+     // nullable, and its error handler dereferences the container again, so
+     // an NPE here would escape that handler.
+     LOG.warn("Container start event not published: container is unknown");
+     return;
+   }
+   TimelineEntity entity = createDSEntity(container.getId().toString(),
+       DSEntity.DS_CONTAINER);
+   TimelineEvent event = createDSEvent(DSEvent.DS_CONTAINER_START);
+   event.addEventInfo("Node", container.getNodeId().toString());
+   event.addEventInfo("Resources", container.getResource().toString());
+   entity.addEvent(event);
+   timelineClient.putEntities(entity);
+ }
+
+ /**
+  * Publishes a DS_CONTAINER_END event for a completed container, recording
+  * its final state and exit status.
+  */
+ private static void publishContainerEndEvent(TimelineClient timelineClient,
+     ContainerStatus container) throws IOException, YarnException {
+   TimelineEntity entity = createDSEntity(
+       container.getContainerId().toString(), DSEntity.DS_CONTAINER);
+   TimelineEvent event = createDSEvent(DSEvent.DS_CONTAINER_END);
+   event.addEventInfo("State", container.getState().name());
+   event.addEventInfo("Exit Status", container.getExitStatus());
+   entity.addEvent(event);
+   timelineClient.putEntities(entity);
+ }
+
+ /**
+  * Publishes a single application-attempt event of type {@code appEvent}
+  * under the DS_APP_ATTEMPT entity identified by {@code appAttemptId}.
+  */
+ private static void publishApplicationAttemptEvent(
+     TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
+     throws IOException, YarnException {
+   TimelineEntity entity =
+       createDSEntity(appAttemptId, DSEntity.DS_APP_ATTEMPT);
+   entity.addEvent(createDSEvent(appEvent));
+   timelineClient.putEntities(entity);
+ }
+
+ /**
+  * Builds a timeline entity of the given DistributedShell type, tagged with a
+  * "user" primary filter holding the current user's short name.
+  */
+ private static TimelineEntity createDSEntity(String entityId,
+     DSEntity entityType) throws IOException {
+   TimelineEntity entity = new TimelineEntity();
+   entity.setEntityId(entityId);
+   entity.setEntityType(entityType.toString());
+   // getShortUserName() is the bare user name; UGI.toString() also embeds the
+   // authentication method (e.g. "alice (auth:SIMPLE)"), which makes the
+   // primary filter awkward to query on.
+   entity.addPrimaryFilter("user",
+       UserGroupInformation.getCurrentUser().getShortUserName());
+   return entity;
+ }
+
+ /** Builds a timeline event of the given type stamped with the current time. */
+ private static TimelineEvent createDSEvent(DSEvent eventType) {
+   TimelineEvent event = new TimelineEvent();
+   event.setTimestamp(System.currentTimeMillis());
+   event.setEventType(eventType.toString());
+   return event;
+ }
}
Modified:
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1579126&r1=1579125&r2=1579126&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
(original)
+++
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
Wed Mar 19 03:52:46 2014
@@ -36,11 +36,14 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -68,6 +71,7 @@ public class TestDistributedShell {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
+ conf.set("yarn.log.dir", "target");
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(
TestDistributedShell.class.getSimpleName(), 1, 1, 1);
@@ -92,6 +96,12 @@ public class TestDistributedShell {
os.write(bytesOut.toByteArray());
os.close();
}
+ FileContext fsContext = FileContext.getLocalFSFileContext();
+ fsContext
+ .delete(
+ new Path(conf
+ .get("yarn.timeline-service.leveldb-timeline-store.path")),
+ true);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
@@ -108,6 +118,12 @@ public class TestDistributedShell {
yarnCluster = null;
}
}
+ FileContext fsContext = FileContext.getLocalFSFileContext();
+ fsContext
+ .delete(
+ new Path(conf
+ .get("yarn.timeline-service.leveldb-timeline-store.path")),
+ true);
}
@Test(timeout=90000)
@@ -171,7 +187,27 @@ public class TestDistributedShell {
t.join();
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(result.get());
-
+
+ TimelineEntities entitiesAttempts = yarnCluster
+ .getApplicationHistoryServer()
+ .getTimelineStore()
+ .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
+ null, null, null, null, null, null);
+ Assert.assertNotNull(entitiesAttempts);
+ Assert.assertEquals(1, entitiesAttempts.getEntities().size());
+ Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
+ .size());
+ Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
+ .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
+ TimelineEntities entities = yarnCluster
+ .getApplicationHistoryServer()
+ .getTimelineStore()
+ .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
+ null, null, null, null, null);
+ Assert.assertNotNull(entities);
+ Assert.assertEquals(2, entities.getEntities().size());
+ Assert.assertEquals(entities.getEntities().get(0).getEntityType()
+ .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
}
@Test(timeout=90000)
Modified:
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1579126&r1=1579125&r2=1579126&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
(original)
+++
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
Wed Mar 19 03:52:46 2014
@@ -169,5 +169,12 @@ public class ApplicationHistoryServer ex
throw new YarnRuntimeException(msg, e);
}
}
-
+ /**
+  * Returns the timeline store held by this server. Exposed only so that
+  * tests can inspect the entities that were stored.
+  *
+  * @return the {@link TimelineStore} held by this server
+  */
+ @VisibleForTesting
+ @Private
+ public TimelineStore getTimelineStore() {
+   return this.timelineStore;
+ }
}
Modified:
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1579126&r1=1579125&r2=1579126&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
(original)
+++
hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Wed Mar 19 03:52:46 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import
org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
@@ -100,6 +101,9 @@ public class MiniYARNCluster extends Com
private ResourceManager[] resourceManagers;
private String[] rmIds;
+ private ApplicationHistoryServer appHistoryServer;
+ private ApplicationHistoryServerWrapper appHistoryServerWrapper;
+
private boolean useFixedPorts;
private boolean useRpc = false;
private int failoverTimeout;
@@ -241,6 +245,8 @@ public class MiniYARNCluster extends Com
addService(new NodeManagerWrapper(index));
}
+ addService(new ApplicationHistoryServerWrapper());
+
super.serviceInit(
conf instanceof YarnConfiguration ? conf : new
YarnConfiguration(conf));
}
@@ -649,4 +655,67 @@ public class MiniYARNCluster extends Com
}
return false;
}
+
+ /**
+  * Service wrapper that creates, starts and stops the embedded
+  * {@link ApplicationHistoryServer} as part of the mini cluster's lifecycle,
+  * publishing it through the enclosing cluster's appHistoryServer field.
+  */
+ private class ApplicationHistoryServerWrapper extends AbstractService {
+   public ApplicationHistoryServerWrapper() {
+     super(ApplicationHistoryServerWrapper.class.getName());
+   }
+
+   /**
+    * Creates and initializes the ApplicationHistoryServer. When the cluster
+    * was not asked to use fixed ports, the timeline RPC and web addresses are
+    * overwritten with their YarnConfiguration defaults.
+    */
+   @Override
+   protected synchronized void serviceInit(Configuration conf)
+       throws Exception {
+     if (!conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
+         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
+       // NOTE(review): this forces the default (fixed) addresses precisely
+       // when fixed ports were NOT requested, which reads inverted -- confirm
+       // intent.
+       conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS);
+       conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
+     }
+     appHistoryServer = new ApplicationHistoryServer();
+     appHistoryServer.init(conf);
+     super.serviceInit(conf);
+   }
+
+   /**
+    * Starts the ApplicationHistoryServer on a helper thread and waits (up to
+    * 60 waits of 1.5s) for that start call to return.
+    *
+    * @throws YarnRuntimeException if the server fails to start, does not
+    *         finish starting in time, or the wait is interrupted
+    */
+   @Override
+   protected synchronized void serviceStart() throws Exception {
+     try {
+       Thread starter = new Thread("ApplicationHistoryServer starter") {
+         @Override
+         public void run() {
+           appHistoryServer.start();
+         }
+       };
+       starter.start();
+       // Wait for start() itself to return rather than polling for the state
+       // to leave INITED: AbstractService.start() switches to STARTED before
+       // it runs serviceStart(), so the state can read STARTED while the
+       // server is still coming up or is about to fail.
+       int waitCount = 0;
+       while (starter.isAlive() && waitCount++ < 60) {
+         LOG.info("Waiting for Timeline Server to start...");
+         starter.join(1500);
+       }
+       if (starter.isAlive()) {
+         throw new IOException("ApplicationHistoryServer did not finish "
+             + "starting in time. Current state is "
+             + appHistoryServer.getServiceState());
+       }
+       if (appHistoryServer.getServiceState() != STATE.STARTED) {
+         // AHS could have failed; carry the recorded failure cause, if any.
+         throw new IOException(
+             "ApplicationHistoryServer failed to start. Final state is "
+                 + appHistoryServer.getServiceState(),
+             appHistoryServer.getFailureCause());
+       }
+       super.serviceStart();
+     } catch (InterruptedException e) {
+       // Preserve the interrupt for whoever is driving the cluster start.
+       Thread.currentThread().interrupt();
+       throw new YarnRuntimeException(e);
+     } catch (Throwable t) {
+       throw new YarnRuntimeException(t);
+     }
+     LOG.info("MiniYARN ApplicationHistoryServer address: "
+         + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
+     LOG.info("MiniYARN ApplicationHistoryServer web address: "
+         + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS));
+   }
+
+   /** Stops the ApplicationHistoryServer if it was created. */
+   @Override
+   protected synchronized void serviceStop() throws Exception {
+     if (appHistoryServer != null) {
+       appHistoryServer.stop();
+     }
+     super.serviceStop();
+   }
+ }
+
+ /**
+  * Returns the ApplicationHistoryServer embedded in this mini cluster.
+  *
+  * @return the embedded server, or {@code null} if the cluster has not been
+  *         initialized yet
+  */
+ public ApplicationHistoryServer getApplicationHistoryServer() {
+   return appHistoryServer;
+ }
}