[ 
https://issues.apache.org/jira/browse/YARN-11226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607872#comment-17607872
 ] 

ASF GitHub Bot commented on YARN-11226:
---------------------------------------

goiri commented on code in PR #4892:
URL: https://github.com/apache/hadoop/pull/4892#discussion_r976766919


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java:
##########
@@ -830,64 +843,193 @@ public Response listReservation(String queue, String 
reservationId, long startTi
           " Please try again with a valid reservable queue.");
     }
 
-    MockRM mockRM = setupResourceManager();
+    ReservationId reservationID =
+        ReservationId.parseReservationId(reservationId);
 
-    ReservationId reservationID = 
ReservationId.parseReservationId(reservationId);
-    ReservationSystem reservationSystem = mockRM.getReservationSystem();
-    reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+    if (!reservationMap.containsKey(reservationID)) {
+      throw new NotFoundException("reservationId with id: " + reservationId + 
" not found");
+    }
 
-    // Generate reserved resources
     ClientRMService clientService = mockRM.getClientRMService();
 
-    // arrival time from which the resource(s) can be allocated.
-    long arrival = Time.now();
-
-    // deadline by when the resource(s) must be allocated.
-    // The reason for choosing 1.05 is because this gives an integer
-    // DURATION * 0.05 = 3000(ms)
-    // deadline = arrival + 3000ms
-    long deadline = (long) (arrival + 1.05 * DURATION);
-
-    // In this test of reserved resources, we will apply for 4 containers (1 
core, 1GB memory)
-    // arrival = Time.now(), and make sure deadline - arrival > duration,
-    // the current setting is greater than 3000ms
-    ReservationSubmissionRequest submissionRequest =
-        ReservationSystemTestUtil.createSimpleReservationRequest(
-            reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
-    clientService.submitReservation(submissionRequest);
-
     // listReservations
     ReservationListRequest request = ReservationListRequest.newInstance(
-        queue, reservationID.toString(), startTime, endTime, 
includeResourceAllocations);
+        queue, reservationId, startTime, endTime, includeResourceAllocations);
     ReservationListResponse resRespInfo = 
clientService.listReservations(request);
     ReservationListInfo resResponse =
         new ReservationListInfo(resRespInfo, includeResourceAllocations);
 
-    if (mockRM != null) {
-      mockRM.stop();
+    return Response.status(Status.OK).entity(resResponse).build();
+  }
+
+  @Override
+  public Response createNewReservation(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ReservationId resId = ReservationId.newInstance(Time.now(), 
resCounter.incrementAndGet());
+    LOG.info("Allocated new reservationId: {}.", resId);
+
+    NewReservation reservationId = new NewReservation(resId.toString());
+    return Response.status(Status.OK).entity(reservationId).build();
+  }
+
+  @Override
+  public Response submitReservation(ReservationSubmissionRequestInfo 
resContext,
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
     }
 
