Author: bikas
Date: Tue Jul 9 23:26:05 2013
New Revision: 1501609
URL: http://svn.apache.org/r1501609
Log:
Merge r1501606 from branch-2 to branch-2.1-beta for YARN-369. Handle (or throw
a proper error when receiving) status updates from application masters that
have not registered (Mayank Bansal & Abhishek Kapoor via bikas)
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java
- copied unchanged from r1501606,
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1501609&r1=1501608&r2=1501609&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
(original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Tue
Jul 9 23:26:05 2013
@@ -623,6 +623,10 @@ Release 2.1.0-beta - 2013-07-02
YARN-845. RM crash with NPE on NODE_UPDATE (Mayank Bansal via bikas)
+ YARN-369. Handle (or throw a proper error when receiving) status updates
+ from application masters that have not registered (Mayank Bansal &
+ Abhishek Kapoor via bikas)
+
BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1501609&r1=1501608&r2=1501609&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
Tue Jul 9 23:26:05 2013
@@ -195,17 +195,35 @@ public class ApplicationMasterService ex
// Allow only one thread in AM to do registerApp at a time.
synchronized (lastResponse) {
- LOG.info("AM registration " + applicationAttemptId);
+ if (hasApplicationMasterRegistered(applicationAttemptId)) { // reject a second registration for the same attempt
+ String message =
+ "Application Master is already registered : "
+ + applicationAttemptId.getApplicationId();
+ LOG.warn(message);
+ RMAuditLogger.logFailure(
+ this.rmContext.getRMApps()
+ .get(applicationAttemptId.getApplicationId()).getUser(),
+ AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
+ applicationAttemptId.getApplicationId(), applicationAttemptId);
+ throw new InvalidApplicationMasterRequestException(message);
+ }
+
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
-
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRegistrationEvent(applicationAttemptId, request
- .getHost(), request.getRpcPort(), request.getTrackingUrl()));
-
RMApp app = this.rmContext.getRMApps().get(appID);
- RMAuditLogger.logSuccess(app.getUser(),
- AuditConstants.REGISTER_AM, "ApplicationMasterService", appID,
- applicationAttemptId);
+
+ // Mark the attempt as registered: hasApplicationMasterRegistered() treats a
+ // response id >= 0 as registered, and registerAppAttempt() starts it at -1
+ lastResponse.setResponseId(0);
+ responseMap.put(applicationAttemptId, lastResponse);
+ LOG.info("AM registration " + applicationAttemptId);
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new RMAppAttemptRegistrationEvent(applicationAttemptId, request
+ .getHost(), request.getRpcPort(), request.getTrackingUrl()));
+ RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
+ "ApplicationMasterService", appID, applicationAttemptId);
// Pick up min/max resource from scheduler...
RegisterApplicationMasterResponse response = recordFactory
@@ -257,6 +275,24 @@ public class ApplicationMasterService ex
}
}
+ /** Checks whether the application master has registered for an attempt.
+ * @param appAttemptId the attempt to look up in responseMap
+ * @return true if the attempt is tracked and its last response id is >= 0
+ */
+ public boolean hasApplicationMasterRegistered(
+ ApplicationAttemptId appAttemptId) {
+ boolean hasApplicationMasterRegistered = false;
+ AllocateResponse lastResponse = responseMap.get(appAttemptId);
+ if (lastResponse != null) {
+ synchronized (lastResponse) {
+ if (lastResponse.getResponseId() >= 0) {
+ hasApplicationMasterRegistered = true;
+ }
+ }
+ }
+ return hasApplicationMasterRegistered;
+ }
+
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
@@ -272,6 +308,20 @@ public class ApplicationMasterService ex
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return resync;
}
+
+ if (!hasApplicationMasterRegistered(appAttemptId)) { // reject allocate calls from an AM that has not registered yet
+ String message =
+ "Application Master is trying to allocate before registering for: "
+ + appAttemptId.getApplicationId();
+ LOG.error(message);
+ RMAuditLogger.logFailure(
+ this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+ .getUser(), AuditConstants.REGISTER_AM, "", // NOTE(review): an allocate failure is audited under REGISTER_AM; confirm this operation name is intended
+ "ApplicationMasterService", message, appAttemptId.getApplicationId(),
+ appAttemptId);
+ throw new InvalidApplicationMasterRequestException(message);
+ }
+
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
return lastResponse;
@@ -442,7 +492,9 @@ public class ApplicationMasterService ex
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
- response.setResponseId(0);
+ // Start at -1 so the attempt counts as unregistered until the AM
+ // registers, at which point the response id is reset to 0
+ response.setResponseId(-1);
LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, response);
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1501609&r1=1501608&r2=1501609&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
Tue Jul 9 23:26:05 2013
@@ -78,10 +78,19 @@ public class MockAM {
finalState, attempt.getAppAttemptState());
}
- public RegisterApplicationMasterResponse registerAppAttempt() throws
Exception {
- waitForState(RMAppAttemptState.LAUNCHED);
+ public RegisterApplicationMasterResponse registerAppAttempt()
+ throws Exception {
+ return registerAppAttempt(true); // wait for LAUNCHED before registering
+ }
+
+ public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
+ throws Exception {
+ if (wait) { // when false, register without waiting for the LAUNCHED state
+ waitForState(RMAppAttemptState.LAUNCHED);
+ }
responseId = 0;
- RegisterApplicationMasterRequest req =
Records.newRecord(RegisterApplicationMasterRequest.class);
+ RegisterApplicationMasterRequest req =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
req.setApplicationAttemptId(attemptId);
req.setHost("");
req.setRpcPort(1);
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1501609&r1=1501608&r2=1501609&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
(original)
+++
hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
Tue Jul 9 23:26:05 2013
@@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -166,4 +169,58 @@ public class TestApplicationMasterLaunch
am.waitForState(RMAppAttemptState.FINISHED);
rm.stop();
}
+
+
+ @SuppressWarnings("unused")
+ @Test(timeout = 100000)
+ public void testallocateBeforeAMRegistration() throws Exception { // allocate() before registering must fail, and a second registration must be rejected
+ Logger rootLogger = LogManager.getRootLogger();
+ boolean thrown = false;
+ rootLogger.setLevel(Level.DEBUG); // NOTE(review): raises the root log level globally and never restores it
+ MockRM rm = new MockRM();
+ rm.start();
+ MockNM nm1 = rm.registerNode("h1:1234", 5000);
+ RMApp app = rm.submitApp(2000);
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+
+ // request for containers
+ int request = 2;
+ try {
+ AllocateResponse ar =
+ am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+ } catch (Exception e) {
+ Assert.assertEquals("Application Master is trying to allocate before "
+ + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
+ e.getMessage());
+ thrown = true;
+ }
+ // kick the scheduler
+ nm1.nodeHeartbeat(true);
+ try {
+ AllocateResponse amrs =
+ am.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>());
+ } catch (Exception e) {
+ Assert.assertEquals("Application Master is trying to allocate before "
+ + "registering for: " + attempt.getAppAttemptId().getApplicationId(),
+ e.getMessage());
+ thrown = true;
+ }
+ Assert.assertTrue(thrown); // NOTE(review): passes if either allocate call threw; each call should be asserted on its own
+ am.registerAppAttempt();
+ thrown = false;
+ try {
+ am.registerAppAttempt(false);
+ }
+ catch (Exception e) {
+ Assert.assertEquals("Application Master is already registered : "
+ + attempt.getAppAttemptId().getApplicationId(),
+ e.getMessage());
+ thrown = true;
+ }
+ Assert.assertTrue(thrown); // NOTE(review): rm is never stopped here, unlike the preceding test which calls rm.stop()
+ }
}