[
https://issues.apache.org/jira/browse/YARN-11177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17584517#comment-17584517
]
ASF GitHub Bot commented on YARN-11177:
---------------------------------------
goiri commented on code in PR #4764:
URL: https://github.com/apache/hadoop/pull/4764#discussion_r954396015
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -925,13 +1041,61 @@ public ReservationListResponse listReservations(
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null || request.getReservationId() == null
+ || request.getReservationDefinition() == null) {
Review Comment:
Indentation
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse
moveApplicationAcrossQueues(
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Missing getNewReservation request.";
+ RouterServerUtil.logAndThrowException(errMsg, null);
+ }
+
+ long startTime = clock.getTime();
+ Map<SubClusterId, SubClusterInfo> subClustersActive =
+ federationFacade.getSubClusters(true);
+
+ for (int i = 0; i < numSubmitRetries; ++i) {
+ SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+ LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+ ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
+ GetNewReservationResponse response = null;
+ try {
+ response = clientRMProxy.getNewReservation(request);
+ if (response != null) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetNewReservationRetrieved(stopTime -
startTime);
+ return response;
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to create a new Reservation in SubCluster {}.",
subClusterId.getId(), e);
+ subClustersActive.remove(subClusterId);
+ }
+ }
+
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Failed to create a new reservation.";
+ throw new YarnException(errMsg);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null || request.getReservationId() == null
+ || request.getReservationDefinition() == null ||
request.getQueue() == null) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing submitReservation request or reservationId " +
+ "or reservation definition or queue.", null);
+ }
+
+ long startTime = clock.getTime();
+ ReservationId reservationId = request.getReservationId();
+
+ long retryCount = 0;
+ boolean firstRetry = true;
+
+ while (retryCount < numSubmitRetries) {
+
+ SubClusterId subClusterId =
policyFacade.getReservationHomeSubCluster(request);
+ LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+ reservationId, retryCount, subClusterId);
+
+ ReservationHomeSubCluster reservationHomeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+ // If it is the first attempt,use StateStore to add the
+ // mapping of reservationId and subClusterId.
+ // if the number of attempts is greater than 1, use StateStore to update
the mapping.
+ if (firstRetry) {
+ try {
+ // persist the mapping of reservationId and the subClusterId which
has
+ // been selected as its home
+ subClusterId =
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+ firstRetry = false;
+ } catch (YarnException e) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to insert the ReservationId %s into the
FederationStateStore.",
+ reservationId);
+ }
+ } else {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore =
+ federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.",
+ reservationId, subClusterId);
+ } else {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to update the ReservationId %s into the
FederationStateStore.",
+ reservationId);
Review Comment:
Indentation
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse
moveApplicationAcrossQueues(
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Missing getNewReservation request.";
+ RouterServerUtil.logAndThrowException(errMsg, null);
+ }
+
+ long startTime = clock.getTime();
+ Map<SubClusterId, SubClusterInfo> subClustersActive =
+ federationFacade.getSubClusters(true);
+
+ for (int i = 0; i < numSubmitRetries; ++i) {
+ SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+ LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+ ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
+ GetNewReservationResponse response = null;
+ try {
+ response = clientRMProxy.getNewReservation(request);
+ if (response != null) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetNewReservationRetrieved(stopTime -
startTime);
+ return response;
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to create a new Reservation in SubCluster {}.",
subClusterId.getId(), e);
+ subClustersActive.remove(subClusterId);
+ }
+ }
+
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Failed to create a new reservation.";
+ throw new YarnException(errMsg);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null || request.getReservationId() == null
+ || request.getReservationDefinition() == null ||
request.getQueue() == null) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing submitReservation request or reservationId " +
+ "or reservation definition or queue.", null);
+ }
+
+ long startTime = clock.getTime();
+ ReservationId reservationId = request.getReservationId();
+
+ long retryCount = 0;
+ boolean firstRetry = true;
+
+ while (retryCount < numSubmitRetries) {
+
+ SubClusterId subClusterId =
policyFacade.getReservationHomeSubCluster(request);
+ LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+ reservationId, retryCount, subClusterId);
+
+ ReservationHomeSubCluster reservationHomeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+ // If it is the first attempt,use StateStore to add the
+ // mapping of reservationId and subClusterId.
+ // if the number of attempts is greater than 1, use StateStore to update
the mapping.
+ if (firstRetry) {
+ try {
+ // persist the mapping of reservationId and the subClusterId which
has
+ // been selected as its home
+ subClusterId =
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+ firstRetry = false;
+ } catch (YarnException e) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to insert the ReservationId %s into the
FederationStateStore.",
+ reservationId);
+ }
+ } else {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore =
+ federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.",
+ reservationId, subClusterId);
+ } else {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to update the ReservationId %s into the
FederationStateStore.",
+ reservationId);
+ }
+ }
+ }
+
+ // Obtain the ApplicationClientProtocol of the corresponding RM
according to the subClusterId,
+ // and call the submitReservation method, If the request is responded to,
+ // If the request is responded, it will return directly, otherwise
retryCount+1,
+ // continue to submit other request.
+ try {
+ ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
+ ReservationSubmissionResponse response =
clientRMProxy.submitReservation(request);
+ if (response != null) {
+ LOG.info("Reservation {} submitted on {}.",
request.getReservationId(), subClusterId);
+ long stopTime = clock.getTime();
+ routerMetrics.succeededSubmitReservationRetrieved(stopTime -
startTime);
+ return response;
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to submit the reservation {} to SubCluster {} error =
{}.",
+ reservationId, subClusterId.getId(), e.getMessage(), e);
+ }
+
+ retryCount++;
+ }
+
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ String msg = String.format("Reservation %s failed to be submitted.",
+ request.getReservationId());
Review Comment:
reservationId
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse
moveApplicationAcrossQueues(
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Missing getNewReservation request.";
+ RouterServerUtil.logAndThrowException(errMsg, null);
+ }
+
+ long startTime = clock.getTime();
+ Map<SubClusterId, SubClusterInfo> subClustersActive =
Review Comment:
Single line?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
NodeAttributeType.STRING, "nvida");
Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
}
+
+ @Test
+ public void testGetNewReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getNewReservation request.", () ->
interceptor.getNewReservation(null));
+
+ // normal request
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ ReservationId reservationId = response.getReservationId();
+ Assert.assertNotNull(reservationId);
+ Assert.assertTrue(reservationId.toString().contains("reservation"));
+ Assert.assertEquals(reservationId.getClusterTimestamp(),
ResourceManager.getClusterTimeStamp());
+ }
+
+ @Test
+ public void testSubmitReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ SubClusterId subClusterId =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId);
+ Assert.assertTrue(subClusters.contains(subClusterId));
+ }
+
+ @Test
+ public void testSubmitReservationEmptyRequest() throws Exception {
+ LOG.info("Test FederationClientInterceptor : SubmitReservation request
empty.");
+
+ // null request1
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(null));
+
+ // null request2
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(
+ ReservationSubmissionRequest.newInstance(null, null, null)));
Review Comment:
indentation is not correct
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse
moveApplicationAcrossQueues(
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null) {
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Missing getNewReservation request.";
+ RouterServerUtil.logAndThrowException(errMsg, null);
+ }
+
+ long startTime = clock.getTime();
+ Map<SubClusterId, SubClusterInfo> subClustersActive =
+ federationFacade.getSubClusters(true);
+
+ for (int i = 0; i < numSubmitRetries; ++i) {
+ SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+ LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+ ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
+ GetNewReservationResponse response = null;
+ try {
+ response = clientRMProxy.getNewReservation(request);
+ if (response != null) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededGetNewReservationRetrieved(stopTime -
startTime);
+ return response;
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to create a new Reservation in SubCluster {}.",
subClusterId.getId(), e);
+ subClustersActive.remove(subClusterId);
+ }
+ }
+
+ routerMetrics.incrGetNewReservationFailedRetrieved();
+ String errMsg = "Failed to create a new reservation.";
+ throw new YarnException(errMsg);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ if (request == null || request.getReservationId() == null
+ || request.getReservationDefinition() == null ||
request.getQueue() == null) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(
+ "Missing submitReservation request or reservationId " +
+ "or reservation definition or queue.", null);
+ }
+
+ long startTime = clock.getTime();
+ ReservationId reservationId = request.getReservationId();
+
+ long retryCount = 0;
+ boolean firstRetry = true;
+
+ while (retryCount < numSubmitRetries) {
+
+ SubClusterId subClusterId =
policyFacade.getReservationHomeSubCluster(request);
+ LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+ reservationId, retryCount, subClusterId);
+
+ ReservationHomeSubCluster reservationHomeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+ // If it is the first attempt,use StateStore to add the
+ // mapping of reservationId and subClusterId.
+ // if the number of attempts is greater than 1, use StateStore to update
the mapping.
+ if (firstRetry) {
+ try {
+ // persist the mapping of reservationId and the subClusterId which
has
+ // been selected as its home
+ subClusterId =
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+ firstRetry = false;
+ } catch (YarnException e) {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to insert the ReservationId %s into the
FederationStateStore.",
+ reservationId);
+ }
+ } else {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore =
+ federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.",
+ reservationId, subClusterId);
+ } else {
+ routerMetrics.incrSubmitReservationFailedRetrieved();
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to update the ReservationId %s into the
FederationStateStore.",
+ reservationId);
+ }
+ }
+ }
+
+ // Obtain the ApplicationClientProtocol of the corresponding RM
according to the subClusterId,
+ // and call the submitReservation method, If the request is responded to,
+ // If the request is responded, it will return directly, otherwise
retryCount+1,
+ // continue to submit other request.
+ try {
+ ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
+ ReservationSubmissionResponse response =
clientRMProxy.submitReservation(request);
+ if (response != null) {
+ LOG.info("Reservation {} submitted on {}.",
request.getReservationId(), subClusterId);
Review Comment:
reservationId is extracted already
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
NodeAttributeType.STRING, "nvida");
Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
}
+
+ @Test
+ public void testGetNewReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getNewReservation request.", () ->
interceptor.getNewReservation(null));
+
+ // normal request
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ ReservationId reservationId = response.getReservationId();
+ Assert.assertNotNull(reservationId);
+ Assert.assertTrue(reservationId.toString().contains("reservation"));
+ Assert.assertEquals(reservationId.getClusterTimestamp(),
ResourceManager.getClusterTimeStamp());
+ }
+
+ @Test
+ public void testSubmitReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ SubClusterId subClusterId =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId);
+ Assert.assertTrue(subClusters.contains(subClusterId));
+ }
+
+ @Test
+ public void testSubmitReservationEmptyRequest() throws Exception {
+ LOG.info("Test FederationClientInterceptor : SubmitReservation request
empty.");
+
+ // null request1
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(null));
+
+ // null request2
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(
+ ReservationSubmissionRequest.newInstance(null, null, null)));
+
+ // null request3
+ ReservationSubmissionRequest request3 =
+ ReservationSubmissionRequest.newInstance(null, "q1", null);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(request3));
+
+ // null request4
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+ ReservationSubmissionRequest request4 =
+ ReservationSubmissionRequest.newInstance(null, null, reservationId);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(request4));
+
+ // null request5
+ long defaultDuration = 600000;
+ long arrival = Time.now();
+ long deadline = arrival + (int)(defaultDuration * 1.1);
+
+ ReservationRequest rRequest = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 1, 1, defaultDuration);
+ ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
+ ReservationDefinition rDefinition = createReservationDefinition(arrival,
deadline, rRequests,
+ ReservationRequestInterpreter.R_ALL, "u1");
+ ReservationSubmissionRequest request5 =
+ ReservationSubmissionRequest.newInstance(rDefinition, null,
reservationId);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(request5));
+ }
+
+ @Test
+ public void testSubmitReservationMultipleSubmission() throws Exception {
+ LOG.info("Test FederationClientInterceptor: Submit Reservation -
Multiple");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // First Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ SubClusterId subClusterId1 =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId1);
+ Assert.assertTrue(subClusters.contains(subClusterId1));
+
+ // First Retry
+ ReservationSubmissionResponse submissionResponse1 =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse1);
+ SubClusterId subClusterId2 =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId2);
+ Assert.assertEquals(subClusterId1, subClusterId2);
+ }
+
+ @Test
+ public void testUpdateReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ // Update Reservation
+ ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
+ ReservationUpdateRequest updateRequest =
+ ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
+ ReservationUpdateResponse updateResponse =
+ interceptor.updateReservation(updateRequest);
+ Assert.assertNotNull(updateResponse);
+
+ SubClusterId subClusterId =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId);
+ }
+
+ @Test
+ public void testDeleteReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ // Delete Reservation
+ ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationId);
+ ReservationDeleteResponse deleteResponse =
interceptor.deleteReservation(deleteRequest);
+ Assert.assertNotNull(deleteResponse);
+
+ LambdaTestUtils.intercept(YarnException.class,
+ "Reservation " + reservationId + " does not exist",
+ () -> stateStoreUtil.queryReservationHomeSC(reservationId));
+ }
+
+
+ private ReservationDefinition createReservationDefinition(int memory, int
core) {
+ // get reservationId
+ long defaultDuration = 600000;
Review Comment:
constant final etc
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
NodeAttributeType.STRING, "nvida");
Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
}
+
+ @Test
+ public void testGetNewReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+ // null request
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getNewReservation request.", () ->
interceptor.getNewReservation(null));
+
+ // normal request
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ ReservationId reservationId = response.getReservationId();
+ Assert.assertNotNull(reservationId);
+ Assert.assertTrue(reservationId.toString().contains("reservation"));
+ Assert.assertEquals(reservationId.getClusterTimestamp(),
ResourceManager.getClusterTimeStamp());
+ }
+
+ @Test
+ public void testSubmitReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ SubClusterId subClusterId =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId);
+ Assert.assertTrue(subClusters.contains(subClusterId));
+ }
+
+ @Test
+ public void testSubmitReservationEmptyRequest() throws Exception {
+ LOG.info("Test FederationClientInterceptor : SubmitReservation request
empty.");
+
+ // null request1
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(null));
+
+ // null request2
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(
+ ReservationSubmissionRequest.newInstance(null, null, null)));
+
+ // null request3
+ ReservationSubmissionRequest request3 =
+ ReservationSubmissionRequest.newInstance(null, "q1", null);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(request3));
+
+ // null request4
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+ ReservationSubmissionRequest request4 =
+ ReservationSubmissionRequest.newInstance(null, null, reservationId);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(request4));
+
+ // null request5
+ long defaultDuration = 600000;
+ long arrival = Time.now();
+ long deadline = arrival + (int)(defaultDuration * 1.1);
+
+ ReservationRequest rRequest = ReservationRequest.newInstance(
+ Resource.newInstance(1024, 1), 1, 1, defaultDuration);
+ ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
+ ReservationDefinition rDefinition = createReservationDefinition(arrival,
deadline, rRequests,
+ ReservationRequestInterpreter.R_ALL, "u1");
+ ReservationSubmissionRequest request5 =
+ ReservationSubmissionRequest.newInstance(rDefinition, null,
reservationId);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitReservation request or reservationId or reservation
definition or queue.",
+ () -> interceptor.submitReservation(request5));
+ }
+
+ @Test
+ public void testSubmitReservationMultipleSubmission() throws Exception {
+ LOG.info("Test FederationClientInterceptor: Submit Reservation -
Multiple");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // First Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ SubClusterId subClusterId1 =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId1);
+ Assert.assertTrue(subClusters.contains(subClusterId1));
+
+ // First Retry
+ ReservationSubmissionResponse submissionResponse1 =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse1);
+ SubClusterId subClusterId2 =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId2);
+ Assert.assertEquals(subClusterId1, subClusterId2);
+ }
+
+ @Test
+ public void testUpdateReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ // Update Reservation
+ ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
+ ReservationUpdateRequest updateRequest =
+ ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
+ ReservationUpdateResponse updateResponse =
+ interceptor.updateReservation(updateRequest);
+ Assert.assertNotNull(updateResponse);
+
+ SubClusterId subClusterId =
stateStoreUtil.queryReservationHomeSC(reservationId);
+ Assert.assertNotNull(subClusterId);
+ }
+
+ @Test
+ public void testDeleteReservation() throws Exception {
+ LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
+
+ // get new reservationId
+ GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+ GetNewReservationResponse response =
interceptor.getNewReservation(request);
+ Assert.assertNotNull(response);
+
+ // allow plan follower to synchronize, manually trigger an assignment
+ Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+ for (MockRM mockRM : mockRMs.values()) {
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan("root.decided", true);
+ }
+
+ // Submit Reservation
+ ReservationId reservationId = response.getReservationId();
+ ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+ ReservationSubmissionRequest rSubmissionRequest =
ReservationSubmissionRequest.newInstance(
+ rDefinition, "decided", reservationId);
+
+ ReservationSubmissionResponse submissionResponse =
+ interceptor.submitReservation(rSubmissionRequest);
+ Assert.assertNotNull(submissionResponse);
+
+ // Delete Reservation
+ ReservationDeleteRequest deleteRequest =
ReservationDeleteRequest.newInstance(reservationId);
+ ReservationDeleteResponse deleteResponse =
interceptor.deleteReservation(deleteRequest);
+ Assert.assertNotNull(deleteResponse);
+
+ LambdaTestUtils.intercept(YarnException.class,
+ "Reservation " + reservationId + " does not exist",
+ () -> stateStoreUtil.queryReservationHomeSC(reservationId));
+ }
+
+
+ private ReservationDefinition createReservationDefinition(int memory, int
core) {
+ // get reservationId
+ long defaultDuration = 600000;
Review Comment:
10 * 60 * 1000
> Support getNewReservation, submitReservation, updateReservation,
> deleteReservation API's for Federation
> -------------------------------------------------------------------------------------------------------
>
> Key: YARN-11177
> URL: https://issues.apache.org/jira/browse/YARN-11177
> Project: Hadoop YARN
> Issue Type: Sub-task
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]