-    return Response.status(Status.OK).entity(resResponse).build();
+    ReservationId reservationId = 
ReservationId.parseReservationId(resContext.getReservationId());
+    ReservationDefinitionInfo definitionInfo = 
resContext.getReservationDefinition();
+    ReservationDefinition definition =
+        RouterServerUtil.convertReservationDefinition(definitionInfo);
+    ReservationSubmissionRequest request = 
ReservationSubmissionRequest.newInstance(
+        definition, resContext.getQueue(), reservationId);
+    submitReservation(request);
+
+    LOG.info("Reservation submitted: {}.", reservationId);
+
+    SubClusterId subClusterId = getSubClusterId();
+    reservationMap.put(reservationId, subClusterId);
+
+    return Response.status(Status.ACCEPTED).build();
+  }
+
+  private void submitReservation(ReservationSubmissionRequest request) {
+    try {
+
+      // synchronize plan
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+
+      // Generate reserved resources
+      ClientRMService clientService = mockRM.getClientRMService();
+      clientService.submitReservation(request);
+    } catch (IOException | YarnException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Response updateReservation(ReservationUpdateRequestInfo resContext,
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+
+    if (resContext == null || resContext.getReservationId() == null ||
+        resContext.getReservationDefinition() == null) {
+      return Response.status(Status.BAD_REQUEST).build();
+    }
+
+    String resId = resContext.getReservationId();
+    ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+    if (!reservationMap.containsKey(reservationId)) {
+      throw new NotFoundException("reservationId with id: " + reservationId + 
" not found");
+    }
+
+    // Generate reserved resources
+    updateReservation(resContext);
+
+    ReservationUpdateResponseInfo resRespInfo = new 
ReservationUpdateResponseInfo();
+    return Response.status(Status.OK).entity(resRespInfo).build();
+  }
+
+  private void updateReservation(ReservationUpdateRequestInfo resContext) {
+    try {
+
+      if (resContext == null) {
+        throw new BadRequestException("Input ReservationSubmissionContext 
should not be null");
+      }
+
+      ReservationDefinitionInfo resInfo = 
resContext.getReservationDefinition();
+      if (resInfo == null) {
+        throw new BadRequestException("Input ReservationDefinition should not 
be null");
+      }
+
+      ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+      if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+          || resReqsInfo.getReservationRequest().size() == 0) {
+        throw new BadRequestException("The ReservationDefinition should " +
+            "contain at least one ReservationRequest");
+      }
+
+      if (resContext.getReservationId() == null) {
+        throw new BadRequestException("Update operations must specify an 
existing ReservaitonId");
+      }
+
+      ReservationRequestInterpreter[] values = 
ReservationRequestInterpreter.values();
+      ReservationRequestInterpreter requestInterpreter =
+          values[resReqsInfo.getReservationRequestsInterpreter()];
+      List<ReservationRequest> list = new ArrayList<>();
+
+      for (ReservationRequestInfo resReqInfo : 
resReqsInfo.getReservationRequest()) {
+        ResourceInfo rInfo = resReqInfo.getCapability();
+        Resource capability = Resource.newInstance(rInfo.getMemorySize(), 
rInfo.getvCores());
+        int numContainers = resReqInfo.getNumContainers();
+        int minConcurrency = resReqInfo.getMinConcurrency();
+        long duration = resReqInfo.getDuration();
+        ReservationRequest rr =
+            ReservationRequest.newInstance(capability, numContainers, 
minConcurrency, duration);
+        list.add(rr);
+      }
+
+      ReservationRequests reqs = ReservationRequests.newInstance(list, 
requestInterpreter);
+      ReservationDefinition rDef = ReservationDefinition.newInstance(
+          resInfo.getArrival(), resInfo.getDeadline(), reqs,
+          resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
+          Priority.newInstance(resInfo.getPriority()));
+      ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
+          rDef, 
ReservationId.parseReservationId(resContext.getReservationId()));
+
+      ClientRMService clientService = mockRM.getClientRMService();
+      clientService.updateReservation(request);
+
+    } catch (Exception ex) {

Review Comment:
   This is a fairly broad catch, wouldn't it be better to just throw the 
exceptions?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable 
t, String errMsgFormat,
       throw new RuntimeException(msg);
     }
   }
+
+  /**
+   * Save Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void addReservationHomeSubCluster(FederationStateStoreFacade 
federationFacade,
+      ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) 
throws YarnException {
+    try {
+      // persist the mapping of reservationId and the subClusterId which has
+      // been selected as its home
+      federationFacade.addReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to insert the ReservationId %s into the 
FederationStateStore.", reservationId);
+    }
+  }
+
+  /**
+   * Update Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param subClusterId subClusterId
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void 
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+      SubClusterId subClusterId, ReservationId reservationId,
+      ReservationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      // update the mapping of reservationId and the home subClusterId to
+      // the new subClusterId we have selected
+      federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      SubClusterId subClusterIdInStateStore =
+          federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId == subClusterIdInStateStore) {
+        LOG.info("Reservation {} already submitted on SubCluster {}.", 
reservationId, subClusterId);
+      } else {
+        RouterServerUtil.logAndThrowException(e,
+            "Unable to update the ReservationId %s into the 
FederationStateStore.", reservationId);
+      }
+    }
+  }
+
+  /**
+   * Exists ReservationHomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @return true - exist, false - not exist
+   */
+  public static Boolean 
existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,

Review Comment:
   can this be the native `boolean` type?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java:
##########
@@ -51,4 +70,48 @@ protected void registerBadSubCluster(SubClusterId badSC) {
     interceptor.setRunning(false);
   }
 
+  protected void setupResourceManager() throws IOException {
+    try {
+      if (mockRM != null) {

Review Comment:
   You can move some of this before the try



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable 
t, String errMsgFormat,
       throw new RuntimeException(msg);
     }
   }
+
+  /**
+   * Save Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void addReservationHomeSubCluster(FederationStateStoreFacade 
federationFacade,
+      ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) 
throws YarnException {
+    try {
+      // persist the mapping of reservationId and the subClusterId which has
+      // been selected as its home
+      federationFacade.addReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to insert the ReservationId %s into the 
FederationStateStore.", reservationId);
+    }
+  }
+
+  /**
+   * Update Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param subClusterId subClusterId
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void 
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+      SubClusterId subClusterId, ReservationId reservationId,
+      ReservationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      // update the mapping of reservationId and the home subClusterId to
+      // the new subClusterId we have selected
+      federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      SubClusterId subClusterIdInStateStore =
+          federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId == subClusterIdInStateStore) {
+        LOG.info("Reservation {} already submitted on SubCluster {}.", 
reservationId, subClusterId);
+      } else {
+        RouterServerUtil.logAndThrowException(e,
+            "Unable to update the ReservationId %s into the 
FederationStateStore.", reservationId);
+      }
+    }
+  }
+
+  /**
+   * Exists ReservationHomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @return true - exist, false - not exist
+   */
+  public static Boolean 
existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+      ReservationId reservationId) {
+    try {
+      SubClusterId subClusterId = 
federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId != null) {
+        return true;
+      }
+    } catch (YarnException e) {
+      LOG.warn("get homeSubCluster by reservationId = {} error.", 
reservationId, e);
+    }
+    return false;
+  }
+
+  public static ReservationDefinition convertReservationDefinition(

Review Comment:
   Should we have a unit test for this?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java:
##########
@@ -830,64 +843,193 @@ public Response listReservation(String queue, String 
reservationId, long startTi
           " Please try again with a valid reservable queue.");
     }
 
-    MockRM mockRM = setupResourceManager();
+    ReservationId reservationID =
+        ReservationId.parseReservationId(reservationId);
 
-    ReservationId reservationID = 
ReservationId.parseReservationId(reservationId);
-    ReservationSystem reservationSystem = mockRM.getReservationSystem();
-    reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+    if (!reservationMap.containsKey(reservationID)) {
+      throw new NotFoundException("reservationId with id: " + reservationId + 
" not found");
+    }
 
-    // Generate reserved resources
     ClientRMService clientService = mockRM.getClientRMService();
 
-    // arrival time from which the resource(s) can be allocated.
-    long arrival = Time.now();
-
-    // deadline by when the resource(s) must be allocated.
-    // The reason for choosing 1.05 is because this gives an integer
-    // DURATION * 0.05 = 3000(ms)
-    // deadline = arrival + 3000ms
-    long deadline = (long) (arrival + 1.05 * DURATION);
-
-    // In this test of reserved resources, we will apply for 4 containers (1 
core, 1GB memory)
-    // arrival = Time.now(), and make sure deadline - arrival > duration,
-    // the current setting is greater than 3000ms
-    ReservationSubmissionRequest submissionRequest =
-        ReservationSystemTestUtil.createSimpleReservationRequest(
-            reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
-    clientService.submitReservation(submissionRequest);
-
     // listReservations
     ReservationListRequest request = ReservationListRequest.newInstance(
-        queue, reservationID.toString(), startTime, endTime, 
includeResourceAllocations);
+        queue, reservationId, startTime, endTime, includeResourceAllocations);
     ReservationListResponse resRespInfo = 
clientService.listReservations(request);
     ReservationListInfo resResponse =
         new ReservationListInfo(resRespInfo, includeResourceAllocations);
 
-    if (mockRM != null) {
-      mockRM.stop();
+    return Response.status(Status.OK).entity(resResponse).build();
+  }
+
+  @Override
+  public Response createNewReservation(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ReservationId resId = ReservationId.newInstance(Time.now(), 
resCounter.incrementAndGet());
+    LOG.info("Allocated new reservationId: {}.", resId);
+
+    NewReservation reservationId = new NewReservation(resId.toString());
+    return Response.status(Status.OK).entity(reservationId).build();
+  }
+
+  @Override
+  public Response submitReservation(ReservationSubmissionRequestInfo 
resContext,
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
     }
 
-    return Response.status(Status.OK).entity(resResponse).build();
+    ReservationId reservationId = 
ReservationId.parseReservationId(resContext.getReservationId());
+    ReservationDefinitionInfo definitionInfo = 
resContext.getReservationDefinition();
+    ReservationDefinition definition =
+        RouterServerUtil.convertReservationDefinition(definitionInfo);
+    ReservationSubmissionRequest request = 
ReservationSubmissionRequest.newInstance(
+        definition, resContext.getQueue(), reservationId);
+    submitReservation(request);
+
+    LOG.info("Reservation submitted: {}.", reservationId);
+
+    SubClusterId subClusterId = getSubClusterId();
+    reservationMap.put(reservationId, subClusterId);
+
+    return Response.status(Status.ACCEPTED).build();
+  }
+
+  private void submitReservation(ReservationSubmissionRequest request) {
+    try {
+
+      // synchronize plan
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+
+      // Generate reserved resources
+      ClientRMService clientService = mockRM.getClientRMService();
+      clientService.submitReservation(request);
+    } catch (IOException | YarnException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Response updateReservation(ReservationUpdateRequestInfo resContext,
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+
+    if (resContext == null || resContext.getReservationId() == null ||
+        resContext.getReservationDefinition() == null) {
+      return Response.status(Status.BAD_REQUEST).build();
+    }
+
+    String resId = resContext.getReservationId();
+    ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+    if (!reservationMap.containsKey(reservationId)) {
+      throw new NotFoundException("reservationId with id: " + reservationId + 
" not found");
+    }
+
+    // Generate reserved resources
+    updateReservation(resContext);
+
+    ReservationUpdateResponseInfo resRespInfo = new 
ReservationUpdateResponseInfo();
+    return Response.status(Status.OK).entity(resRespInfo).build();
+  }
+
+  private void updateReservation(ReservationUpdateRequestInfo resContext) {
+    try {
+
+      if (resContext == null) {
+        throw new BadRequestException("Input ReservationSubmissionContext 
should not be null");
+      }
+
+      ReservationDefinitionInfo resInfo = 
resContext.getReservationDefinition();
+      if (resInfo == null) {
+        throw new BadRequestException("Input ReservationDefinition should not 
be null");
+      }
+
+      ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+      if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+          || resReqsInfo.getReservationRequest().size() == 0) {

Review Comment:
   isEmpty()



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -1127,4 +1141,205 @@ public void testListReservation() throws Exception {
     Assert.assertEquals(1, vCore);
     Assert.assertEquals(1024, memory);
   }
+
+  @Test
+  public void testCreateNewReservation() throws Exception {
+    Response response = interceptor.createNewReservation(null);
+    Assert.assertNotNull(response);
+
+    Object entity = response.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertTrue(entity instanceof NewReservation);
+
+    NewReservation newReservation = (NewReservation) entity;
+    Assert.assertNotNull(newReservation);
+    
Assert.assertTrue(newReservation.getReservationId().contains("reservation"));
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    String applyReservationId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    Object entity = reservationResponse.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+    ReservationListInfo listInfo = (ReservationListInfo) entity;
+    Assert.assertNotNull(listInfo);
+
+    List<ReservationInfo> reservationInfos = listInfo.getReservations();
+    Assert.assertNotNull(reservationInfos);
+    Assert.assertEquals(1, reservationInfos.size());
+
+    ReservationInfo reservationInfo = reservationInfos.get(0);
+    Assert.assertNotNull(reservationInfo);
+    Assert.assertEquals(reservationInfo.getReservationId(), 
applyReservationId);
+  }
+
+  @Test
+  public void testUpdateReservation() throws Exception {
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    // update reservation
+    ReservationSubmissionRequest resSubRequest =
+        getReservationSubmissionRequest(reservationId, 6, 2048, 2);
+    ReservationDefinition reservationDefinition = 
resSubRequest.getReservationDefinition();
+    ReservationDefinitionInfo reservationDefinitionInfo =
+        new ReservationDefinitionInfo(reservationDefinition);
+
+    ReservationUpdateRequestInfo updateRequestInfo = new 
ReservationUpdateRequestInfo();
+    updateRequestInfo.setReservationId(reservationId.toString());
+    updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+    Response updateReservationResp = 
interceptor.updateReservation(updateRequestInfo, null);
+    Assert.assertNotNull(updateReservationResp);
+    Assert.assertEquals(Status.OK.getStatusCode(), 
updateReservationResp.getStatus());
+
+    String applyReservationId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    Object entity = reservationResponse.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+    ReservationListInfo listInfo = (ReservationListInfo) entity;
+    Assert.assertNotNull(listInfo);
+
+    List<ReservationInfo> reservationInfos = listInfo.getReservations();
+    Assert.assertNotNull(reservationInfos);
+    Assert.assertEquals(1, reservationInfos.size());
+
+    ReservationInfo reservationInfo = reservationInfos.get(0);
+    Assert.assertNotNull(reservationInfo);
+    Assert.assertEquals(reservationInfo.getReservationId(), 
applyReservationId);
+
+    ReservationDefinitionInfo resDefinitionInfo = 
reservationInfo.getReservationDefinition();
+    Assert.assertNotNull(resDefinitionInfo);
+
+    ReservationRequestsInfo reservationRequestsInfo = 
resDefinitionInfo.getReservationRequests();
+    Assert.assertNotNull(reservationRequestsInfo);
+
+    ArrayList<ReservationRequestInfo> reservationRequestInfoList =
+        reservationRequestsInfo.getReservationRequest();
+    Assert.assertNotNull(reservationRequestInfoList);
+    Assert.assertEquals(1, reservationRequestInfoList.size());
+
+    ReservationRequestInfo reservationRequestInfo = 
reservationRequestInfoList.get(0);
+    Assert.assertNotNull(reservationRequestInfo);
+    Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
+
+    ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
+    Assert.assertNotNull(resourceInfo);
+
+    int vCore = resourceInfo.getvCores();
+    long memory = resourceInfo.getMemorySize();
+    Assert.assertEquals(2, vCore);
+    Assert.assertEquals(2048, memory);
+  }
+
+  @Test
+  public void testDeleteReservation() throws Exception {
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    String applyResId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    ReservationDeleteRequestInfo deleteRequestInfo =
+        new ReservationDeleteRequestInfo();
+    deleteRequestInfo.setReservationId(applyResId);
+    Response delResponse = interceptor.deleteReservation(deleteRequestInfo, 
null);
+    Assert.assertNotNull(delResponse);
+
+    LambdaTestUtils.intercept(Exception.class,
+        "reservationId with id: " + reservationId + " not found",
+        () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, 
-1, -1, false, null));
+  }
+
+  private Response submitReservation(ReservationId reservationId)
+      throws IOException, InterruptedException, YarnException {
+
+    SubClusterId homeSubClusterId = subClusters.get(0);
+    ReservationHomeSubCluster reservationHomeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
+    AddReservationHomeSubClusterRequest request =
+        
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
+    stateStore.addReservationHomeSubCluster(request);
+
+    ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+        getReservationSubmissionRequestInfo(reservationId);
+    Response response = 
interceptor.submitReservation(resSubmissionRequestInfo, null);
+    return response;
+  }
+
+  private ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
+      ReservationId reservationId) {
+
+    ReservationSubmissionRequest resSubRequest =
+        getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 
1);
+    ReservationDefinition reservationDefinition = 
resSubRequest.getReservationDefinition();
+
+    ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+        new ReservationSubmissionRequestInfo();
+    resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
+    resSubmissionRequestInfo.setReservationId(reservationId.toString());
+    ReservationDefinitionInfo reservationDefinitionInfo =
+        new ReservationDefinitionInfo(reservationDefinition);
+    
resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+
+    return resSubmissionRequestInfo;
+  }
+
+  private ReservationSubmissionRequest getReservationSubmissionRequest(
+      ReservationId reservationId, int numContainers, int memory, int vcore) {
+
+    // arrival time from which the resource(s) can be allocated.
+    long arrival = Time.now();
+
+    // deadline by when the resource(s) must be allocated.
+    // The reason for choosing 1.05 is because this gives an integer
+    // DURATION * 0.05 = 3000(ms)
+    // deadline = arrival + 3000ms
+    long deadline = (long) (arrival + 1.05 * DURATION);
+
+    ReservationSubmissionRequest submissionRequest = 
createSimpleReservationRequest(
+        reservationId, numContainers, arrival, deadline, DURATION, memory, 
vcore);
+
+    return submissionRequest;
+  }
+
+  public static ReservationSubmissionRequest createSimpleReservationRequest(
+      ReservationId reservationId, int numContainers, long arrival,
+      long deadline, long duration, int memory, int vcore) {
+    // create a request with a single atomic ask
+    ReservationRequest r = ReservationRequest

Review Comment:
   Small nit, this would look better as:
   ```
   ReservationRequest r = ReservationRequest.newInstance(
       Resource.newInstance(memory, vcore), numContainers, 1, duration);
   ```



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java:
##########
@@ -1127,4 +1141,205 @@ public void testListReservation() throws Exception {
     Assert.assertEquals(1, vCore);
     Assert.assertEquals(1024, memory);
   }
+
+  @Test
+  public void testCreateNewReservation() throws Exception {
+    Response response = interceptor.createNewReservation(null);
+    Assert.assertNotNull(response);
+
+    Object entity = response.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertTrue(entity instanceof NewReservation);
+
+    NewReservation newReservation = (NewReservation) entity;
+    Assert.assertNotNull(newReservation);
+    
Assert.assertTrue(newReservation.getReservationId().contains("reservation"));
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    String applyReservationId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    Object entity = reservationResponse.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+    ReservationListInfo listInfo = (ReservationListInfo) entity;
+    Assert.assertNotNull(listInfo);
+
+    List<ReservationInfo> reservationInfos = listInfo.getReservations();
+    Assert.assertNotNull(reservationInfos);
+    Assert.assertEquals(1, reservationInfos.size());
+
+    ReservationInfo reservationInfo = reservationInfos.get(0);
+    Assert.assertNotNull(reservationInfo);
+    Assert.assertEquals(reservationInfo.getReservationId(), 
applyReservationId);
+  }
+
+  @Test
+  public void testUpdateReservation() throws Exception {
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    // update reservation
+    ReservationSubmissionRequest resSubRequest =
+        getReservationSubmissionRequest(reservationId, 6, 2048, 2);
+    ReservationDefinition reservationDefinition = 
resSubRequest.getReservationDefinition();
+    ReservationDefinitionInfo reservationDefinitionInfo =
+        new ReservationDefinitionInfo(reservationDefinition);
+
+    ReservationUpdateRequestInfo updateRequestInfo = new 
ReservationUpdateRequestInfo();
+    updateRequestInfo.setReservationId(reservationId.toString());
+    updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+    Response updateReservationResp = 
interceptor.updateReservation(updateRequestInfo, null);
+    Assert.assertNotNull(updateReservationResp);
+    Assert.assertEquals(Status.OK.getStatusCode(), 
updateReservationResp.getStatus());
+
+    String applyReservationId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    Object entity = reservationResponse.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertNotNull(entity instanceof ReservationListInfo);
+
+    ReservationListInfo listInfo = (ReservationListInfo) entity;
+    Assert.assertNotNull(listInfo);
+
+    List<ReservationInfo> reservationInfos = listInfo.getReservations();
+    Assert.assertNotNull(reservationInfos);
+    Assert.assertEquals(1, reservationInfos.size());
+
+    ReservationInfo reservationInfo = reservationInfos.get(0);
+    Assert.assertNotNull(reservationInfo);
+    Assert.assertEquals(reservationInfo.getReservationId(), 
applyReservationId);
+
+    ReservationDefinitionInfo resDefinitionInfo = 
reservationInfo.getReservationDefinition();
+    Assert.assertNotNull(resDefinitionInfo);
+
+    ReservationRequestsInfo reservationRequestsInfo = 
resDefinitionInfo.getReservationRequests();
+    Assert.assertNotNull(reservationRequestsInfo);
+
+    ArrayList<ReservationRequestInfo> reservationRequestInfoList =
+        reservationRequestsInfo.getReservationRequest();
+    Assert.assertNotNull(reservationRequestInfoList);
+    Assert.assertEquals(1, reservationRequestInfoList.size());
+
+    ReservationRequestInfo reservationRequestInfo = 
reservationRequestInfoList.get(0);
+    Assert.assertNotNull(reservationRequestInfo);
+    Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
+
+    ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
+    Assert.assertNotNull(resourceInfo);
+
+    int vCore = resourceInfo.getvCores();
+    long memory = resourceInfo.getMemorySize();
+    Assert.assertEquals(2, vCore);
+    Assert.assertEquals(2048, memory);
+  }
+
+  @Test
+  public void testDeleteReservation() throws Exception {
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    String applyResId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    ReservationDeleteRequestInfo deleteRequestInfo =
+        new ReservationDeleteRequestInfo();
+    deleteRequestInfo.setReservationId(applyResId);
+    Response delResponse = interceptor.deleteReservation(deleteRequestInfo, 
null);
+    Assert.assertNotNull(delResponse);
+
+    LambdaTestUtils.intercept(Exception.class,
+        "reservationId with id: " + reservationId + " not found",
+        () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, 
-1, -1, false, null));
+  }
+
+  private Response submitReservation(ReservationId reservationId)
+      throws IOException, InterruptedException, YarnException {
+
+    SubClusterId homeSubClusterId = subClusters.get(0);
+    ReservationHomeSubCluster reservationHomeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
+    AddReservationHomeSubClusterRequest request =
+        
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
+    stateStore.addReservationHomeSubCluster(request);
+
+    ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+        getReservationSubmissionRequestInfo(reservationId);
+    Response response = 
interceptor.submitReservation(resSubmissionRequestInfo, null);
+    return response;
+  }
+
+  private ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
+      ReservationId reservationId) {
+
+    ReservationSubmissionRequest resSubRequest =
+        getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 
1);
+    ReservationDefinition reservationDefinition = 
resSubRequest.getReservationDefinition();
+
+    ReservationSubmissionRequestInfo resSubmissionRequestInfo =
+        new ReservationSubmissionRequestInfo();
+    resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
+    resSubmissionRequestInfo.setReservationId(reservationId.toString());
+    ReservationDefinitionInfo reservationDefinitionInfo =
+        new ReservationDefinitionInfo(reservationDefinition);
+    
resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+
+    return resSubmissionRequestInfo;
+  }
+
+  private ReservationSubmissionRequest getReservationSubmissionRequest(
+      ReservationId reservationId, int numContainers, int memory, int vcore) {
+
+    // arrival time from which the resource(s) can be allocated.
+    long arrival = Time.now();
+
+    // deadline by when the resource(s) must be allocated.
+    // The reason for choosing 1.05 is because this gives an integer
+    // DURATION * 0.05 = 3000(ms)
+    // deadline = arrival + 3000ms
+    long deadline = (long) (arrival + 1.05 * DURATION);
+
+    ReservationSubmissionRequest submissionRequest = 
createSimpleReservationRequest(
+        reservationId, numContainers, arrival, deadline, DURATION, memory, 
vcore);
+
+    return submissionRequest;
+  }
+
+  public static ReservationSubmissionRequest createSimpleReservationRequest(
+      ReservationId reservationId, int numContainers, long arrival,
+      long deadline, long duration, int memory, int vcore) {
+    // create a request with a single atomic ask
+    ReservationRequest r = ReservationRequest
+        .newInstance(Resource.newInstance(memory, vcore), numContainers, 1, 
duration);
+    ReservationRequests reqs = ReservationRequests.newInstance(
+        Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef = ReservationDefinition.newInstance(arrival,
+        deadline, reqs, "testClientRMService#reservation", "0", 
Priority.UNDEFINED);
+    ReservationSubmissionRequest request = ReservationSubmissionRequest

Review Comment:
   Same here for the break line



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -222,4 +236,115 @@ public static void logAndThrowRunTimeException(Throwable 
t, String errMsgFormat,
       throw new RuntimeException(msg);
     }
   }
+
+  /**
+   * Save Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void addReservationHomeSubCluster(FederationStateStoreFacade 
federationFacade,
+      ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) 
throws YarnException {
+    try {
+      // persist the mapping of reservationId and the subClusterId which has
+      // been selected as its home
+      federationFacade.addReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to insert the ReservationId %s into the 
FederationStateStore.", reservationId);
+    }
+  }
+
+  /**
+   * Update Reservation And HomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param subClusterId subClusterId
+   * @param reservationId reservationId
+   * @param homeSubCluster homeSubCluster
+   * @throws YarnException on failure
+   */
+  public static void 
updateReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+      SubClusterId subClusterId, ReservationId reservationId,
+      ReservationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      // update the mapping of reservationId and the home subClusterId to
+      // the new subClusterId we have selected
+      federationFacade.updateReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      SubClusterId subClusterIdInStateStore =
+          federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId == subClusterIdInStateStore) {
+        LOG.info("Reservation {} already submitted on SubCluster {}.", 
reservationId, subClusterId);
+      } else {
+        RouterServerUtil.logAndThrowException(e,
+            "Unable to update the ReservationId %s into the 
FederationStateStore.", reservationId);
+      }
+    }
+  }
+
+  /**
+   * Exists ReservationHomeSubCluster Mapping.
+   *
+   * @param federationFacade federation facade
+   * @param reservationId reservationId
+   * @return true - exist, false - not exist
+   */
+  public static Boolean 
existsReservationHomeSubCluster(FederationStateStoreFacade federationFacade,
+      ReservationId reservationId) {
+    try {
+      SubClusterId subClusterId = 
federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId != null) {
+        return true;
+      }
+    } catch (YarnException e) {
+      LOG.warn("get homeSubCluster by reservationId = {} error.", 
reservationId, e);
+    }
+    return false;
+  }
+
+  public static ReservationDefinition convertReservationDefinition(
+      ReservationDefinitionInfo definitionInfo) {
+
+    // basic variable
+    long arrival = definitionInfo.getArrival();
+    long deadline = definitionInfo.getDeadline();
+
+    // ReservationRequests reservationRequests
+    String name = definitionInfo.getReservationName();
+    String recurrenceExpression = definitionInfo.getRecurrenceExpression();
+    Priority priority = Priority.newInstance(definitionInfo.getPriority());
+
+    // reservation requests info
+    List<ReservationRequest> reservationRequestList = new ArrayList<>();
+
+    ReservationRequestsInfo reservationRequestsInfo = 
definitionInfo.getReservationRequests();
+
+    List<ReservationRequestInfo> reservationRequestInfos =
+        reservationRequestsInfo.getReservationRequest();
+
+    for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
+      ResourceInfo resourceInfo = resRequestInfo.getCapability();
+      Resource capability =
+          Resource.newInstance(resourceInfo.getMemorySize(), 
resourceInfo.getvCores());
+      ReservationRequest reservationRequest = 
ReservationRequest.newInstance(capability,
+          resRequestInfo.getNumContainers(), 
resRequestInfo.getMinConcurrency(),
+          resRequestInfo.getDuration());
+      reservationRequestList.add(reservationRequest);
+    }
+
+    ReservationRequestInterpreter[] values = 
ReservationRequestInterpreter.values();
+    ReservationRequestInterpreter reservationRequestInterpreter =
+        values[reservationRequestsInfo.getReservationRequestsInterpreter()];
+    ReservationRequests reservationRequests =
+        ReservationRequests.newInstance(reservationRequestList, 
reservationRequestInterpreter);
+
+    ReservationDefinition definition =

