[
https://issues.apache.org/jira/browse/YARN-11273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597503#comment-17597503
]
ASF GitHub Bot commented on YARN-11273:
---------------------------------------
slfan1989 commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r957972465
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1035,275 @@ private static byte[] getByteArray(ByteBuffer bb) {
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
- throw new NotImplementedException("Code is not implemented");
+ // validate
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+ CallableStatement cstmt = null;
+
+ ReservationHomeSubCluster reservationHomeSubCluster =
request.getReservationHomeSubCluster();
+ ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+ SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+ SubClusterId subClusterHomeId = null;
+
+ try {
+ // Call procedure
+ cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, reservationId.toString());
+ cstmt.setString(2, subClusterId.getId());
+ cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+ cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ cstmt.executeUpdate();
+ long stopTime = clock.getTime();
+
+ // Get SubClusterHome
+ String subClusterHomeIdString = cstmt.getString(3);
+ subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+ // Get rowCount
+ int rowCount = cstmt.getInt(4);
+
+ // For failover reason, we check the returned subClusterId.
+ // 1.If it is equal to the subClusterId we sent, the call added the new
+ // reservation into FederationStateStore.
+ // 2.If the call returns a different subClusterId
+ // it means we already tried to insert this reservation
+ // but a component (Router/StateStore/RM) failed during the submission.
+ if (subClusterId.equals(subClusterHomeId)) {
+ // if it is equal to 0
+ // it means the call did not add a new reservation into
FederationStateStore.
+ if (rowCount == 0) {
+ LOG.info("The reservation {} was not inserted in the StateStore
because it" +
+ " was already present in subCluster {}", reservationId,
subClusterHomeId);
+ } else if (rowCount != 1) {
+ // if it is different from 1
+ // it means the call had a wrong behavior. Maybe the database is not
set correctly.
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during the insertion of subCluster %s.",
subClusterId);
+ }
+ } else {
+ // If it is different from 0,
+ // it means that there is a data situation that does not meet the
expectations,
+ // and an exception should be thrown at this time
+ if (rowCount != 0) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "The reservation %s does exist but was overwritten.",
reservationId);
+ }
+ LOG.info("Reservation: {} already present with subCluster: {}.",
+ reservationId, subClusterHomeId);
+ }
+
+ // Record successful call time
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to insert the newly generated reservation %s to subCluster
%s.",
+ reservationId, subClusterId);
+ } finally {
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
+ }
+
+ return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
}
@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
- throw new NotImplementedException("Code is not implemented");
+ // validate
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+ CallableStatement cstmt = null;
+ ReservationId reservationId = request.getReservationId();
+ SubClusterId subClusterId = null;
+
+ try {
+ cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, reservationId.toString());
+ cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ cstmt.execute();
+ long stopTime = clock.getTime();
+
+ // Get Result
+ String subClusterHomeIdString = cstmt.getString(2);
+
+ if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+ subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+ } else {
+ // If subClusterHomeIdString blank, we need to throw an exception
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Reservation %s does not exist", reservationId);
+ }
+
+ LOG.info("Got the information about the specified reservation {} in
subCluster = {}.",
+ reservationId, subClusterId);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to obtain the reservation information according to %s.",
reservationId);
+ } finally {
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
+ }
+
+ ReservationHomeSubCluster homeSubCluster =
+ ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+ return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
}
@Override
public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException {
- throw new NotImplementedException("Code is not implemented");
+ CallableStatement cstmt = null;
+ ResultSet rs = null;
+ List<ReservationHomeSubCluster> reservationsHomeSubClusters = new
ArrayList<>();
+
+ try {
+ cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ rs = cstmt.executeQuery();
+ long stopTime = clock.getTime();
+
+ while (rs.next()) {
+ // Extract the output for each tuple
+ String dbReservationId = rs.getString(1);
Review Comment:
Thank you very much for your suggestion. I agree with your point of view:
the readability of this part of the code is really not very good. I will add
some comments and use named variables for the indices.
> [RESERVATION] Federation StateStore: Support storage/retrieval of
> Reservations With SQL
> ---------------------------------------------------------------------------------------
>
> Key: YARN-11273
> URL: https://issues.apache.org/jira/browse/YARN-11273
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation
> Affects Versions: 3.4.0
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]