YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier. 
Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c86674a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c86674a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c86674a3

Branch: refs/heads/YARN-1051
Commit: c86674a3a4d99aa56bb8ed3f6df51e3fef215eba
Parents: 116f831
Author: Zhijie Shen <zjs...@apache.org>
Authored: Wed Sep 24 17:50:26 2014 -0700
Committer: Zhijie Shen <zjs...@apache.org>
Committed: Wed Sep 24 17:50:26 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../yarn/security/ContainerTokenIdentifier.java | 34 +++++++++++
 .../containermanager/ContainerManagerImpl.java  | 26 +++++++--
 .../application/ApplicationImpl.java            | 18 +++++-
 .../application/ApplicationInitEvent.java       | 13 +++++
 .../event/LogHandlerAppStartedEvent.java        | 13 +++++
 .../yarn_server_nodemanager_recovery.proto      |  1 +
 .../containermanager/TestContainerManager.java  | 12 +++-
 .../TestContainerManagerRecovery.java           | 25 +++++++-
 .../scheduler/SchedulerApplicationAttempt.java  |  6 +-
 .../security/RMContainerTokenSecretManager.java | 23 +++++++-
 .../yarn/server/resourcemanager/MockRM.java     | 22 +++++--
 .../capacity/TestContainerAllocation.java       | 60 +++++++++++++++++++-
 13 files changed, 239 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c33bf45..4e71c1b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -97,6 +97,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs
     on Timeline service event data. (Zhijie Shen via vinodkv)
 
+    YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier.
+    (Xuan Gong via zjshen)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
index ca847e0..0bb016a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
@@ -34,8 +34,11 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
 
 /**
  * TokenIdentifier for a container. Encodes {@link ContainerId},
@@ -59,10 +62,19 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
   private long rmIdentifier;
   private Priority priority;
   private long creationTime;
+  private LogAggregationContext logAggregationContext;
 
   public ContainerTokenIdentifier(ContainerId containerID,
       String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
       int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
+    this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
+        rmIdentifier, priority, creationTime, null);
+  }
+
+  public ContainerTokenIdentifier(ContainerId containerID, String hostName,
+      String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
+      long rmIdentifier, Priority priority, long creationTime,
+      LogAggregationContext logAggregationContext) {
     this.containerId = containerID;
     this.nmHostAddr = hostName;
     this.appSubmitter = appSubmitter;
@@ -72,6 +84,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
     this.rmIdentifier = rmIdentifier;
     this.priority = priority;
     this.creationTime = creationTime;
+    this.logAggregationContext = logAggregationContext;
   }
 
   /**
@@ -119,6 +132,10 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
     return this.rmIdentifier;
   }
 
+  public LogAggregationContext getLogAggregationContext() {
+    return this.logAggregationContext;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
@@ -138,6 +155,15 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
     out.writeLong(this.rmIdentifier);
     out.writeInt(this.priority.getPriority());
     out.writeLong(this.creationTime);
+    if (this.logAggregationContext == null) {
+      out.writeInt(-1);
+    } else {
+      byte[] logAggregationContext =
+          ((LogAggregationContextPBImpl) this.logAggregationContext).getProto()
+            .toByteArray();
+      out.writeInt(logAggregationContext.length);
+      out.write(logAggregationContext);
+    }
   }
 
   @Override
@@ -158,6 +184,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
     this.rmIdentifier = in.readLong();
     this.priority = Priority.newInstance(in.readInt());
     this.creationTime = in.readLong();
+    int size = in.readInt();
+    if (size != -1) {
+      byte[] bytes = new byte[size];
+      in.readFully(bytes);
+      this.logAggregationContext =
+          new LogAggregationContextPBImpl(
+            LogAggregationContextProto.parseFrom(bytes));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 12166e0..17c9e3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -72,9 +72,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -275,11 +277,17 @@ public class ContainerManagerImpl extends CompositeService implements
           aclProto.getAcl());
     }
 
+    LogAggregationContext logAggregationContext = null;
+    if (p.getLogAggregationContext() != null) {
+      logAggregationContext =
+          new LogAggregationContextPBImpl(p.getLogAggregationContext());
+    }
+
     LOG.info("Recovering application " + appId);
     ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
         creds, context);
     context.getApplications().put(appId, app);
-    app.handle(new ApplicationInitEvent(appId, acls));
+    app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
 
   @SuppressWarnings("unchecked")
@@ -719,13 +727,19 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
       String user, Credentials credentials,
-      Map<ApplicationAccessType, String> appAcls) {
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext) {
 
     ContainerManagerApplicationProto.Builder builder =
         ContainerManagerApplicationProto.newBuilder();
     builder.setId(((ApplicationIdPBImpl) appId).getProto());
     builder.setUser(user);
 
+    if (logAggregationContext != null) {
+      builder.setLogAggregationContext((
+          (LogAggregationContextPBImpl)logAggregationContext).getProto());
+    }
+
     builder.clearCredentials();
     if (credentials != null) {
       DataOutputBuffer dob = new DataOutputBuffer();
@@ -826,12 +840,16 @@ public class ContainerManagerImpl extends CompositeService implements
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
           LOG.info("Creating a new application reference for app " + applicationID);
+          LogAggregationContext logAggregationContext =
+              containerTokenIdentifier.getLogAggregationContext();
           Map<ApplicationAccessType, String> appAcls =
               container.getLaunchContext().getApplicationACLs();
           context.getNMStateStore().storeApplication(applicationID,
-              buildAppProto(applicationID, user, credentials, appAcls));
+              buildAppProto(applicationID, user, credentials, appAcls,
+                logAggregationContext));
           dispatcher.getEventHandler().handle(
-            new ApplicationInitEvent(applicationID, appAcls));
+            new ApplicationInitEvent(applicationID, appAcls,
+              logAggregationContext));
         }
 
         this.context.getNMStateStore().storeContainer(containerId, request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index cc5544c..fbcd4a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The state machine for the representation of an Application
  * within the NodeManager.
@@ -72,6 +75,8 @@ public class ApplicationImpl implements Application {
 
   private static final Log LOG = LogFactory.getLog(Application.class);
 
+  private LogAggregationContext logAggregationContext;
+
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
@@ -234,10 +239,11 @@ public class ApplicationImpl implements Application {
       app.applicationACLs = initEvent.getApplicationACLs();
       app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
       // Inform the logAggregator
+      app.logAggregationContext = initEvent.getLogAggregationContext();
       app.dispatcher.getEventHandler().handle(
           new LogHandlerAppStartedEvent(app.appId, app.user,
               app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
-              app.applicationACLs)); 
+              app.applicationACLs, app.logAggregationContext)); 
     }
   }
 
@@ -467,4 +473,14 @@ public class ApplicationImpl implements Application {
   public String toString() {
     return appId.toString();
   }
+
+  @VisibleForTesting
+  public LogAggregationContext getLogAggregationContext() {
+    try {
+      this.readLock.lock();
+      return this.logAggregationContext;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
index 5746b6a..097cfb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
@@ -22,18 +22,31 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 
 public class ApplicationInitEvent extends ApplicationEvent {
 
   private final Map<ApplicationAccessType, String> applicationACLs;
+  private final LogAggregationContext logAggregationContext;  
 
   public ApplicationInitEvent(ApplicationId appId,
       Map<ApplicationAccessType, String> acls) {
+    this(appId, acls, null);
+  }
+
+  public ApplicationInitEvent(ApplicationId appId,
+      Map<ApplicationAccessType, String> acls,
+      LogAggregationContext logAggregationContext) {
     super(appId, ApplicationEventType.INIT_APPLICATION);
     this.applicationACLs = acls;
+    this.logAggregationContext = logAggregationContext;
   }
 
   public Map<ApplicationAccessType, String> getApplicationACLs() {
     return this.applicationACLs;
   }
+
+  public LogAggregationContext getLogAggregationContext() {
+    return this.logAggregationContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
index 6c07674..993f69c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 
 public class LogHandlerAppStartedEvent extends LogHandlerEvent {
@@ -32,16 +33,25 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
   private final String user;
   private final Credentials credentials;
   private final Map<ApplicationAccessType, String> appAcls;
+  private final LogAggregationContext logAggregationContext;
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
       Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls) {
+    this(appId, user, credentials, retentionPolicy, appAcls, null);
+  }
+
+  public LogHandlerAppStartedEvent(ApplicationId appId, String user,
+      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext) {
     super(LogHandlerEventType.APPLICATION_STARTED);
     this.applicationId = appId;
     this.user = user;
     this.credentials = credentials;
     this.retentionPolicy = retentionPolicy;
     this.appAcls = appAcls;
+    this.logAggregationContext = logAggregationContext;
   }
 
   public ApplicationId getApplicationId() {
@@ -64,4 +74,7 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
     return this.appAcls;
   }
 
+  public LogAggregationContext getLogAggregationContext() {
+    return this.logAggregationContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index e6f39f6..d8fdd8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -29,6 +29,7 @@ message ContainerManagerApplicationProto {
   optional string user = 2;
   optional bytes credentials = 3;
   repeated ApplicationACLMapProto acls = 4;
+  optional LogAggregationContextProto log_aggregation_context = 5;
 }
 
 message DeletionServiceDeleteTaskProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index f2109b5..da39cb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -795,11 +796,20 @@ public class TestContainerManager extends BaseContainerManagerTest {
       NodeId nodeId, String user,
       NMContainerTokenSecretManager containerTokenSecretManager)
       throws IOException {
+    return createContainerToken(cId, rmIdentifier, nodeId, user,
+      containerTokenSecretManager, null);
+  }
+
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext)
+      throws IOException {
     Resource r = BuilderUtils.newResource(1024, 1);
     ContainerTokenIdentifier containerTokenIdentifier =
         new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
           System.currentTimeMillis() + 100000L, 123, rmIdentifier,
-          Priority.newInstance(0), 0);
+          Priority.newInstance(0), 0, logAggregationContext);
     Token containerToken =
         BuilderUtils
           .newContainerToken(nodeId, containerTokenSecretManager

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 0319664..2c69843 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@@ -126,8 +128,12 @@ public class TestContainerManagerRecovery {
     ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
         localResources, containerEnv, containerCmds, serviceData,
         containerTokens, acls);
+    // create the logAggregationContext
+    LogAggregationContext logAggregationContext =
+        LogAggregationContext.newInstance("includePattern", "excludePattern",
+          1000);
     StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc);
+        clc, logAggregationContext);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     Application app = context.getApplications().get(appId);
@@ -157,6 +163,18 @@ public class TestContainerManagerRecovery {
     assertEquals(1, context.getApplications().size());
     app = context.getApplications().get(appId);
     assertNotNull(app);
+
+    // check whether LogAggregationContext is recovered correctly
+    LogAggregationContext recovered =
+        ((ApplicationImpl) app).getLogAggregationContext();
+    assertNotNull(recovered);
+    assertEquals(logAggregationContext.getRollingIntervalSeconds(),
+      recovered.getRollingIntervalSeconds());
+    assertEquals(logAggregationContext.getIncludePattern(),
+      recovered.getIncludePattern());
+    assertEquals(logAggregationContext.getExcludePattern(),
+      recovered.getExcludePattern());
+
     waitForAppState(app, ApplicationState.INITING);
     assertTrue(context.getApplicationACLsManager().checkAccess(
         UserGroupInformation.createRemoteUser(modUser),
@@ -224,13 +242,14 @@ public class TestContainerManagerRecovery {
 
   private StartContainersResponse startContainer(Context context,
       final ContainerManagerImpl cm, ContainerId cid,
-      ContainerLaunchContext clc) throws Exception {
+      ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
+          throws Exception {
     UserGroupInformation user = UserGroupInformation.createRemoteUser(
         cid.getApplicationAttemptId().toString());
     StartContainerRequest scReq = StartContainerRequest.newInstance(
         clc, TestContainerManager.createContainerToken(cid, 0,
             context.getNodeId(), user.getShortUserName(),
-            context.getContainerTokenSecretManager()));
+            context.getContainerTokenSecretManager(), logAggregationContext));
     final List<StartContainerRequest> scReqList =
         new ArrayList<StartContainerRequest>();
     scReqList.add(scReq);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index c9b0303..84975b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -91,6 +92,7 @@ public class SchedulerApplicationAttempt {
   private Resource amResource = Resources.none();
   private boolean unmanagedAM = true;
   private boolean amRunning = false;
+  private LogAggregationContext logAggregationContext;
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -138,6 +140,8 @@ public class SchedulerApplicationAttempt {
               .getApplicationSubmissionContext();
       if (appSubmissionContext != null) {
         unmanagedAM = appSubmissionContext.getUnmanagedAM();
+        this.logAggregationContext =
+            appSubmissionContext.getLogAggregationContext();
       }
     }
   }
@@ -444,7 +448,7 @@ public class SchedulerApplicationAttempt {
         container.setContainerToken(rmContext.getContainerTokenSecretManager()
           .createContainerToken(container.getId(), container.getNodeId(),
             getUser(), container.getResource(), container.getPriority(),
-            rmContainer.getCreationTime()));
+            rmContainer.getCreationTime(), this.logAggregationContext));
         NMToken nmToken =
             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
               getApplicationAttemptId(), container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
index 13943f8..15dd1a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -177,6 +178,25 @@ public class RMContainerTokenSecretManager extends
   public Token createContainerToken(ContainerId containerId, NodeId nodeId,
       String appSubmitter, Resource capability, Priority priority,
       long createTime) {
+    return createContainerToken(containerId, nodeId, appSubmitter, capability,
+      priority, createTime, null);
+  }
+
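+  /**
+   * Helper function for creating ContainerTokens.
+   * 
+   * @param containerId id of the container the token is issued for
+   * @param nodeId node whose string form is stored in the token identifier
+   * @param appSubmitter application submitter stored in the token identifier
+   * @param capability resource capability stored in the token identifier
+   * @param priority priority stored in the token identifier
+   * @param createTime creation time stored in the token identifier
+   * @param logAggregationContext context stored in the identifier; may be null
+   * @return the container-token
+   */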
+  public Token createContainerToken(ContainerId containerId, NodeId nodeId,
+      String appSubmitter, Resource capability, Priority priority,
+      long createTime, LogAggregationContext logAggregationContext) {
     byte[] password;
     ContainerTokenIdentifier tokenIdentifier;
     long expiryTimeStamp =
@@ -189,7 +209,8 @@ public class RMContainerTokenSecretManager extends
           new ContainerTokenIdentifier(containerId, nodeId.toString(),
             appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
               .getMasterKey().getKeyId(),
-            ResourceManager.getClusterTimeStamp(), priority, createTime);
+            ResourceManager.getClusterTimeStamp(), priority, createTime,
+            logAggregationContext);
       password = this.createPassword(tokenIdentifier);
 
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 1338a6c..4f5fdeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -278,7 +279,7 @@ public class MockRM extends ResourceManager {
       boolean waitForAccepted, boolean keepContainers) throws Exception {
     return submitApp(masterMemory, name, user, acls, unmanaged, queue,
         maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
-        false, null, 0);
+        false, null, 0, null);
   }
 
   public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
@@ -287,7 +288,7 @@ public class MockRM extends ResourceManager {
       .getShortUserName(), null, false, null,
       super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
-      false, null, attemptFailuresValidityInterval);
+      false, null, attemptFailuresValidityInterval, null);
   }
 
   public RMApp submitApp(int masterMemory, String name, String user,
@@ -297,14 +298,24 @@ public class MockRM extends ResourceManager {
       ApplicationId applicationId) throws Exception {
     return submitApp(masterMemory, name, user, acls, unmanaged, queue,
       maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
-      isAppIdProvided, applicationId, 0);
+      isAppIdProvided, applicationId, 0, null);
   }
 
+  public RMApp submitApp(int masterMemory,
+      LogAggregationContext logAggregationContext) throws Exception {
+    return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+      .getShortUserName(), null, false, null,
+      super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+      false, null, 0, logAggregationContext);
+   }
+
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType,
       boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
-      ApplicationId applicationId, long attemptFailuresValidityInterval)
+      ApplicationId applicationId, long attemptFailuresValidityInterval,
+      LogAggregationContext logAggregationContext)
       throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
@@ -342,6 +353,9 @@ public class MockRM extends ResourceManager {
     }
     sub.setAMContainerSpec(clc);
     sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
+    if (logAggregationContext != null) {
+      sub.setLogAggregationContext(logAggregationContext);
+    }
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c86674a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index a9bfc2f..85ef381 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -195,6 +198,58 @@ public class TestContainerAllocation {
     Assert.assertEquals(1, containers.size());
   }
 
+  // This is to test whether LogAggregationContext is passed into
+  // container tokens correctly
+  @Test
+  public void testLogAggregationContextPassedIntoContainerToken()
+      throws Exception {
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+    MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000);
+    // LogAggregationContext is set as null
+    Assert
+      .assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null));
+
+    // create a not-null LogAggregationContext
+    final int interval = 2000;
+    LogAggregationContext logAggregationContext =
+        LogAggregationContext.newInstance(
+          "includePattern", "excludePattern", interval);
+    LogAggregationContext returned =
+        getLogAggregationContextFromContainerToken(rm1, nm2,
+          logAggregationContext);
+    Assert.assertEquals("includePattern", returned.getIncludePattern());
+    Assert.assertEquals("excludePattern", returned.getExcludePattern());
+    Assert.assertEquals(interval, returned.getRollingIntervalSeconds());
+    rm1.stop();
+  }
+
+  private LogAggregationContext getLogAggregationContextFromContainerToken(
+      MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext)
+      throws Exception {
+    RMApp app2 = rm1.submitApp(200, logAggregationContext);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    nm1.nodeHeartbeat(true);
+    // request a container.
+    am2.allocate("127.0.0.1", 512, 1, new ArrayList<ContainerId>());
+    ContainerId containerId =
+        ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
+
+    // acquire the container.
+    List<Container> containers =
+        am2.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+    Assert.assertEquals(containerId, containers.get(0).getId());
+    // container token is generated.
+    Assert.assertNotNull(containers.get(0).getContainerToken());
+    ContainerTokenIdentifier token =
+        BuilderUtils.newContainerTokenIdentifier(containers.get(0)
+          .getContainerToken());
+    return token.getLogAggregationContext();
+  }
+
   private volatile int numRetries = 0;
   private class TestRMSecretManagerService extends RMSecretManagerService {
 
@@ -210,10 +265,11 @@ public class TestContainerAllocation {
         @Override
         public Token createContainerToken(ContainerId containerId,
             NodeId nodeId, String appSubmitter, Resource capability,
-            Priority priority, long createTime) {
+            Priority priority, long createTime,
+            LogAggregationContext logAggregationContext) {
           numRetries++;
           return super.createContainerToken(containerId, nodeId, appSubmitter,
-            capability, priority, createTime);
+            capability, priority, createTime, logAggregationContext);
         }
       };
     }

Reply via email to