Review Comment:
   First two lines into one line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java:
##########
@@ -51,4 +70,48 @@ protected void registerBadSubCluster(SubClusterId badSC) {
     interceptor.setRunning(false);
   }
 
+  protected void setupResourceManager() throws IOException {
+    try {
+      if (mockRM != null) {
+        return;
+      }
+
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      CapacitySchedulerConfiguration conf = new 
CapacitySchedulerConfiguration();
+
+      // Define default queue
+      conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
+      // Define dedicated queues
+      String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED};
+      conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues);
+      conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
+      conf.setReservable(QUEUE_DEDICATED_FULL, true);
+
+      conf.setClass(YarnConfiguration.RM_SCHEDULER,
+          CapacityScheduler.class, ResourceScheduler.class);
+      conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+      conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, 
false);
+
+      mockRM = new MockRM(conf);
+      mockRM.start();
+      mockRM.registerNode("127.0.0.1:5678", 100*1024, 100);
+
+      Map<SubClusterId, DefaultRequestInterceptorREST> interceptors = 
super.getInterceptors();
+      for (DefaultRequestInterceptorREST item : interceptors.values()) {
+        MockDefaultRequestInterceptorREST interceptor = 
(MockDefaultRequestInterceptorREST) item;
+        interceptor.setMockRM(mockRM);
+      }
+    } catch (Exception e) {
+      LOG.error("setupResourceManager failed.", e);
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    if (mockRM != null) {
+      mockRM.stop();

Review Comment:
   `mockRM = null;`





> [Federation] Add createNewReservation, submitReservation, updateReservation, 
> deleteReservation REST APIs for Router
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-11226
>                 URL: https://issues.apache.org/jira/browse/YARN-11226
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation
>    Affects Versions: 3.4.0, 3.3.4
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to