CRZbulabula commented on code in PR #17735:
URL: https://github.com/apache/iotdb/pull/17735#discussion_r3279235988
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv
env) {
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ final Map<String, Long> ttlMap =
env.getConfigManager().getTTLManager().getAllTTL();
+ previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+ if (plan.isDataBase()) {
+ previousDatabaseWildcardTTL =
+ getTTLOrDefault(ttlMap,
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+ }
+ previousTTLStateCaptured = true;
+ }
+
+ protected TSStatus writeConfigNodePlan(
+ final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+ try {
+ return env.getConfigManager()
+ .getConsensusManager()
+ .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) :
setTTLPlan);
+ } catch (ConsensusException e) {
+
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return res;
+ }
+ }
+
+ protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final
TSetTTLReq req) {
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related schemaengine cache
+ return clientHandler;
+ }
+
+ private TSetTTLReq buildSetTTLReq(
+ final String[] pathPattern, final long ttl, final boolean isDataBase) {
+ return new TSetTTLReq(
+ Collections.singletonList(String.join(".", pathPattern)), ttl,
isDataBase);
+ }
+
+ private boolean hasFailedDataNode(
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ return true;
+ }
+ for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
- setFailure(
- new ProcedureException(
- new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
- return;
+ return true;
}
}
+ return false;
+ }
+
+ private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[]
pathPattern) {
+ return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+ }
+
+ private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+ final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length +
1);
+ pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ return pathNodes;
+ }
+
+ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnConfigNode(
+ env, getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnConfigNode(
+ final ConfigNodeProcedureEnv env, final String[] pathPattern, final long
ttl)
+ throws ProcedureException {
+ final SetTTLPlan rollbackPlan =
+ new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL :
ttl);
Review Comment:
这里的语义转换值得加一行 inline 注释:当 `ttl == TTL_NOT_EXIST`(写入前该 TTL 不存在)时,会用 `NULL_TTL` 构造
`SetTTLPlan`,随后由 `ConfigPlanExecutor.SetTTL` 分支(`getTTL() == NULL_TTL` 时)路由到
`ttlInfo.unsetTTL`。这是「靠 plan 的 ttl 字段作为 set/unset
信号」的隐式约定,跨文件理解成本高,标注一下能省后人不少时间。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]