[ 
https://issues.apache.org/jira/browse/YARN-11273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597289#comment-17597289
 ] 

ASF GitHub Bot commented on YARN-11273:
---------------------------------------

goiri commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r957556044


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1035,275 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = 
request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.setString(2, subClusterId.getId());
+      cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = cstmt.getString(3);
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt(4);
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into 
FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore 
because it" +
+              " was already present in subCluster {}", reservationId, 
subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not 
set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s.", 
subClusterId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the 
expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", 
reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster 
%s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString(2);
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in 
subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", 
reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+    return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
   }
 
   @Override
   public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
       GetReservationsHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    CallableStatement cstmt = null;
+    ResultSet rs = null;
+    List<ReservationHomeSubCluster> reservationsHomeSubClusters = new 
ArrayList<>();
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
+
+      while (rs.next()) {
+        // Extract the output for each tuple
+        String dbReservationId = rs.getString(1);

Review Comment:
   Is there a better way than hard-coding column indices here?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1035,275 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(

Review Comment:
   Is there a way to add tests to any of this?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1035,275 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = 
request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.setString(2, subClusterId.getId());
+      cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = cstmt.getString(3);
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt(4);
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into 
FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore 
because it" +
+              " was already present in subCluster {}", reservationId, 
subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not 
set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s.", 
subClusterId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the 
expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", 
reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster 
%s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString(2);
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in 
subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", 
reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+    return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
   }
 
   @Override
   public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
       GetReservationsHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    CallableStatement cstmt = null;
+    ResultSet rs = null;
+    List<ReservationHomeSubCluster> reservationsHomeSubClusters = new 
ArrayList<>();
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
+
+      while (rs.next()) {
+        // Extract the output for each tuple
+        String dbReservationId = rs.getString(1);
+        String dbHomeSubCluster = rs.getString(2);
+
+        // Generate parameters
+        ReservationId reservationId = 
ReservationId.parseReservationId(dbReservationId);
+        SubClusterId homeSubCluster = 
SubClusterId.newInstance(dbHomeSubCluster);
+        ReservationHomeSubCluster reservationHomeSubCluster =
+            ReservationHomeSubCluster.newInstance(reservationId, 
homeSubCluster);
+        reservationsHomeSubClusters.add(reservationHomeSubCluster);
+      }
+
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (Exception e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the information for all the reservations.", e);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
+    }
+
+    return GetReservationsHomeSubClusterResponse.newInstance(
+        reservationsHomeSubClusters);
   }
 
   @Override
   public DeleteReservationHomeSubClusterResponse 
deleteReservationHomeSubCluster(
       DeleteReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      int rowCount = cstmt.getInt(2);
+
+      // if it is equal to 0 it means the call
+      // did not delete the reservation from FederationStateStore
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Reservation %s does not exist", reservationId);
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the reservation %s.", 
reservationId);

Review Comment:
   Can this be more specific?





> [RESERVATION] Federation StateStore: Support storage/retrieval of 
> Reservations With SQL
> ---------------------------------------------------------------------------------------
>
>                 Key: YARN-11273
>                 URL: https://issues.apache.org/jira/browse/YARN-11273
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation
>    Affects Versions: 3.4.0
>            Reporter: fanshilun
>            Assignee: fanshilun
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to