CRZbulabula commented on code in PR #17735:
URL: https://github.com/apache/iotdb/pull/17735#discussion_r3279235996
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv
env) {
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ final Map<String, Long> ttlMap =
env.getConfigManager().getTTLManager().getAllTTL();
+ previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+ if (plan.isDataBase()) {
+ previousDatabaseWildcardTTL =
+ getTTLOrDefault(ttlMap,
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+ }
+ previousTTLStateCaptured = true;
+ }
+
+ protected TSStatus writeConfigNodePlan(
+ final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+ try {
+ return env.getConfigManager()
+ .getConsensusManager()
+ .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) :
setTTLPlan);
+ } catch (ConsensusException e) {
+
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return res;
+ }
+ }
+
+ protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final
TSetTTLReq req) {
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related schemaengine cache
+ return clientHandler;
+ }
+
+ private TSetTTLReq buildSetTTLReq(
+ final String[] pathPattern, final long ttl, final boolean isDataBase) {
+ return new TSetTTLReq(
+ Collections.singletonList(String.join(".", pathPattern)), ttl,
isDataBase);
+ }
+
+ private boolean hasFailedDataNode(
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ return true;
+ }
+ for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
- setFailure(
- new ProcedureException(
- new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
- return;
+ return true;
}
}
+ return false;
+ }
+
+ private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[]
pathPattern) {
+ return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+ }
+
+ private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+ final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length +
1);
+ pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ return pathNodes;
+ }
+
+ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnConfigNode(
+ env, getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnConfigNode(
+ final ConfigNodeProcedureEnv env, final String[] pathPattern, final long
ttl)
+ throws ProcedureException {
+ final SetTTLPlan rollbackPlan =
+ new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL :
ttl);
+ rollbackPlan.setDataBase(false);
Review Comment:
回滚时显式 `setDataBase(false)` 是正确的——database TTL 在 set 时会自动写 `db` + `db.**`
两条,但回滚阶段我们已经在外层 `rollbackConfigNodeTTL` 里分别对两个路径各发一次,如果这里再 `isDataBase=true`
会重复处理 `db.**`,把刚刚还原的旧值再覆盖一次。建议加一句 comment 说明「by design: 由外层分别 restore,这里不能让
SetTTLPlan 再自动扩展通配符」。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]