[
https://issues.apache.org/jira/browse/YARN-11349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644764#comment-17644764
]
ASF GitHub Bot commented on YARN-11349:
---------------------------------------
slfan1989 commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1043266394
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1353,45 +1386,426 @@ public Connection getConn() {
return conn;
}
+ /**
+ * SQLFederationStateStore Supports Store New MasterKey.
+ *
+ * @param request The request contains RouterMasterKey, which is an
abstraction for DelegationKey.
+ * @return routerMasterKeyResponse, the response contains the
RouterMasterKey.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest
request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse the parameters and serialize the DelegationKey as a string.
+ DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+ int keyId = delegationKey.getKeyId();
+ String delegationKeyStr =
FederationStateStoreUtils.encodeWritable(delegationKey);
+
+ // Step3. store data in database.
+ try {
+
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY,
keyId,
+ delegationKeyStr, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // We hope that 1 record can be written to the database.
+ // If the number of records is not 1, it means that the data was written
incorrectly.
+ if (rowCount != 1) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during the insertion of masterKey, keyId = %s. " +
+ "please check the records of the database.",
String.valueOf(keyId));
+ }
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to insert the newly masterKey, keyId = %s.",
String.valueOf(keyId));
+ }
+
+ // Step4. Query Data from the database and return the result.
+ return getMasterKeyByDelegationKey(request);
}
+ /**
+ * SQLFederationStateStore Supports Remove MasterKey.
+ *
+ * Defined the sp_deleteMasterKey procedure.
+ * This procedure requires 1 input parameters, 1 output parameters.
+ * Input parameters
+ * 1. IN keyId_IN int
+ * Output parameters
+ * 2. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterMasterKey, which is an
abstraction for DelegationKey
+ * @return routerMasterKeyResponse, the response contains the
RouterMasterKey.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest
request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get KeyId.
+ RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+ int paramKeyId = paramMasterKey.getKeyId();
+
+ // Step3. Clear data from database.
+ try {
+
+ // Execute the query
+ long startTime = clock.getTime();
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY,
+ paramKeyId, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // if it is equal to 0 it means the call
+ // did not delete the reservation from FederationStateStore
+ if (rowCount == 0) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "masterKeyId = %s does not exist.", String.valueOf(paramKeyId));
+ } else if (rowCount != 1) {
+ // if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during deleting the keyId %s. " +
+ "The database is expected to delete 1 record, " +
+ "but the number of deleted records returned by the database is
greater than 1, " +
+ "indicating that a duplicate masterKey occurred during the
deletion process.",
+ paramKeyId);
+ }
+
+ LOG.info("Delete from the StateStore the keyId: {}.", paramKeyId);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ return RouterMasterKeyResponse.newInstance(paramMasterKey);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to delete the keyId %s.", paramKeyId);
+ }
+
+ throw new YarnException("Unable to delete the masterKey, keyId = " +
paramKeyId);
}
+ /**
+ * SQLFederationStateStore Supports Remove MasterKey.
+ *
+ * Defined the sp_getMasterKey procedure.
+ * this procedure requires 2 parameters.
+ * Input parameters:
+ * 1. IN keyId_IN int
+ * Output parameters:
+ * 2. OUT masterKey_OUT varchar(1024)
+ *
+ * @param request The request contains RouterMasterKey, which is an
abstraction for DelegationKey
+ * @return routerMasterKeyResponse, the response contains the
RouterMasterKey.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterMasterKeyResponse
getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get KeyId.
+ RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+ int paramKeyId = paramMasterKey.getKeyId();
+
+ // Step3: Call the stored procedure to get the result.
+ try {
+
+ FederationQueryRunner runner = new FederationQueryRunner();
+ FederationSQLOutParameter<String> masterKeyOUT =
+ new FederationSQLOutParameter<>("masterKey_OUT",
java.sql.Types.VARCHAR, String.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ RouterMasterKey routerMasterKey = runner.execute(
+ conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(),
paramKeyId, masterKeyOUT);
+ long stopTime = clock.getTime();
+
+ LOG.info("Got the information about the specified masterKey = {}
according to keyId = {}.",
+ routerMasterKey, paramKeyId);
+
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+
+ // Return query result.
+ return RouterMasterKeyResponse.newInstance(routerMasterKey);
+
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to obtain the masterKey information according to %s.",
+ String.valueOf(paramKeyId));
+ }
+
+ // Throw exception information
+ throw new YarnException(
+ "Unable to obtain the masterKey information according to " +
paramKeyId);
}
+ /**
+ * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier.
+ *
+ * Defined the sp_addDelegationToken procedure.
+ * This procedure requires 4 input parameters, 1 output parameters.
+ * Input parameters:
+ * 1. IN sequenceNum_IN int
+ * 2. IN tokenIdent_IN varchar(1024)
+ * 3. IN token_IN varchar(1024)
+ * 4. IN renewDate_IN bigint
+ * Output parameters:
+ * 5. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2. store data in database.
+ try {
+ long duration = addOrUpdateToken(request, true);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ throw new YarnException(e);
+ }
+
+ // Step3. Query Data from the database and return the result.
+ return getTokenByRouterStoreToken(request);
}
+ /**
+ * SQLFederationStateStore Supports Update RMDelegationTokenIdentifier.
+ *
+ * Defined the sp_updateDelegationToken procedure.
+ * This procedure requires 4 input parameters, 1 output parameters.
+ * Input parameters:
+ * 1. IN sequenceNum_IN int
+ * 2. IN tokenIdent_IN varchar(1024)
+ * 3. IN token_IN varchar(1024)
+ * 4. IN renewDate_IN bigint
+ * Output parameters:
+ * 5. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2. update data in database.
+ try {
+ long duration = addOrUpdateToken(request, false);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ throw new YarnException(e);
+ }
+
+ // Step3. Query Data from the database and return the result.
+ return getTokenByRouterStoreToken(request);
}
+ /**
+ * Add Or Update RMDelegationTokenIdentifier.
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @param isAdd true, addData; false, updateData.
+ * @return method operation time.
+ * @throws IOException An IO Error occurred.
+ * @throws SQLException An SQL Error occurred.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ */
+ private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd)
+ throws IOException, SQLException, YarnException {
+
+ // Parse parameters and get KeyId.
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+ YARNDelegationTokenIdentifier identifier =
routerStoreToken.getTokenIdentifier();
+ String tokenIdentifier =
FederationStateStoreUtils.encodeWritable(identifier);
+ String tokenInfo = routerStoreToken.getTokenInfo();
+ long renewDate = routerStoreToken.getRenewDate();
+ int sequenceNum = identifier.getSequenceNumber();
+
+ FederationQueryRunner runner = new FederationQueryRunner();
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN :
CALL_SP_UPDATE_DELEGATIONTOKEN;
+ Integer rowCount = runner.execute(conn, procedure, new
RowCountHandler("rowCount_OUT"),
+ sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // Get rowCount
+ // In the process of updating the code, rowCount may be 0 or 1;
+ // if rowCount=1, it is as expected, indicating that we have updated the
Token correctly;
+ // if rowCount=0, it is not as expected,
+ // indicating that we have not updated the Token correctly.
+ if (rowCount != 1) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during the insertion of delegationToken, tokenId =
%s. " +
+ "Please check the records of the database.",
String.valueOf(sequenceNum));
+ }
+
+ // return execution time
+ return (stopTime - startTime);
+ }
+
+ /**
+ * SQLFederationStateStore Supports Remove RMDelegationTokenIdentifier.
+ *
+ * Defined the sp_deleteDelegationToken procedure.
+ * This procedure requires 1 input parameters, 1 output parameters.
+ * Input parameters:
+ * 1. IN sequenceNum_IN bigint
+ * Output parameters:
+ * 2. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get KeyId.
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+ YARNDelegationTokenIdentifier identifier =
routerStoreToken.getTokenIdentifier();
+ int sequenceNum = identifier.getSequenceNumber();
+
+ try {
+
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ Integer rowCount =
getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN,
+ sequenceNum, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // if it is equal to 0 it means the call
+ // did not delete the reservation from FederationStateStore
+ if (rowCount == 0) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "TokenId %s does not exist", String.valueOf(sequenceNum));
+ } else if (rowCount != 1) {
+ // if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during deleting the delegationToken %s. " +
+ "The database is expected to delete 1 record, " +
+ "but the number of deleted records returned by the database is
greater than 1, " +
+ "indicating that a duplicate tokenId occurred during the deletion
process.",
+ String.valueOf(sequenceNum));
+ }
+
+ LOG.info("Delete from the StateStore the delegationToken, tokenId =
{}.", sequenceNum);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ return RouterRMTokenResponse.newInstance(routerStoreToken);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to delete the delegationToken, tokenId = %s.", sequenceNum);
+ }
+ throw new YarnException("Unable to delete the delegationToken, tokenId = "
+ sequenceNum);
}
+ /**
+ * The Router Supports GetTokenByRouterStoreToken.
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return RouterRMTokenResponse.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest
request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get KeyId.
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+ YARNDelegationTokenIdentifier identifier =
routerStoreToken.getTokenIdentifier();
+ int sequenceNum = identifier.getSequenceNumber();
+
+ try {
+ FederationQueryRunner runner = new FederationQueryRunner();
+ FederationSQLOutParameter<String> tokenIdentOUT =
+ new FederationSQLOutParameter<>("tokenIdent_OUT",
java.sql.Types.VARCHAR, String.class);
Review Comment:
Thank you very much for helping to review the code, I will fix it.
> [Federation] Router Support DelegationToken With SQL
> ----------------------------------------------------
>
> Key: YARN-11349
> URL: https://issues.apache.org/jira/browse/YARN-11349
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation, router
> Affects Versions: 3.4.0
> Reporter: Shilun Fan
> Assignee: Shilun Fan
> Priority: Major
> Labels: pull-request-available
>
> Router Support DelegationToken With SQLFederationStateStore.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]