Author: jianhe
Date: Sun Mar 30 21:35:54 2014
New Revision: 1583203
URL: http://svn.apache.org/r1583203
Log:
YARN-1893. Mark AtMostOnce annotation to ApplicationMasterProtocol#allocate.
Contributed by Xuan Gong.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1583203&r1=1583202&r2=1583203&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sun Mar 30 21:35:54 2014
@@ -596,6 +596,9 @@ Release 2.4.0 - UNRELEASED
YARN-1873. Fixed TestDistributedShell failure when the test cases are out of
order. (Mit Desai via zjshen)
+ YARN-1893. Mark AtMostOnce annotation to ApplicationMasterProtocol#allocate.
+ (Xuan Gong via jianhe)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java?rev=1583203&r1=1583202&r2=1583203&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java Sun Mar 30 21:35:54 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -162,6 +163,7 @@ public interface ApplicationMasterProtoc
*/
@Public
@Stable
+ @AtMostOnce
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException;
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java?rev=1583203&r1=1583202&r2=1583203&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java Sun Mar 30 21:35:54 2014
@@ -33,6 +33,8 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
@@ -67,14 +69,18 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -82,6 +88,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -96,6 +103,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
@@ -257,11 +265,13 @@ public abstract class ProtocolHATestBase
}
protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
- boolean overrideRTS) throws Exception {
+ boolean overrideRTS, boolean overrideApplicationMasterService)
+ throws Exception {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster =
new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
- numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS);
+ numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS,
+ overrideApplicationMasterService);
cluster.resetStartFailoverFlag(false);
cluster.init(conf);
cluster.start();
@@ -285,17 +295,19 @@ public abstract class ProtocolHATestBase
private boolean overrideClientRMService;
private boolean overrideRTS;
+ private boolean overrideApplicationMasterService;
private final AtomicBoolean startFailover = new AtomicBoolean(false);
private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);
public MiniYARNClusterForHATesting(String testName,
int numResourceManagers, int numNodeManagers, int numLocalDirs,
int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
- boolean overrideRTS) {
+ boolean overrideRTS, boolean overrideApplicationMasterService) {
super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
numLogDirs, enableAHS);
this.overrideClientRMService = overrideClientRMService;
this.overrideRTS = overrideRTS;
+ this.overrideApplicationMasterService = overrideApplicationMasterService;
}
public boolean getStartFailoverFlag() {
@@ -324,6 +336,11 @@ public abstract class ProtocolHATestBase
if (count >= maximumWaittingTime) {
return false;
}
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // DO NOTHING
+ }
return true;
}
@@ -354,6 +371,14 @@ public abstract class ProtocolHATestBase
}
return super.createResourceTrackerService();
}
+ @Override
+ protected ApplicationMasterService createApplicationMasterService() {
+ if (overrideApplicationMasterService) {
+ return new CustomedApplicationMasterService(this.rmContext,
+ this.scheduler);
+ }
+ return super.createApplicationMasterService();
+ }
};
}
@@ -717,5 +742,31 @@ public abstract class ProtocolHATestBase
return super.nodeHeartbeat(request);
}
}
+
+ private class CustomedApplicationMasterService extends
+ ApplicationMasterService {
+ public CustomedApplicationMasterService(RMContext rmContext,
+ YarnScheduler scheduler) {
+ super(rmContext, scheduler);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ resetStartFailoverFlag(true);
+ // make sure failover has been triggered
+ Assert.assertTrue(waittingForFailOver());
+ return createFakeAllocateResponse();
+ }
+
+ }
+
+ public AllocateResponse createFakeAllocateResponse() {
+ return AllocateResponse.newInstance(-1,
+ new ArrayList<ContainerStatus>(),
+ new ArrayList<Container>(), new ArrayList<NodeReport>(),
+ Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1,
+ null, new ArrayList<NMToken>());
+ }
}
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java?rev=1583203&r1=1583202&r2=1583203&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java Sun Mar 30 21:35:54 2014
@@ -51,7 +51,7 @@ public class TestApplicationClientProtoc
@Before
public void initiate() throws Exception {
- startHACluster(1, true, false);
+ startHACluster(1, true, false, false);
Configuration conf = new YarnConfiguration(this.conf);
client = createAndStartYarnClient(conf);
}
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java?rev=1583203&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java Sun Mar 30 21:35:54 2014
@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
+ private ApplicationMasterProtocol amClient;
+ private ApplicationAttemptId attemptId ;
+ RMAppAttempt appAttempt;
+
+ @Before
+ public void initiate() throws Exception {
+ startHACluster(0, false, false, true);
+ attemptId = this.cluster.createFakeApplicationAttemptId();
+ amClient = ClientRMProxy
+ .createRMProxy(this.conf, ApplicationMasterProtocol.class);
+
+ AMRMTokenIdentifier id =
+ new AMRMTokenIdentifier(attemptId);
+ Token<AMRMTokenIdentifier> appToken =
+ new Token<AMRMTokenIdentifier>(id, this.cluster.getResourceManager()
+ .getRMContext().getAMRMTokenSecretManager());
+ appToken.setService(new Text("appToken service"));
+ UserGroupInformation.setLoginUser(UserGroupInformation
+ .createRemoteUser(UserGroupInformation.getCurrentUser()
+ .getUserName()));
+ UserGroupInformation.getCurrentUser().addToken(appToken);
+ syncToken(appToken);
+ }
+
+ @After
+ public void shutDown() {
+ if(this.amClient != null) {
+ RPC.stopProxy(this.amClient);
+ }
+ }
+
+ @Test(timeout = 15000)
+ public void testAllocateOnHA() throws YarnException, IOException {
+ AllocateRequest request = AllocateRequest.newInstance(0, 50f,
+ new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>(),
+ ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
+ new ArrayList<String>()));
+ AllocateResponse response = amClient.allocate(request);
+ Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
+ }
+
+ private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
+ for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
+ this.cluster.getResourceManager(i).getRMContext()
+ .getAMRMTokenSecretManager().addPersistedPassword(token);
+ }
+ }
+}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java?rev=1583203&r1=1583202&r2=1583203&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java Sun Mar 30 21:35:54 2014
@@ -41,7 +41,7 @@ public class TestResourceTrackerOnHA ext
@Before
public void initiate() throws Exception {
- startHACluster(0, false, true);
+ startHACluster(0, false, true, false);
this.resourceTracker = getRMClient();
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1583203&r1=1583202&r2=1583203&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Sun Mar 30 21:35:54 2014
@@ -739,4 +739,8 @@ public class MiniYARNCluster extends Com
}
};
}
+
+ public int getNumOfResourceManager() {
+ return this.resourceManagers.length;
+ }
}