CRZbulabula commented on code in PR #17735:
URL: https://github.com/apache/iotdb/pull/17735#discussion_r3279235969


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -86,18 +93,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state)
     }
   }
 
-  private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
-    TSStatus res;
-    try {
-      res =
-          env.getConfigManager()
-              .getConsensusManager()
-              .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
-      res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(e.getMessage());
-    }
+  protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
+    capturePreviousTTLState(env);
+    final TSStatus res = writeConfigNodePlan(env, plan);

Review Comment:
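   **(Medium severity) `previousTTL` can be overwritten incorrectly during crash recovery**
   
   `capturePreviousTTLState` and `writeConfigNodePlan` run inside the same `executeFromState` call, and the procedure framework persists `previousTTLStateCaptured`/`previousTTL` only on a state transition. If the ConfigNode crashes after the consensus write succeeds but before `setNextState(UPDATE_DATANODE_CACHE)` is persisted, the restart replays `SET_CONFIGNODE_TTL`. At that point `previousTTLStateCaptured = false` and `getAllTTL()` already returns the **new value**, which is then recorded as `previousTTL`. If a later DataNode update fails and triggers a rollback, the rollback restores the "new value" as if it were the "old value", which defeats the purpose of rolling back.
   
   Suggestion: split the capture into its own preceding state (for example `CAPTURE_PREVIOUS_TTL` → `SET_CONFIGNODE_TTL` → `UPDATE_DATANODE_CACHE`), so that the captured snapshot is persisted together with the procedure state before the consensus write happens.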
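The crash window described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `CaptureStateSketch`, its `persist()`/`recoverAfterCrash()` helpers, and the in-memory `ttlTable` are hypothetical stand-ins, not the IoTDB procedure framework. The only behavior borrowed from the comment is that procedure fields are persisted on a state transition.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: procedure fields are persisted only when a state transition completes. */
class CaptureStateSketch {
  enum State { CAPTURE_PREVIOUS_TTL, SET_CONFIGNODE_TTL, UPDATE_DATANODE_CACHE }

  static final long TTL_NOT_EXIST = Long.MIN_VALUE;

  // Hypothetical stand-in for the consensus-backed TTL table; it survives a crash.
  final Map<String, Long> ttlTable = new HashMap<>();

  // "Persisted" snapshot of the procedure, updated only by persist().
  State persistedState = State.CAPTURE_PREVIOUS_TTL;
  long persistedPreviousTTL = TTL_NOT_EXIST;

  // In-memory procedure fields, lost on crash.
  State state = State.CAPTURE_PREVIOUS_TTL;
  long previousTTL = TTL_NOT_EXIST;

  void persist() {
    persistedState = state;
    persistedPreviousTTL = previousTTL;
  }

  void recoverAfterCrash() {
    state = persistedState;
    previousTTL = persistedPreviousTTL;
  }

  /** Runs one state; if crashBeforePersist is true, the transition is never persisted. */
  void step(String path, long newTTL, boolean crashBeforePersist) {
    switch (state) {
      case CAPTURE_PREVIOUS_TTL:
        previousTTL = ttlTable.getOrDefault(path, TTL_NOT_EXIST);
        state = State.SET_CONFIGNODE_TTL;
        break;
      case SET_CONFIGNODE_TTL:
        ttlTable.put(path, newTTL); // the write itself is durable
        state = State.UPDATE_DATANODE_CACHE;
        break;
      default:
        break;
    }
    if (!crashBeforePersist) {
      persist();
    }
  }

  /** Capture, then crash right after the write, then replay the write state. */
  static long demo() {
    CaptureStateSketch p = new CaptureStateSketch();
    p.ttlTable.put("root.db", 500L);
    p.step("root.db", 2000L, false); // capture is persisted before any write
    p.step("root.db", 2000L, true);  // write succeeds, crash before persisting
    p.recoverAfterCrash();
    p.step("root.db", 2000L, false); // replay SET_CONFIGNODE_TTL
    return p.previousTTL;
  }
}
```

In this model the replayed write state never re-reads the table, so the snapshot taken in the dedicated capture state is what the rollback would see.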



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -47,14 +49,19 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
 public class SetTTLProcedure extends StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SetTTLProcedure.class);
+  private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
 
   private SetTTLPlan plan;
+  private long previousTTL = TTL_NOT_EXIST;

Review Comment:
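   Consider adding a one-line comment explaining why the sentinel is `Long.MIN_VALUE` rather than `TTLCache.NULL_TTL` (`-1`). The two values mean different things on the rollback path: `TTL_NOT_EXIST` means "no TTL was set at all before this write", while `NULL_TTL` is the marker for "explicitly unset". Confusing the two would make the rollback misbehave, so it is worth leaving a hint for future readers of the code.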



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -159,14 +293,24 @@ public void serialize(DataOutputStream stream) throws IOException {
             : ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+    stream.writeBoolean(previousTTLStateCaptured);
+    stream.writeLong(previousTTL);
+    stream.writeLong(previousDatabaseWildcardTTL);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      ReadWriteIOUtils.readInt(byteBuffer);
+      final int length = ReadWriteIOUtils.readInt(byteBuffer);
+      final int position = byteBuffer.position();
       this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+      byteBuffer.position(position + length);
+      if (byteBuffer.remaining() >= 17) {

Review Comment:
   `>= 17` is a magic number (`1 byte boolean + 2 * 8 bytes long`). Consider extracting it into a constant, for example:
   ```java
   private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;
   ...
   if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { ... }
   ```
   If more fields are added later, only one place needs to change.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv 
env) {
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    final Map<String, Long> ttlMap = 
env.getConfigManager().getTTLManager().getAllTTL();
+    previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+    if (plan.isDataBase()) {
+      previousDatabaseWildcardTTL =
+          getTTLOrDefault(ttlMap, 
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+    }
+    previousTTLStateCaptured = true;
+  }
+
+  protected TSStatus writeConfigNodePlan(
+      final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+    try {
+      return env.getConfigManager()
+          .getConsensusManager()
+          .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : 
setTTLPlan);
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
+  }
+
+  protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, 
dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related schemaengine cache
+    return clientHandler;
+  }
+
+  private TSetTTLReq buildSetTTLReq(
+      final String[] pathPattern, final long ttl, final boolean isDataBase) {
+    return new TSetTTLReq(
+        Collections.singletonList(String.join(".", pathPattern)), ttl, 
isDataBase);
+  }
+
+  private boolean hasFailedDataNode(
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      return true;
+    }
+    for (TSStatus status : clientHandler.getResponseMap().values()) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
-        setFailure(
-            new ProcedureException(
-                new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
-        return;
+        return true;
       }
     }
+    return false;
+  }
+
+  private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[] 
pathPattern) {
+    return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+  }
+
+  private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+    final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 
1);
+    pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+    return pathNodes;
+  }
+
+  private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnConfigNode(
+          env, getDatabaseWildcardPathPattern(plan.getPathPattern()), 
previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnConfigNode(
+      final ConfigNodeProcedureEnv env, final String[] pathPattern, final long 
ttl)
+      throws ProcedureException {
+    final SetTTLPlan rollbackPlan =
+        new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl);
+    rollbackPlan.setDataBase(false);

Review Comment:
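   Explicitly calling `setDataBase(false)` during rollback is correct. Setting a database TTL automatically writes two entries, `db` and `db.**`, but in the rollback phase the outer `rollbackConfigNodeTTL` already issues one restore per path. With `isDataBase=true` here, `db.**` would be processed a second time and the old value that was just restored would be overwritten again. Consider adding a comment such as "by design: the outer method restores each path separately, so SetTTLPlan must not expand the wildcard again here".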



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java:
##########
@@ -70,7 +70,8 @@ public TSStatus setTTL(SetTTLPlan plan) {
     try {
       // check ttl rule capacity
       final int tTlRuleCapacity = CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
-      if (getTTLCount() >= tTlRuleCapacity) {
+      final int newTTLRuleCount = calculateNewTTLRuleCount(plan);
+      if (newTTLRuleCount > 0 && ttlCache.getTtlCount() + newTTLRuleCount > tTlRuleCapacity) {

Review Comment:
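   **The capacity check fix is well done**: `newTTLRuleCount > 0` short-circuits the "update an existing rule" case, which is exactly what lets the "already over capacity, but only an update" scenario in `testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize` succeed.
   
   One edge case is worth confirming: when `tTlRuleCapacity` has been lowered to 0 by an operator (an abnormal configuration), the `newTTLRuleCount > 0` check still rejects every new rule, which is the expected behavior. However, consider printing both `tTlRuleCapacity` and `ttlCache.getTtlCount() + newTTLRuleCount` in the error message, so that on-site troubleshooting can tell whether the capacity was lowered or the table is genuinely full.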
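A minimal sketch of the kind of diagnostic message suggested above. The class `CapacityCheckSketch`, the method `check`, and the message wording are hypothetical; only the condition itself is taken from the diff.

```java
/** Hypothetical helper: same condition as the diff, plus a message carrying both numbers. */
class CapacityCheckSketch {
  /** Returns null when the write is allowed, otherwise a diagnostic message. */
  static String check(int currentCount, int newRuleCount, int capacity) {
    if (newRuleCount > 0 && currentCount + newRuleCount > capacity) {
      return String.format(
          "TTL rule capacity exceeded: capacity=%d, current=%d, new=%d, required=%d",
          capacity, currentCount, newRuleCount, currentCount + newRuleCount);
    }
    return null;
  }
}
```

With both values in the message, a capacity of 0 and a full table produce visibly different output, which is the distinction the comment asks for.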



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java:
##########
@@ -92,6 +93,20 @@ public TSStatus setTTL(SetTTLPlan plan) {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  private int calculateNewTTLRuleCount(SetTTLPlan plan) {
+    int newTTLRuleCount = getNewTTLRuleCount(plan.getPathPattern());
+    if (plan.isDataBase()) {
+      String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), plan.getPathPattern().length + 1);
+      pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+      newTTLRuleCount += getNewTTLRuleCount(pathNodes);
+    }
+    return newTTLRuleCount;
+  }
+
+  private int getNewTTLRuleCount(String[] pathNodes) {
+    return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL ? 1 : 0;

Review Comment:
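   The name `getNewTTLRuleCount` is slightly ambiguous: it actually returns whether this path would add a new rule (0/1), not the number of new rules. Consider renaming it to `isNewTTLRule` (returning boolean) or `countNewRule`, and accumulating `boolean ? 1 : 0` directly in `calculateNewTTLRuleCount`, which makes the semantics clearer.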
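The rename could look roughly like the sketch below. It is not the real `TTLInfo`: the `ttlByPath` map is a hypothetical stand-in for `ttlCache.getLastNodeTTL`, and the `NULL_TTL` and `WILDCARD` constants are assumed to mirror `TTLCache.NULL_TTL` (`-1`) and `IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD` (`**`).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class NewRuleCountSketch {
  static final long NULL_TTL = -1L;    // assumed to mirror TTLCache.NULL_TTL
  static final String WILDCARD = "**"; // assumed to mirror MULTI_LEVEL_PATH_WILDCARD

  // Hypothetical stand-in for the TTL cache lookup.
  final Map<String, Long> ttlByPath = new HashMap<>();

  /** True when no rule exists yet for this exact path, i.e. the write would add one. */
  boolean isNewTTLRule(String[] pathNodes) {
    return ttlByPath.getOrDefault(String.join(".", pathNodes), NULL_TTL) == NULL_TTL;
  }

  int calculateNewTTLRuleCount(String[] pathPattern, boolean isDataBase) {
    int count = isNewTTLRule(pathPattern) ? 1 : 0;
    if (isDataBase) {
      // A database TTL also covers "<db>.**", so that path may add a second rule.
      String[] wildcardPath = Arrays.copyOf(pathPattern, pathPattern.length + 1);
      wildcardPath[wildcardPath.length - 1] = WILDCARD;
      count += isNewTTLRule(wildcardPath) ? 1 : 0;
    }
    return count;
  }
}
```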



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv 
env) {
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    final Map<String, Long> ttlMap = env.getConfigManager().getTTLManager().getAllTTL();

Review Comment:
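   **Performance: a full consensus read just to fetch one or two entries**
   
   `TTLManager.getAllTTL()` goes through `getConsensusManager().read(showTTLPlan)`, which reads the entire TTL table back through the consensus layer and then serializes it into a map via DFS. Every `SetTTLProcedure` pays this cost, and the overhead is not small when there are many TTL rules (especially on clusters close to the `tTlRuleCapacity` limit).
   
   Suggestion: add a single-path `getTTL(String[] pathPattern)` to `TTLManager` / `TTLInfo` (or expose a wrapper around `TTLCache.getLastNodeTTL`) that reads only the one or two entries actually needed.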



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -86,18 +93,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, SetTTLState state)
     }
   }
 
-  private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
-    TSStatus res;
-    try {
-      res =
-          env.getConfigManager()
-              .getConsensusManager()
-              .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : this.plan);
-    } catch (ConsensusException e) {
-      LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
-      res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(e.getMessage());
-    }
+  protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {

Review Comment:
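   Several `protected` methods (`setConfigNodeTTL` / `updateDataNodeTTL` / `writeConfigNodePlan` / `sendTTLRequest`) exist only so that `TestingSetTTLProcedure` in the tests can override them; production code does not need subclasses. Consider making them package-private (drop `protected`) and moving the test class into the same package, which shrinks the public API surface.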



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -159,14 +293,24 @@ public void serialize(DataOutputStream stream) throws IOException {
             : ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+    stream.writeBoolean(previousTTLStateCaptured);
+    stream.writeLong(previousTTL);
+    stream.writeLong(previousDatabaseWildcardTTL);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      ReadWriteIOUtils.readInt(byteBuffer);
+      final int length = ReadWriteIOUtils.readInt(byteBuffer);
+      final int position = byteBuffer.position();
       this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+      byteBuffer.position(position + length);

Review Comment:
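   This line, `byteBuffer.position(position + length)`, is a necessary fix (not redundant). Consider adding a comment explaining why; otherwise a later reader could easily decide that "`Factory.create` should already leave the position in the right place, so this line is unnecessary" and delete it.
   
   The actual reason: `serializeToByteBuffer()` wraps the data with `ByteBuffer.wrap(publicBAOS.getBuf(), 0, size())`, while `ReadWriteIOUtils.write(ByteBuffer, OutputStream)` writes `byteBuffer.capacity()` bytes (the length of the backing array, which may be larger than the actual data size), so extra zero padding bytes follow the data. Without the forced reposition, the newly added `boolean + 2*long` would be read from the padding region.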
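The effect can be reproduced with plain `java.nio.ByteBuffer`, independent of IoTDB. The sketch below assumes the layout described in the comment (a length prefix that counts padding, followed by trailing fields). `PaddingRepositionSketch` and its methods are hypothetical, and a single `long` trailer stands in for the rollback fields.

```java
import java.nio.ByteBuffer;

class PaddingRepositionSketch {
  /** Layout: [int length][length bytes, only some of which are real data][long trailer]. */
  static ByteBuffer write(byte[] backingArray, long trailer) {
    ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + backingArray.length + Long.BYTES);
    out.putInt(backingArray.length); // the prefix counts the zero padding too
    out.put(backingArray);
    out.putLong(trailer);
    out.flip();
    return out;
  }

  /** Reads the trailer after a parser that consumed only `used` bytes of the payload. */
  static long readTrailer(ByteBuffer in, int used, boolean reposition) {
    int length = in.getInt();
    int position = in.position();
    in.position(position + used); // simulate a parser that stops at the real data end
    if (reposition) {
      in.position(position + length); // skip the padding, as the diff does
    }
    return in.getLong();
  }
}
```

For example, with an 8-byte backing array holding 3 real bytes and a trailer of 42, reading without the reposition starts inside the padding and does not return 42.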



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv 
env) {
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    final Map<String, Long> ttlMap = 
env.getConfigManager().getTTLManager().getAllTTL();
+    previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+    if (plan.isDataBase()) {
+      previousDatabaseWildcardTTL =
+          getTTLOrDefault(ttlMap, 
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+    }
+    previousTTLStateCaptured = true;
+  }
+
+  protected TSStatus writeConfigNodePlan(
+      final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+    try {
+      return env.getConfigManager()
+          .getConsensusManager()
+          .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : 
setTTLPlan);
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
+  }
+
+  protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, 
dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related schemaengine cache
+    return clientHandler;
+  }
+
+  private TSetTTLReq buildSetTTLReq(
+      final String[] pathPattern, final long ttl, final boolean isDataBase) {
+    return new TSetTTLReq(
+        Collections.singletonList(String.join(".", pathPattern)), ttl, 
isDataBase);
+  }
+
+  private boolean hasFailedDataNode(
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      return true;
+    }
+    for (TSStatus status : clientHandler.getResponseMap().values()) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
-        setFailure(
-            new ProcedureException(
-                new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
-        return;
+        return true;
       }
     }
+    return false;
+  }
+
+  private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[] 
pathPattern) {
+    return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+  }
+
+  private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+    final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 
1);
+    pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+    return pathNodes;
+  }
+
+  private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnConfigNode(
+          env, getDatabaseWildcardPathPattern(plan.getPathPattern()), 
previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnConfigNode(
+      final ConfigNodeProcedureEnv env, final String[] pathPattern, final long 
ttl)
+      throws ProcedureException {
+    final SetTTLPlan rollbackPlan =
+        new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl);

Review Comment:
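   The semantic conversion here deserves an inline comment: `ttl == TTL_NOT_EXIST` (absent before the write) → build a `SetTTLPlan` with `NULL_TTL`, which the `ConfigPlanExecutor.SetTTL` branch then routes to `ttlInfo.unsetTTL` (when `getTTL() == NULL_TTL`). This is an implicit convention that uses the plan's ttl field as the set/unset signal. It is costly to understand across files, so annotating it would save future readers a good deal of time.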
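The convention described above, written out as a small sketch. The names `RollbackTtlSketch`, `toRollbackPlanTTL`, and `route` are hypothetical; the routing rule is the one stated in the comment, not verified against `ConfigPlanExecutor`.

```java
class RollbackTtlSketch {
  // Procedure-side sentinel: no TTL rule existed before the write.
  static final long TTL_NOT_EXIST = Long.MIN_VALUE;
  // Plan-side marker: a plan carrying this value means "unset" (assumed to mirror TTLCache.NULL_TTL).
  static final long NULL_TTL = -1L;

  /** Translates the captured snapshot into the ttl field of the rollback plan. */
  static long toRollbackPlanTTL(long previousTTL) {
    return previousTTL == TTL_NOT_EXIST ? NULL_TTL : previousTTL;
  }

  /** Mirrors the routing rule from the comment: NULL_TTL selects unset, anything else set. */
  static String route(long planTTL) {
    return planTTL == NULL_TTL ? "unsetTTL" : "setTTL";
  }
}
```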



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv 
env) {
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    final Map<String, Long> ttlMap = 
env.getConfigManager().getTTLManager().getAllTTL();
+    previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+    if (plan.isDataBase()) {
+      previousDatabaseWildcardTTL =
+          getTTLOrDefault(ttlMap, 
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+    }
+    previousTTLStateCaptured = true;
+  }
+
+  protected TSStatus writeConfigNodePlan(
+      final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+    try {
+      return env.getConfigManager()
+          .getConsensusManager()
+          .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : 
setTTLPlan);
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
+  }
+
+  protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, 
dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related schemaengine cache
+    return clientHandler;
+  }
+
+  private TSetTTLReq buildSetTTLReq(
+      final String[] pathPattern, final long ttl, final boolean isDataBase) {
+    return new TSetTTLReq(
+        Collections.singletonList(String.join(".", pathPattern)), ttl, 
isDataBase);
+  }
+
+  private boolean hasFailedDataNode(
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      return true;
+    }
+    for (TSStatus status : clientHandler.getResponseMap().values()) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
-        setFailure(
-            new ProcedureException(
-                new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
-        return;
+        return true;
       }
     }
+    return false;
+  }
+
+  private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[] 
pathPattern) {
+    return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+  }
+
+  private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+    final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 
1);
+    pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+    return pathNodes;
+  }
+
+  private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnConfigNode(
+          env, getDatabaseWildcardPathPattern(plan.getPathPattern()), 
previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnConfigNode(
+      final ConfigNodeProcedureEnv env, final String[] pathPattern, final long 
ttl)
+      throws ProcedureException {
+    final SetTTLPlan rollbackPlan =
+        new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : 
ttl);
+    rollbackPlan.setDataBase(false);
+    final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback ConfigNode ttl failed for "
+                  + String.join(".", pathPattern)
+                  + ": "
+                  + status.getMessage()));
+    }
+  }
+
+  private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+    restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), 
previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnDataNodes(
+          dataNodeLocationMap,
+          getDatabaseWildcardPathPattern(plan.getPathPattern()),
+          previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnDataNodes(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      final String[] pathPattern,
+      final long ttl)
+      throws ProcedureException {
+    if (dataNodeLocationMap.isEmpty()) {
+      return;
+    }
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? 
TTLCache.NULL_TTL : ttl, false));
+    if (hasFailedDataNode(clientHandler)) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback dataNode ttl cache failed for " + String.join(".", 
pathPattern)));
+    }
   }
 
   @Override
-  protected void rollbackState(
-      ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
-      throws IOException, InterruptedException, ProcedureException {}
+  protected void rollbackState(final ConfigNodeProcedureEnv env, final SetTTLState setTTLState)
+      throws IOException, InterruptedException, ProcedureException {
+    if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || !previousTTLStateCaptured) {
+      return;
+    }
+    ProcedureException rollbackFailure = null;
+    try {
+      rollbackConfigNodeTTL(env);
+    } catch (ProcedureException e) {
+      LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
+      rollbackFailure = e;
+    }
+    try {
+      rollbackDataNodeTTL(env);

Review Comment:
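   On the error aggregation strategy in the rollback: even if the ConfigNode rollback fails, the DataNode rollback is still attempted. That is usually reasonable (best-effort), but currently only the **first** exception is rethrown, and later exceptions are only written to the error log.
   
   Two suggestions:
   1. Use `addSuppressed` to carry the second exception along as well, so the full chain is available when troubleshooting.
   2. Add a comment on the method stating "best-effort: whichever step fails, the other step is still attempted; the earliest exception is the one finally thrown", so callers know this is not fail-fast.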
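One way to combine both suggestions is sketched below using the standard `Throwable.addSuppressed`. The class `BestEffortRollbackSketch` and the `Step` interface are hypothetical; the real method would call `rollbackConfigNodeTTL` and `rollbackDataNodeTTL` directly.

```java
class BestEffortRollbackSketch {
  interface Step {
    void run() throws Exception;
  }

  /**
   * Best-effort: every step runs even if an earlier one failed. The earliest exception is
   * rethrown at the end, with later ones attached as suppressed exceptions.
   */
  static void runAll(Step... steps) throws Exception {
    Exception first = null;
    for (Step step : steps) {
      try {
        step.run();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

A caller that catches the exception can then inspect `getSuppressed()` to see the failures from the later steps.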



##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java:
##########
@@ -65,4 +83,234 @@ public void serializeDeserializeTest() throws IOException, IllegalPathException
     buffer.clear();
     byteArrayOutputStream.reset();
   }
+
+  @Test
+  public void serializeDeserializeTestWithCapturedRollbackState() throws Exception {
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 2000L);
+    setTTLPlan.setDataBase(true);
+    final TestingSetTTLProcedure procedure = new TestingSetTTLProcedure(setTTLPlan);
+
+    final Map<String, Long> ttlMap = new HashMap<>();
+    ttlMap.put("root.**", Long.MAX_VALUE);
+    ttlMap.put("root.db", 500L);
+    ttlMap.put("root.db.**", 600L);
+
+    procedure.executeFromState(mockProcedureEnv(ttlMap), SetTTLState.SET_CONFIGNODE_TTL);
+
+    procedure.serialize(outputStream);
+    final ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    final SetTTLProcedure deserializedProcedure =
+        (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+    assertSerializedProcedure(
+        deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L, false);
+  }

Review Comment:
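   Consider adding a deserialization test for compatibility with the old format: hand-build a byte stream containing **only** `[length][plan bytes]` (without the 17 bytes of boolean + 2 longs), deserialize it, and assert `previousTTLStateCaptured == false` and `previousTTL == Long.MIN_VALUE`. The current round-trip test only covers the "written by the new version, read by the new version" path, so it cannot guarantee that compatibility for "procedure files persisted by the old version → new ConfigNode" will not regress during a rolling upgrade.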
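The shape of such a test can be shown on a simplified model of the trailing-fields logic. This is a sketch only: `LegacyFormatSketch` is hypothetical, it skips the plan bytes instead of parsing them, and it omits the header that `super.deserialize` consumes in the real procedure.

```java
import java.nio.ByteBuffer;

class LegacyFormatSketch {
  static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;

  boolean captured = false;
  long previousTTL = Long.MIN_VALUE;
  long previousWildcardTTL = Long.MIN_VALUE;

  /** Reads [length][plan bytes] and, only if present, the trailing rollback fields. */
  static LegacyFormatSketch deserialize(ByteBuffer in) {
    LegacyFormatSketch p = new LegacyFormatSketch();
    int length = in.getInt();
    in.position(in.position() + length); // skip the plan bytes in this simplified model
    if (in.remaining() >= ROLLBACK_STATE_BYTES) {
      p.captured = in.get() != 0;
      p.previousTTL = in.getLong();
      p.previousWildcardTTL = in.getLong();
    }
    return p;
  }

  /** Old format: nothing follows the plan bytes. */
  static ByteBuffer legacyBytes(byte[] plan) {
    ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + plan.length);
    out.putInt(plan.length);
    out.put(plan);
    out.flip();
    return out;
  }
}
```

A real test would build the legacy bytes from an actual serialized `SetTTLPlan` and go through `ProcedureFactory`, then assert the same defaults.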



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -180,12 +324,21 @@ public boolean equals(Object o) {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    return this.plan.equals(((SetTTLProcedure) o).plan)
-        && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe);
+    final SetTTLProcedure that = (SetTTLProcedure) o;
+    return this.isGeneratedByPipe == that.isGeneratedByPipe
+        && this.previousTTL == that.previousTTL
+        && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL
+        && this.previousTTLStateCaptured == that.previousTTLStateCaptured

Review Comment:
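   Including `previousTTL` / `previousDatabaseWildcardTTL` / `previousTTLStateCaptured` in `equals` / `hashCode` keeps them consistent with serialization. Please confirm, though, whether anything in the procedure queue deduplicates by `equals`. If so, the **same** plan at different capture stages would be treated as different procedures, which could affect idempotency. If there is no such caller, ignore this comment.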



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
