[
https://issues.apache.org/jira/browse/YARN-11273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599219#comment-17599219
]
ASF GitHub Bot commented on YARN-11273:
---------------------------------------
slfan1989 commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r961227793
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1037,342 @@ private static byte[] getByteArray(ByteBuffer bb) {
@Override
public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
AddReservationHomeSubClusterRequest request) throws YarnException {
- throw new NotImplementedException("Code is not implemented");
+
+ // validate
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+ CallableStatement cstmt = null;
+
+ ReservationHomeSubCluster reservationHomeSubCluster =
request.getReservationHomeSubCluster();
+ ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+ SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+ SubClusterId subClusterHomeId = null;
+
+ try {
+
+ // Defined the sp_addReservationHomeSubCluster procedure
+ // this procedure requires 4 parameters
+ // Input parameters
+ // 1)IN reservationId_IN varchar(128)
+ // 2)IN homeSubCluster_IN varchar(256)
+ // Output parameters
+ // 3)OUT storedHomeSubCluster_OUT varchar(256)
+ // 4)OUT rowCount_OUT int
+
+ // Call procedure
+ cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ // 1)IN reservationId_IN varchar(128)
+ cstmt.setString("reservationId_IN", reservationId.toString());
+ // 2)IN homeSubCluster_IN varchar(256)
+ cstmt.setString("homeSubCluster_IN", subClusterId.getId());
+ // 3) OUT storedHomeSubCluster_OUT varchar(256)
+ cstmt.registerOutParameter("storedHomeSubCluster_OUT",
java.sql.Types.VARCHAR);
+ // 4) OUT rowCount_OUT int
+ cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ cstmt.executeUpdate();
+ long stopTime = clock.getTime();
+
+ // Get SubClusterHome
+ String subClusterHomeIdString =
cstmt.getString("storedHomeSubCluster_OUT");
+ subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+ // Get rowCount
+ int rowCount = cstmt.getInt("rowCount_OUT");
+
+ // For failover reason, we check the returned subClusterId.
+ // 1.If it is equal to the subClusterId we sent, the call added the new
+ // reservation into FederationStateStore.
+ // 2.If the call returns a different subClusterId
+ // it means we already tried to insert this reservation
+ // but a component (Router/StateStore/RM) failed during the submission.
+ if (subClusterId.equals(subClusterHomeId)) {
+ // if it is equal to 0
+ // it means the call did not add a new reservation into
FederationStateStore.
+ if (rowCount == 0) {
+ LOG.info("The reservation {} was not inserted in the StateStore
because it" +
+ " was already present in subCluster {}", reservationId,
subClusterHomeId);
+ } else if (rowCount != 1) {
+ // if it is different from 1
+ // it means the call had a wrong behavior. Maybe the database is not
set correctly.
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during the insertion of subCluster %s according
to reservation %s. " +
+ "The database expects to insert 1 record, but the number of " +
+ "inserted changes is greater than 1, " +
+ "please check the records of the database.",
+ subClusterId, reservationId);
+ }
+ } else {
+ // If it is different from 0,
+ // it means that there is a data situation that does not meet the
expectations,
+ // and an exception should be thrown at this time
+ if (rowCount != 0) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "The reservation %s does exist but was overwritten.",
reservationId);
+ }
+ LOG.info("Reservation: {} already present with subCluster: {}.",
+ reservationId, subClusterHomeId);
+ }
+
+ // Record successful call time
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to insert the newly generated reservation %s to subCluster
%s.",
+ reservationId, subClusterId);
+ } finally {
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
+ }
+
+ return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
}
@Override
public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
GetReservationHomeSubClusterRequest request) throws YarnException {
- throw new NotImplementedException("Code is not implemented");
+ // validate
+ FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+ CallableStatement cstmt = null;
+ ReservationId reservationId = request.getReservationId();
+ SubClusterId subClusterId = null;
+
+ try {
+
+ // Defined the sp_getReservationHomeSubCluster procedure
+ // this procedure requires 2 parameters
+ // Input parameters
+ // 1)IN reservationId_IN varchar(128)
+ // Output parameters
+ // 2)OUT homeSubCluster_OUT varchar(256)
+
+ cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ // 1)IN reservationId_IN varchar(128)
+ cstmt.setString("reservationId_IN", reservationId.toString());
+ // 2)OUT homeSubCluster_OUT varchar(256)
+ cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ cstmt.execute();
+ long stopTime = clock.getTime();
+
+ // Get Result
+ String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
+
+ if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+ subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+ } else {
+ // If subClusterHomeIdString blank, we need to throw an exception
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Reservation %s does not exist", reservationId);
+ }
+
+ LOG.info("Got the information about the specified reservation {} in
subCluster = {}.",
+ reservationId, subClusterId);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to obtain the reservation information according to %s.",
reservationId);
+ } finally {
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
+ }
+
+ ReservationHomeSubCluster homeSubCluster =
Review Comment:
Thank you very much for helping to review the code, I will modify the code.
In the case of an exception, the exception should be thrown directly, so the
handling should be better.
> [RESERVATION] Federation StateStore: Support storage/retrieval of
> Reservations With SQL
> ---------------------------------------------------------------------------------------
>
> Key: YARN-11273
> URL: https://issues.apache.org/jira/browse/YARN-11273
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation
> Affects Versions: 3.4.0
> Reporter: fanshilun
> Assignee: fanshilun
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]