[
https://issues.apache.org/jira/browse/YARN-11226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607872#comment-17607872
]
ASF GitHub Bot commented on YARN-11226:
---------------------------------------
goiri commented on code in PR #4892:
URL: https://github.com/apache/hadoop/pull/4892#discussion_r976766919
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java:
##########
@@ -830,64 +843,193 @@ public Response listReservation(String queue, String
reservationId, long startTi
" Please try again with a valid reservable queue.");
}
- MockRM mockRM = setupResourceManager();
+ ReservationId reservationID =
+ ReservationId.parseReservationId(reservationId);
- ReservationId reservationID =
ReservationId.parseReservationId(reservationId);
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ if (!reservationMap.containsKey(reservationID)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
- // Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
- // arrival time from which the resource(s) can be allocated.
- long arrival = Time.now();
-
- // deadline by when the resource(s) must be allocated.
- // The reason for choosing 1.05 is because this gives an integer
- // DURATION * 0.05 = 3000(ms)
- // deadline = arrival + 3000ms
- long deadline = (long) (arrival + 1.05 * DURATION);
-
- // In this test of reserved resources, we will apply for 4 containers (1
core, 1GB memory)
- // arrival = Time.now(), and make sure deadline - arrival > duration,
- // the current setting is greater than 3000ms
- ReservationSubmissionRequest submissionRequest =
- ReservationSystemTestUtil.createSimpleReservationRequest(
- reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
- clientService.submitReservation(submissionRequest);
-
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
- queue, reservationID.toString(), startTime, endTime,
includeResourceAllocations);
+ queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo =
clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
- if (mockRM != null) {
- mockRM.stop();
+ return Response.status(Status.OK).entity(resResponse).build();
+ }
+
+ @Override
+ public Response createNewReservation(HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ReservationId resId = ReservationId.newInstance(Time.now(),
resCounter.incrementAndGet());
+ LOG.info("Allocated new reservationId: {}.", resId);
+
+ NewReservation reservationId = new NewReservation(resId.toString());
+ return Response.status(Status.OK).entity(reservationId).build();
+ }
+
+ @Override
+ public Response submitReservation(ReservationSubmissionRequestInfo
resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
}
- return Response.status(Status.OK).entity(resResponse).build();
+ ReservationId reservationId =
ReservationId.parseReservationId(resContext.getReservationId());
+ ReservationDefinitionInfo definitionInfo =
resContext.getReservationDefinition();
+ ReservationDefinition definition =
+ RouterServerUtil.convertReservationDefinition(definitionInfo);
+ ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(
+ definition, resContext.getQueue(), reservationId);
+ submitReservation(request);
+
+ LOG.info("Reservation submitted: {}.", reservationId);
+
+ SubClusterId subClusterId = getSubClusterId();
+ reservationMap.put(reservationId, subClusterId);
+
+ return Response.status(Status.ACCEPTED).build();
+ }
+
+ private void submitReservation(ReservationSubmissionRequest request) {
+ try {
+
+ // synchronize plan
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+
+ // Generate reserved resources
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.submitReservation(request);
+ } catch (IOException | YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Response updateReservation(ReservationUpdateRequestInfo resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (resContext == null || resContext.getReservationId() == null ||
+ resContext.getReservationDefinition() == null) {
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+
+ String resId = resContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+ if (!reservationMap.containsKey(reservationId)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
+
+ // Generate reserved resources
+ updateReservation(resContext);
+
+ ReservationUpdateResponseInfo resRespInfo = new
ReservationUpdateResponseInfo();
+ return Response.status(Status.OK).entity(resRespInfo).build();
+ }
+
+ private void updateReservation(ReservationUpdateRequestInfo resContext) {
+ try {
+
+ if (resContext == null) {
+ throw new BadRequestException("Input ReservationSubmissionContext
should not be null");
+ }
+
+ ReservationDefinitionInfo resInfo =
resContext.getReservationDefinition();
+ if (resInfo == null) {
+ throw new BadRequestException("Input ReservationDefinition should not
be null");
+ }
+
+ ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+ if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+ || resReqsInfo.getReservationRequest().size() == 0) {
+ throw new BadRequestException("The ReservationDefinition should " +
+ "contain at least one ReservationRequest");
+ }
+
+ if (resContext.getReservationId() == null) {
+ throw new BadRequestException("Update operations must specify an
existing ReservaitonId");
+ }
+
+ ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
+ ReservationRequestInterpreter requestInterpreter =
+ values[resReqsInfo.getReservationRequestsInterpreter()];
+ List<ReservationRequest> list = new ArrayList<>();
+
+ for (ReservationRequestInfo resReqInfo :
resReqsInfo.getReservationRequest()) {
+ ResourceInfo rInfo = resReqInfo.getCapability();
+ Resource capability = Resource.newInstance(rInfo.getMemorySize(),
rInfo.getvCores());
+ int numContainers = resReqInfo.getNumContainers();
+ int minConcurrency = resReqInfo.getMinConcurrency();
+ long duration = resReqInfo.getDuration();
+ ReservationRequest rr =
+ ReservationRequest.newInstance(capability, numContainers,
minConcurrency, duration);
+ list.add(rr);
+ }
+
+ ReservationRequests reqs = ReservationRequests.newInstance(list,
requestInterpreter);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(
+ resInfo.getArrival(), resInfo.getDeadline(), reqs,
+ resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
+ Priority.newInstance(resInfo.getPriority()));
+ ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
+ rDef,
ReservationId.parseReservationId(resContext.getReservationId()));
+
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.updateReservation(request);
+
+ } catch (Exception ex) {
Review Comment:
This is a fairly broad catch, wouldn't it be better to just throw the
exceptions?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable
t, String errMsgFormat,
throw new RuntimeException(msg);
}
}
+
+ /**
+ * Save Reservation And HomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public static void addReservationHomeSubCluster(FederationStateStoreFacade
federationFacade,
+ ReservationId reservationId, ReservationHomeSubCluster homeSubCluster)
throws YarnException {
+ try {
+ // persist the mapping of reservationId and the subClusterId which has
+ // been selected as its home
+ federationFacade.addReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to insert the ReservationId %s into the
FederationStateStore.", reservationId);
+ }
+ }
+
+ /**
+ * Update Reservation And HomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param subClusterId subClusterId
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public static void
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+ SubClusterId subClusterId, ReservationId reservationId,
+ ReservationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+ federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore =
+ federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.",
reservationId, subClusterId);
+ } else {
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to update the ReservationId %s into the
FederationStateStore.", reservationId);
+ }
+ }
+ }
+
+ /**
+ * Exists ReservationHomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param reservationId reservationId
+ * @return true - exist, false - not exist
+ */
+ public static Boolean
existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
Review Comment:
can this be the native `boolean` type?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java:
##########
@@ -51,4 +70,48 @@ protected void registerBadSubCluster(SubClusterId badSC) {
interceptor.setRunning(false);
}
+ protected void setupResourceManager() throws IOException {
+ try {
+ if (mockRM != null) {
Review Comment:
You can move some of this before the try
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable
t, String errMsgFormat,
throw new RuntimeException(msg);
}
}
+
+ /**
+ * Save Reservation And HomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public static void addReservationHomeSubCluster(FederationStateStoreFacade
federationFacade,
+ ReservationId reservationId, ReservationHomeSubCluster homeSubCluster)
throws YarnException {
+ try {
+ // persist the mapping of reservationId and the subClusterId which has
+ // been selected as its home
+ federationFacade.addReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to insert the ReservationId %s into the
FederationStateStore.", reservationId);
+ }
+ }
+
+ /**
+ * Update Reservation And HomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param subClusterId subClusterId
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public static void
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+ SubClusterId subClusterId, ReservationId reservationId,
+ ReservationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+ federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore =
+ federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.",
reservationId, subClusterId);
+ } else {
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to update the ReservationId %s into the
FederationStateStore.", reservationId);
+ }
+ }
+ }
+
+ /**
+ * Exists ReservationHomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param reservationId reservationId
+ * @return true - exist, false - not exist
+ */
+ public static Boolean
existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+ ReservationId reservationId) {
+ try {
+ SubClusterId subClusterId =
federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId != null) {
+ return true;
+ }
+ } catch (YarnException e) {
+ LOG.warn("get homeSubCluster by reservationId = {} error.",
reservationId, e);
+ }
+ return false;
+ }
+
+ public static ReservationDefinition convertReservationDefinition(
Review Comment:
Should we have a unit test for this?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java:
##########
@@ -830,64 +843,193 @@ public Response listReservation(String queue, String
reservationId, long startTi
" Please try again with a valid reservable queue.");
}
- MockRM mockRM = setupResourceManager();
+ ReservationId reservationID =
+ ReservationId.parseReservationId(reservationId);
- ReservationId reservationID =
ReservationId.parseReservationId(reservationId);
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ if (!reservationMap.containsKey(reservationID)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
- // Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
- // arrival time from which the resource(s) can be allocated.
- long arrival = Time.now();
-
- // deadline by when the resource(s) must be allocated.
- // The reason for choosing 1.05 is because this gives an integer
- // DURATION * 0.05 = 3000(ms)
- // deadline = arrival + 3000ms
- long deadline = (long) (arrival + 1.05 * DURATION);
-
- // In this test of reserved resources, we will apply for 4 containers (1
core, 1GB memory)
- // arrival = Time.now(), and make sure deadline - arrival > duration,
- // the current setting is greater than 3000ms
- ReservationSubmissionRequest submissionRequest =
- ReservationSystemTestUtil.createSimpleReservationRequest(
- reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
- clientService.submitReservation(submissionRequest);
-
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
- queue, reservationID.toString(), startTime, endTime,
includeResourceAllocations);
+ queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo =
clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
- if (mockRM != null) {
- mockRM.stop();
+ return Response.status(Status.OK).entity(resResponse).build();
+ }
+
+ @Override
+ public Response createNewReservation(HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ReservationId resId = ReservationId.newInstance(Time.now(),
resCounter.incrementAndGet());
+ LOG.info("Allocated new reservationId: {}.", resId);
+
+ NewReservation reservationId = new NewReservation(resId.toString());
+ return Response.status(Status.OK).entity(reservationId).build();
+ }
+
+ @Override
+ public Response submitReservation(ReservationSubmissionRequestInfo
resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
}
- return Response.status(Status.OK).entity(resResponse).build();
+ ReservationId reservationId =
ReservationId.parseReservationId(resContext.getReservationId());
+ ReservationDefinitionInfo definitionInfo =
resContext.getReservationDefinition();
+ ReservationDefinition definition =
+ RouterServerUtil.convertReservationDefinition(definitionInfo);
+ ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(
+ definition, resContext.getQueue(), reservationId);
+ submitReservation(request);
+
+ LOG.info("Reservation submitted: {}.", reservationId);
+
+ SubClusterId subClusterId = getSubClusterId();
+ reservationMap.put(reservationId, subClusterId);
+
+ return Response.status(Status.ACCEPTED).build();
+ }
+
+ private void submitReservation(ReservationSubmissionRequest request) {
+ try {
+
+ // synchronize plan
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+
+ // Generate reserved resources
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.submitReservation(request);
+ } catch (IOException | YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Response updateReservation(ReservationUpdateRequestInfo resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (resContext == null || resContext.getReservationId() == null ||
+ resContext.getReservationDefinition() == null) {
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+
+ String resId = resContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+ if (!reservationMap.containsKey(reservationId)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
+
+ // Generate reserved resources
+ updateReservation(resContext);
+
+ ReservationUpdateResponseInfo resRespInfo = new
ReservationUpdateResponseInfo();
+ return Response.status(Status.OK).entity(resRespInfo).build();
+ }
+
+ private void updateReservation(ReservationUpdateRequestInfo resContext) {
+ try {
+
+ if (resContext == null) {
+ throw new BadRequestException("Input ReservationSubmissionContext
should not be null");
+ }
+
+ ReservationDefinitionInfo resInfo =
resContext.getReservationDefinition();
+ if (resInfo == null) {
+ throw new BadRequestException("Input ReservationDefinition should not
be null");
+ }
+
+ ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+ if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+ || resReqsInfo.getReservationRequest().size() == 0) {
Review Comment:
isEmpty()
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -1127,4 +1141,205 @@ public void testListReservation() throws Exception {
Assert.assertEquals(1, vCore);
Assert.assertEquals(1024, memory);
}
+
+ @Test
+ public void testCreateNewReservation() throws Exception {
+ Response response = interceptor.createNewReservation(null);
+ Assert.assertNotNull(response);
+
+ Object entity = response.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertTrue(entity instanceof NewReservation);
+
+ NewReservation newReservation = (NewReservation) entity;
+ Assert.assertNotNull(newReservation);
+
Assert.assertTrue(newReservation.getReservationId().contains("reservation"));
+ }
+
+ @Test
+ public void testSubmitReservation() throws Exception {
+
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ String applyReservationId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ Object entity = reservationResponse.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+ ReservationListInfo listInfo = (ReservationListInfo) entity;
+ Assert.assertNotNull(listInfo);
+
+ List<ReservationInfo> reservationInfos = listInfo.getReservations();
+ Assert.assertNotNull(reservationInfos);
+ Assert.assertEquals(1, reservationInfos.size());
+
+ ReservationInfo reservationInfo = reservationInfos.get(0);
+ Assert.assertNotNull(reservationInfo);
+ Assert.assertEquals(reservationInfo.getReservationId(),
applyReservationId);
+ }
+
+ @Test
+ public void testUpdateReservation() throws Exception {
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ // update reservation
+ ReservationSubmissionRequest resSubRequest =
+ getReservationSubmissionRequest(reservationId, 6, 2048, 2);
+ ReservationDefinition reservationDefinition =
resSubRequest.getReservationDefinition();
+ ReservationDefinitionInfo reservationDefinitionInfo =
+ new ReservationDefinitionInfo(reservationDefinition);
+
+ ReservationUpdateRequestInfo updateRequestInfo = new
ReservationUpdateRequestInfo();
+ updateRequestInfo.setReservationId(reservationId.toString());
+ updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+ Response updateReservationResp =
interceptor.updateReservation(updateRequestInfo, null);
+ Assert.assertNotNull(updateReservationResp);
+ Assert.assertEquals(Status.OK.getStatusCode(),
updateReservationResp.getStatus());
+
+ String applyReservationId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ Object entity = reservationResponse.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+ ReservationListInfo listInfo = (ReservationListInfo) entity;
+ Assert.assertNotNull(listInfo);
+
+ List<ReservationInfo> reservationInfos = listInfo.getReservations();
+ Assert.assertNotNull(reservationInfos);
+ Assert.assertEquals(1, reservationInfos.size());
+
+ ReservationInfo reservationInfo = reservationInfos.get(0);
+ Assert.assertNotNull(reservationInfo);
+ Assert.assertEquals(reservationInfo.getReservationId(),
applyReservationId);
+
+ ReservationDefinitionInfo resDefinitionInfo =
reservationInfo.getReservationDefinition();
+ Assert.assertNotNull(resDefinitionInfo);
+
+ ReservationRequestsInfo reservationRequestsInfo =
resDefinitionInfo.getReservationRequests();
+ Assert.assertNotNull(reservationRequestsInfo);
+
+ ArrayList<ReservationRequestInfo> reservationRequestInfoList =
+ reservationRequestsInfo.getReservationRequest();
+ Assert.assertNotNull(reservationRequestInfoList);
+ Assert.assertEquals(1, reservationRequestInfoList.size());
+
+ ReservationRequestInfo reservationRequestInfo =
reservationRequestInfoList.get(0);
+ Assert.assertNotNull(reservationRequestInfo);
+ Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
+
+ ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
+ Assert.assertNotNull(resourceInfo);
+
+ int vCore = resourceInfo.getvCores();
+ long memory = resourceInfo.getMemorySize();
+ Assert.assertEquals(2, vCore);
+ Assert.assertEquals(2048, memory);
+ }
+
+ @Test
+ public void testDeleteReservation() throws Exception {
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ String applyResId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ ReservationDeleteRequestInfo deleteRequestInfo =
+ new ReservationDeleteRequestInfo();
+ deleteRequestInfo.setReservationId(applyResId);
+ Response delResponse = interceptor.deleteReservation(deleteRequestInfo,
null);
+ Assert.assertNotNull(delResponse);
+
+ LambdaTestUtils.intercept(Exception.class,
+ "reservationId with id: " + reservationId + " not found",
+ () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId,
-1, -1, false, null));
+ }
+
+ private Response submitReservation(ReservationId reservationId)
+ throws IOException, InterruptedException, YarnException {
+
+ SubClusterId homeSubClusterId = subClusters.get(0);
+ ReservationHomeSubCluster reservationHomeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
+ AddReservationHomeSubClusterRequest request =
+
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
+ stateStore.addReservationHomeSubCluster(request);
+
+ ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+ getReservationSubmissionRequestInfo(reservationId);
+ Response response =
interceptor.submitReservation(resSubmissionRequestInfo, null);
+ return response;
+ }
+
+ private ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
+ ReservationId reservationId) {
+
+ ReservationSubmissionRequest resSubRequest =
+ getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024,
1);
+ ReservationDefinition reservationDefinition =
resSubRequest.getReservationDefinition();
+
+ ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+ new ReservationSubmissionRequestInfo();
+ resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
+ resSubmissionRequestInfo.setReservationId(reservationId.toString());
+ ReservationDefinitionInfo reservationDefinitionInfo =
+ new ReservationDefinitionInfo(reservationDefinition);
+
resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+
+ return resSubmissionRequestInfo;
+ }
+
+ private ReservationSubmissionRequest getReservationSubmissionRequest(
+ ReservationId reservationId, int numContainers, int memory, int vcore) {
+
+ // arrival time from which the resource(s) can be allocated.
+ long arrival = Time.now();
+
+ // deadline by when the resource(s) must be allocated.
+ // The reason for choosing 1.05 is because this gives an integer
+ // DURATION * 0.05 = 3000(ms)
+ // deadline = arrival + 3000ms
+ long deadline = (long) (arrival + 1.05 * DURATION);
+
+ ReservationSubmissionRequest submissionRequest =
createSimpleReservationRequest(
+ reservationId, numContainers, arrival, deadline, DURATION, memory,
vcore);
+
+ return submissionRequest;
+ }
+
+ public static ReservationSubmissionRequest createSimpleReservationRequest(
+ ReservationId reservationId, int numContainers, long arrival,
+ long deadline, long duration, int memory, int vcore) {
+ // create a request with a single atomic ask
+ ReservationRequest r = ReservationRequest
Review Comment:
Small nit, this would look better as:
```
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(memory, vcore), numContainers, 1, duration);
```
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -1127,4 +1141,205 @@ public void testListReservation() throws Exception {
Assert.assertEquals(1, vCore);
Assert.assertEquals(1024, memory);
}
+
+ @Test
+ public void testCreateNewReservation() throws Exception {
+ Response response = interceptor.createNewReservation(null);
+ Assert.assertNotNull(response);
+
+ Object entity = response.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertTrue(entity instanceof NewReservation);
+
+ NewReservation newReservation = (NewReservation) entity;
+ Assert.assertNotNull(newReservation);
+
Assert.assertTrue(newReservation.getReservationId().contains("reservation"));
+ }
+
+ @Test
+ public void testSubmitReservation() throws Exception {
+
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ String applyReservationId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ Object entity = reservationResponse.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+ ReservationListInfo listInfo = (ReservationListInfo) entity;
+ Assert.assertNotNull(listInfo);
+
+ List<ReservationInfo> reservationInfos = listInfo.getReservations();
+ Assert.assertNotNull(reservationInfos);
+ Assert.assertEquals(1, reservationInfos.size());
+
+ ReservationInfo reservationInfo = reservationInfos.get(0);
+ Assert.assertNotNull(reservationInfo);
+ Assert.assertEquals(reservationInfo.getReservationId(),
applyReservationId);
+ }
+
+ @Test
+ public void testUpdateReservation() throws Exception {
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ // update reservation
+ ReservationSubmissionRequest resSubRequest =
+ getReservationSubmissionRequest(reservationId, 6, 2048, 2);
+ ReservationDefinition reservationDefinition =
resSubRequest.getReservationDefinition();
+ ReservationDefinitionInfo reservationDefinitionInfo =
+ new ReservationDefinitionInfo(reservationDefinition);
+
+ ReservationUpdateRequestInfo updateRequestInfo = new
ReservationUpdateRequestInfo();
+ updateRequestInfo.setReservationId(reservationId.toString());
+ updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+ Response updateReservationResp =
interceptor.updateReservation(updateRequestInfo, null);
+ Assert.assertNotNull(updateReservationResp);
+ Assert.assertEquals(Status.OK.getStatusCode(),
updateReservationResp.getStatus());
+
+ String applyReservationId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ Object entity = reservationResponse.getEntity();
+ Assert.assertNotNull(entity);
+ Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+ ReservationListInfo listInfo = (ReservationListInfo) entity;
+ Assert.assertNotNull(listInfo);
+
+ List<ReservationInfo> reservationInfos = listInfo.getReservations();
+ Assert.assertNotNull(reservationInfos);
+ Assert.assertEquals(1, reservationInfos.size());
+
+ ReservationInfo reservationInfo = reservationInfos.get(0);
+ Assert.assertNotNull(reservationInfo);
+ Assert.assertEquals(reservationInfo.getReservationId(),
applyReservationId);
+
+ ReservationDefinitionInfo resDefinitionInfo =
reservationInfo.getReservationDefinition();
+ Assert.assertNotNull(resDefinitionInfo);
+
+ ReservationRequestsInfo reservationRequestsInfo =
resDefinitionInfo.getReservationRequests();
+ Assert.assertNotNull(reservationRequestsInfo);
+
+ ArrayList<ReservationRequestInfo> reservationRequestInfoList =
+ reservationRequestsInfo.getReservationRequest();
+ Assert.assertNotNull(reservationRequestInfoList);
+ Assert.assertEquals(1, reservationRequestInfoList.size());
+
+ ReservationRequestInfo reservationRequestInfo =
reservationRequestInfoList.get(0);
+ Assert.assertNotNull(reservationRequestInfo);
+ Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
+
+ ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
+ Assert.assertNotNull(resourceInfo);
+
+ int vCore = resourceInfo.getvCores();
+ long memory = resourceInfo.getMemorySize();
+ Assert.assertEquals(2, vCore);
+ Assert.assertEquals(2048, memory);
+ }
+
+ @Test
+ public void testDeleteReservation() throws Exception {
+ // submit reservation
+ ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
+ Response response = submitReservation(reservationId);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+ String applyResId = reservationId.toString();
+ Response reservationResponse = interceptor.listReservation(
+ QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
+ Assert.assertNotNull(reservationResponse);
+
+ ReservationDeleteRequestInfo deleteRequestInfo =
+ new ReservationDeleteRequestInfo();
+ deleteRequestInfo.setReservationId(applyResId);
+ Response delResponse = interceptor.deleteReservation(deleteRequestInfo,
null);
+ Assert.assertNotNull(delResponse);
+
+ LambdaTestUtils.intercept(Exception.class,
+ "reservationId with id: " + reservationId + " not found",
+ () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId,
-1, -1, false, null));
+ }
+
+ private Response submitReservation(ReservationId reservationId)
+ throws IOException, InterruptedException, YarnException {
+
+ SubClusterId homeSubClusterId = subClusters.get(0);
+ ReservationHomeSubCluster reservationHomeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
+ AddReservationHomeSubClusterRequest request =
+
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
+ stateStore.addReservationHomeSubCluster(request);
+
+ ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+ getReservationSubmissionRequestInfo(reservationId);
+ Response response =
interceptor.submitReservation(resSubmissionRequestInfo, null);
+ return response;
+ }
+
+ private ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
+ ReservationId reservationId) {
+
+ ReservationSubmissionRequest resSubRequest =
+ getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024,
1);
+ ReservationDefinition reservationDefinition =
resSubRequest.getReservationDefinition();
+
+ ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+ new ReservationSubmissionRequestInfo();
+ resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
+ resSubmissionRequestInfo.setReservationId(reservationId.toString());
+ ReservationDefinitionInfo reservationDefinitionInfo =
+ new ReservationDefinitionInfo(reservationDefinition);
+
resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+
+ return resSubmissionRequestInfo;
+ }
+
+ private ReservationSubmissionRequest getReservationSubmissionRequest(
+ ReservationId reservationId, int numContainers, int memory, int vcore) {
+
+ // arrival time from which the resource(s) can be allocated.
+ long arrival = Time.now();
+
+ // deadline by when the resource(s) must be allocated.
+ // The reason for choosing 1.05 is because this gives an integer
+ // DURATION * 0.05 = 3000(ms)
+ // deadline = arrival + 3000ms
+ long deadline = (long) (arrival + 1.05 * DURATION);
+
+ ReservationSubmissionRequest submissionRequest =
createSimpleReservationRequest(
+ reservationId, numContainers, arrival, deadline, DURATION, memory,
vcore);
+
+ return submissionRequest;
+ }
+
+ public static ReservationSubmissionRequest createSimpleReservationRequest(
+ ReservationId reservationId, int numContainers, long arrival,
+ long deadline, long duration, int memory, int vcore) {
+ // create a request with a single atomic ask
+ ReservationRequest r = ReservationRequest
+ .newInstance(Resource.newInstance(memory, vcore), numContainers, 1,
duration);
+ ReservationRequests reqs = ReservationRequests.newInstance(
+ Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
+ deadline, reqs, "testClientRMService#reservation", "0",
Priority.UNDEFINED);
+ ReservationSubmissionRequest request = ReservationSubmissionRequest
Review Comment:
Same here for the break line
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable
t, String errMsgFormat,
throw new RuntimeException(msg);
}
}
+
+ /**
+ * Save Reservation And HomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public static void addReservationHomeSubCluster(FederationStateStoreFacade
federationFacade,
+ ReservationId reservationId, ReservationHomeSubCluster homeSubCluster)
throws YarnException {
+ try {
+ // persist the mapping of reservationId and the subClusterId which has
+ // been selected as its home
+ federationFacade.addReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to insert the ReservationId %s into the
FederationStateStore.", reservationId);
+ }
+ }
+
+ /**
+ * Update Reservation And HomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param subClusterId subClusterId
+ * @param reservationId reservationId
+ * @param homeSubCluster homeSubCluster
+ * @throws YarnException on failure
+ */
+ public static void
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+ SubClusterId subClusterId, ReservationId reservationId,
+ ReservationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ // update the mapping of reservationId and the home subClusterId to
+ // the new subClusterId we have selected
+ federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore =
+ federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId == subClusterIdInStateStore) {
+ LOG.info("Reservation {} already submitted on SubCluster {}.",
reservationId, subClusterId);
+ } else {
+ RouterServerUtil.logAndThrowException(e,
+ "Unable to update the ReservationId %s into the
FederationStateStore.", reservationId);
+ }
+ }
+ }
+
+ /**
+ * Exists ReservationHomeSubCluster Mapping.
+ *
+ * @param federationFacade federation facade
+ * @param reservationId reservationId
+ * @return true - exist, false - not exist
+ */
+ public static Boolean
existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+ ReservationId reservationId) {
+ try {
+ SubClusterId subClusterId =
federationFacade.getReservationHomeSubCluster(reservationId);
+ if (subClusterId != null) {
+ return true;
+ }
+ } catch (YarnException e) {
+ LOG.warn("get homeSubCluster by reservationId = {} error.",
reservationId, e);
+ }
+ return false;
+ }
+
+ public static ReservationDefinition convertReservationDefinition(
+ ReservationDefinitionInfo definitionInfo) {
+
+ // basic variable
+ long arrival = definitionInfo.getArrival();
+ long deadline = definitionInfo.getDeadline();
+
+ // ReservationRequests reservationRequests
+ String name = definitionInfo.getReservationName();
+ String recurrenceExpression = definitionInfo.getRecurrenceExpression();
+ Priority priority = Priority.newInstance(definitionInfo.getPriority());
+
+ // reservation requests info
+ List<ReservationRequest> reservationRequestList = new ArrayList<>();
+
+ ReservationRequestsInfo reservationRequestsInfo =
definitionInfo.getReservationRequests();
+
+ List<ReservationRequestInfo> reservationRequestInfos =
+ reservationRequestsInfo.getReservationRequest();
+
+ for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
+ ResourceInfo resourceInfo = resRequestInfo.getCapability();
+ Resource capability =
+ Resource.newInstance(resourceInfo.getMemorySize(),
resourceInfo.getvCores());
+ ReservationRequest reservationRequest =
ReservationRequest.newInstance(capability,
+ resRequestInfo.getNumContainers(),
resRequestInfo.getMinConcurrency(),
+ resRequestInfo.getDuration());
+ reservationRequestList.add(reservationRequest);
+ }
+
+ ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
+ ReservationRequestInterpreter reservationRequestInterpreter =
+ values[reservationRequestsInfo.getReservationRequestsInterpreter()];
+ ReservationRequests reservationRequests =
+ ReservationRequests.newInstance(reservationRequestList,
reservationRequestInterpreter);
+
+ ReservationDefinition definition =
Review Comment:
First two lines into one line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java:
##########
@@ -51,4 +70,48 @@ protected void registerBadSubCluster(SubClusterId badSC) {
interceptor.setRunning(false);
}
+ protected void setupResourceManager() throws IOException {
+ try {
+ if (mockRM != null) {
+ return;
+ }
+
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ CapacitySchedulerConfiguration conf = new
CapacitySchedulerConfiguration();
+
+ // Define default queue
+ conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
+ // Define dedicated queues
+ String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED};
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues);
+ conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
+ conf.setReservable(QUEUE_DEDICATED_FULL, true);
+
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ CapacityScheduler.class, ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
false);
+
+ mockRM = new MockRM(conf);
+ mockRM.start();
+ mockRM.registerNode("127.0.0.1:5678", 100*1024, 100);
+
+ Map<SubClusterId, DefaultRequestInterceptorREST> interceptors =
super.getInterceptors();
+ for (DefaultRequestInterceptorREST item : interceptors.values()) {
+ MockDefaultRequestInterceptorREST interceptor =
(MockDefaultRequestInterceptorREST) item;
+ interceptor.setMockRM(mockRM);
+ }
+ } catch (Exception e) {
+ LOG.error("setupResourceManager failed.", e);
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (mockRM != null) {
+ mockRM.stop();
Review Comment:
`mockRM = null;`
> [Federation] Add createNewReservation, submitReservation, updateReservation,
> deleteReservation REST APIs for Router
> -------------------------------------------------------------------------------------------------------------------
>
> Key: YARN-11226
> URL: https://issues.apache.org/jira/browse/YARN-11226
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation
> Affects Versions: 3.4.0, 3.3.4
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]