[ https://issues.apache.org/jira/browse/YARN-11226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644737#comment-17644737 ]
ASF GitHub Bot commented on YARN-11226:
---------------------------------------
slfan1989 commented on code in PR #5175:
URL: https://github.com/apache/hadoop/pull/5175#discussion_r1043222279
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java:
##########
@@ -859,44 +878,191 @@ public Response listReservation(String queue, String
reservationId, long startTi
" Please try again with a valid reservable queue.");
}
- MockRM mockRM = setupResourceManager();
+ ReservationId reservationID =
+ ReservationId.parseReservationId(reservationId);
- ReservationId reservationID =
ReservationId.parseReservationId(reservationId);
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ if (!reservationMap.containsKey(reservationID)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
- // Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
- // arrival time from which the resource(s) can be allocated.
- long arrival = Time.now();
-
- // deadline by when the resource(s) must be allocated.
- // The reason for choosing 1.05 is because this gives an integer
- // DURATION * 0.05 = 3000(ms)
- // deadline = arrival + 3000ms
- long deadline = (long) (arrival + 1.05 * DURATION);
-
- // In this test of reserved resources, we will apply for 4 containers (1
core, 1GB memory)
- // arrival = Time.now(), and make sure deadline - arrival > duration,
- // the current setting is greater than 3000ms
- ReservationSubmissionRequest submissionRequest =
- ReservationSystemTestUtil.createSimpleReservationRequest(
- reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
- clientService.submitReservation(submissionRequest);
-
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
- queue, reservationID.toString(), startTime, endTime,
includeResourceAllocations);
+ queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo =
clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
- if (mockRM != null) {
- mockRM.stop();
+ return Response.status(Status.OK).entity(resResponse).build();
+ }
+
+ @Override
+ public Response createNewReservation(HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ReservationId resId = ReservationId.newInstance(Time.now(),
resCounter.incrementAndGet());
+ LOG.info("Allocated new reservationId: {}.", resId);
+
+ NewReservation reservationId = new NewReservation(resId.toString());
+ return Response.status(Status.OK).entity(reservationId).build();
+ }
+
+ @Override
+ public Response submitReservation(ReservationSubmissionRequestInfo
resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
}
- return Response.status(Status.OK).entity(resResponse).build();
+ ReservationId reservationId =
ReservationId.parseReservationId(resContext.getReservationId());
+ ReservationDefinitionInfo definitionInfo =
resContext.getReservationDefinition();
+ ReservationDefinition definition =
+ RouterServerUtil.convertReservationDefinition(definitionInfo);
+ ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(
+ definition, resContext.getQueue(), reservationId);
+ submitReservation(request);
+
+ LOG.info("Reservation submitted: {}.", reservationId);
+
+ SubClusterId subClusterId = getSubClusterId();
+ reservationMap.put(reservationId, subClusterId);
+
+ return Response.status(Status.ACCEPTED).build();
+ }
+
+ private void submitReservation(ReservationSubmissionRequest request) {
+ try {
+ // synchronize plan
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ // Generate reserved resources
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.submitReservation(request);
+ } catch (IOException | YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Response updateReservation(ReservationUpdateRequestInfo resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (resContext == null || resContext.getReservationId() == null ||
+ resContext.getReservationDefinition() == null) {
Review Comment:
Thank you very much for helping to review the code; I will fix it.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java:
##########
@@ -859,44 +878,191 @@ public Response listReservation(String queue, String
reservationId, long startTi
" Please try again with a valid reservable queue.");
}
- MockRM mockRM = setupResourceManager();
+ ReservationId reservationID =
+ ReservationId.parseReservationId(reservationId);
- ReservationId reservationID =
ReservationId.parseReservationId(reservationId);
- ReservationSystem reservationSystem = mockRM.getReservationSystem();
- reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ if (!reservationMap.containsKey(reservationID)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
- // Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
- // arrival time from which the resource(s) can be allocated.
- long arrival = Time.now();
-
- // deadline by when the resource(s) must be allocated.
- // The reason for choosing 1.05 is because this gives an integer
- // DURATION * 0.05 = 3000(ms)
- // deadline = arrival + 3000ms
- long deadline = (long) (arrival + 1.05 * DURATION);
-
- // In this test of reserved resources, we will apply for 4 containers (1
core, 1GB memory)
- // arrival = Time.now(), and make sure deadline - arrival > duration,
- // the current setting is greater than 3000ms
- ReservationSubmissionRequest submissionRequest =
- ReservationSystemTestUtil.createSimpleReservationRequest(
- reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
- clientService.submitReservation(submissionRequest);
-
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
- queue, reservationID.toString(), startTime, endTime,
includeResourceAllocations);
+ queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo =
clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
- if (mockRM != null) {
- mockRM.stop();
+ return Response.status(Status.OK).entity(resResponse).build();
+ }
+
+ @Override
+ public Response createNewReservation(HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ ReservationId resId = ReservationId.newInstance(Time.now(),
resCounter.incrementAndGet());
+ LOG.info("Allocated new reservationId: {}.", resId);
+
+ NewReservation reservationId = new NewReservation(resId.toString());
+ return Response.status(Status.OK).entity(reservationId).build();
+ }
+
+ @Override
+ public Response submitReservation(ReservationSubmissionRequestInfo
resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
}
- return Response.status(Status.OK).entity(resResponse).build();
+ ReservationId reservationId =
ReservationId.parseReservationId(resContext.getReservationId());
+ ReservationDefinitionInfo definitionInfo =
resContext.getReservationDefinition();
+ ReservationDefinition definition =
+ RouterServerUtil.convertReservationDefinition(definitionInfo);
+ ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(
+ definition, resContext.getQueue(), reservationId);
+ submitReservation(request);
+
+ LOG.info("Reservation submitted: {}.", reservationId);
+
+ SubClusterId subClusterId = getSubClusterId();
+ reservationMap.put(reservationId, subClusterId);
+
+ return Response.status(Status.ACCEPTED).build();
+ }
+
+ private void submitReservation(ReservationSubmissionRequest request) {
+ try {
+ // synchronize plan
+ ReservationSystem reservationSystem = mockRM.getReservationSystem();
+ reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+ // Generate reserved resources
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.submitReservation(request);
+ } catch (IOException | YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Response updateReservation(ReservationUpdateRequestInfo resContext,
+ HttpServletRequest hsr) throws AuthorizationException, IOException,
InterruptedException {
+
+ if (resContext == null || resContext.getReservationId() == null ||
+ resContext.getReservationDefinition() == null) {
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+
+ String resId = resContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+ if (!reservationMap.containsKey(reservationId)) {
+ throw new NotFoundException("reservationId with id: " + reservationId +
" not found");
+ }
+
+ // Generate reserved resources
+ updateReservation(resContext);
+
+ ReservationUpdateResponseInfo resRespInfo = new
ReservationUpdateResponseInfo();
+ return Response.status(Status.OK).entity(resRespInfo).build();
+ }
+
+ private void updateReservation(ReservationUpdateRequestInfo resContext)
throws IOException {
+
+ if (resContext == null) {
+ throw new BadRequestException("Input ReservationSubmissionContext should
not be null");
+ }
+
+ ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
+ if (resInfo == null) {
+ throw new BadRequestException("Input ReservationDefinition should not be
null");
+ }
+
+ ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+ if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+ || resReqsInfo.getReservationRequest().isEmpty()) {
+ throw new BadRequestException("The ReservationDefinition should " +
+ "contain at least one ReservationRequest");
+ }
+
+ if (resContext.getReservationId() == null) {
+ throw new BadRequestException("Update operations must specify an
existing ReservaitonId");
+ }
+
+ ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
+ ReservationRequestInterpreter requestInterpreter =
+ values[resReqsInfo.getReservationRequestsInterpreter()];
+ List<ReservationRequest> list = new ArrayList<>();
+
+ for (ReservationRequestInfo resReqInfo :
resReqsInfo.getReservationRequest()) {
+ ResourceInfo rInfo = resReqInfo.getCapability();
+ Resource capability = Resource.newInstance(rInfo.getMemorySize(),
rInfo.getvCores());
+ int numContainers = resReqInfo.getNumContainers();
+ int minConcurrency = resReqInfo.getMinConcurrency();
+ long duration = resReqInfo.getDuration();
+ ReservationRequest rr = ReservationRequest.newInstance(
+ capability, numContainers, minConcurrency, duration);
+ list.add(rr);
+ }
+
+ ReservationRequests reqs = ReservationRequests.newInstance(list,
requestInterpreter);
+ ReservationDefinition rDef = ReservationDefinition.newInstance(
+ resInfo.getArrival(), resInfo.getDeadline(), reqs,
+ resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
+ Priority.newInstance(resInfo.getPriority()));
+ ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
+ rDef, ReservationId.parseReservationId(resContext.getReservationId()));
+
+ ClientRMService clientService = mockRM.getClientRMService();
+ try {
+ clientService.updateReservation(request);
+ } catch (YarnException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
+ throws AuthorizationException, IOException, InterruptedException {
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+
+ try {
+ String resId = resContext.getReservationId();
+ ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+ if (!reservationMap.containsKey(reservationId)) {
+ throw new NotFoundException("reservationId with id: " + reservationId
+ " not found");
+ }
+
+ ReservationDeleteRequest reservationDeleteRequest =
+ ReservationDeleteRequest.newInstance(reservationId);
+ ClientRMService clientService = mockRM.getClientRMService();
+ clientService.deleteReservation(reservationDeleteRequest);
+
+ ReservationDeleteResponseInfo resRespInfo = new
ReservationDeleteResponseInfo();
+ reservationMap.remove(reservationId);
+
+ return Response.status(Status.OK).entity(resRespInfo).build();
+ } catch (YarnException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ public MockRM getMockRM() {
+ return mockRM;
+ }
+
+ @VisibleForTesting
+ public void setMockRM(MockRM mockRM) {
+ this.mockRM = mockRM;
Review Comment:
I will fix it.
> [Federation] Add createNewReservation, submitReservation, updateReservation,
> deleteReservation REST APIs for Router
> -------------------------------------------------------------------------------------------------------------------
>
> Key: YARN-11226
> URL: https://issues.apache.org/jira/browse/YARN-11226
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation
> Affects Versions: 3.4.0, 3.3.4
> Reporter: Shilun Fan
> Assignee: Shilun Fan
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]