CRZbulabula commented on code in PR #17735:
URL: https://github.com/apache/iotdb/pull/17735#discussion_r3279236000


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv 
env) {
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    final Map<String, Long> ttlMap = 
env.getConfigManager().getTTLManager().getAllTTL();
+    previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+    if (plan.isDataBase()) {
+      previousDatabaseWildcardTTL =
+          getTTLOrDefault(ttlMap, 
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+    }
+    previousTTLStateCaptured = true;
+  }
+
+  protected TSStatus writeConfigNodePlan(
+      final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+    try {
+      return env.getConfigManager()
+          .getConsensusManager()
+          .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : 
setTTLPlan);
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
+  }
+
+  protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, 
dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related schemaengine cache
+    return clientHandler;
+  }
+
+  private TSetTTLReq buildSetTTLReq(
+      final String[] pathPattern, final long ttl, final boolean isDataBase) {
+    return new TSetTTLReq(
+        Collections.singletonList(String.join(".", pathPattern)), ttl, 
isDataBase);
+  }
+
+  private boolean hasFailedDataNode(
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      return true;
+    }
+    for (TSStatus status : clientHandler.getResponseMap().values()) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
-        setFailure(
-            new ProcedureException(
-                new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
-        return;
+        return true;
       }
     }
+    return false;
+  }
+
+  private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[] 
pathPattern) {
+    return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+  }
+
+  private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+    final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 
1);
+    pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+    return pathNodes;
+  }
+
+  private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnConfigNode(
+          env, getDatabaseWildcardPathPattern(plan.getPathPattern()), 
previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnConfigNode(
+      final ConfigNodeProcedureEnv env, final String[] pathPattern, final long 
ttl)
+      throws ProcedureException {
+    final SetTTLPlan rollbackPlan =
+        new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : 
ttl);
+    rollbackPlan.setDataBase(false);
+    final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback ConfigNode ttl failed for "
+                  + String.join(".", pathPattern)
+                  + ": "
+                  + status.getMessage()));
+    }
+  }
+
+  private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+    restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), 
previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnDataNodes(
+          dataNodeLocationMap,
+          getDatabaseWildcardPathPattern(plan.getPathPattern()),
+          previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnDataNodes(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      final String[] pathPattern,
+      final long ttl)
+      throws ProcedureException {
+    if (dataNodeLocationMap.isEmpty()) {
+      return;
+    }
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? 
TTLCache.NULL_TTL : ttl, false));
+    if (hasFailedDataNode(clientHandler)) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback dataNode ttl cache failed for " + String.join(".", 
pathPattern)));
+    }
   }
 
   @Override
-  protected void rollbackState(
-      ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
-      throws IOException, InterruptedException, ProcedureException {}
+  protected void rollbackState(final ConfigNodeProcedureEnv env, final 
SetTTLState setTTLState)
+      throws IOException, InterruptedException, ProcedureException {
+    if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || 
!previousTTLStateCaptured) {
+      return;
+    }
+    ProcedureException rollbackFailure = null;
+    try {
+      rollbackConfigNodeTTL(env);
+    } catch (ProcedureException e) {
+      LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
+      rollbackFailure = e;
+    }
+    try {
+      rollbackDataNodeTTL(env);

Review Comment:
   回滚里聚合错误的策略:即使 ConfigNode 回滚失败,DataNode 回滚仍会尝试。这通常是合理的(best-effort),但目前只保留**第一个**异常向上抛,后续异常只记 error log。
   
   两个建议:
   1. 改成用 `addSuppressed` 把第二个异常附加到第一个异常上一并抛出,排查时能拿到完整链路。
   2. 在方法上加一行注释说明「best-effort:无论哪一步失败,都继续尝试另一步;最早出现的异常作为最终抛出」,让调用方知道这里不是 fail-fast。



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -159,14 +293,24 @@ public void serialize(DataOutputStream stream) throws 
IOException {
             : ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+    stream.writeBoolean(previousTTLStateCaptured);
+    stream.writeLong(previousTTL);
+    stream.writeLong(previousDatabaseWildcardTTL);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      ReadWriteIOUtils.readInt(byteBuffer);
+      final int length = ReadWriteIOUtils.readInt(byteBuffer);
+      final int position = byteBuffer.position();
       this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+      byteBuffer.position(position + length);
+      if (byteBuffer.remaining() >= 17) {

Review Comment:
   `>= 17` 是 magic number(`1 byte boolean + 2 * 8 bytes long`)。建议抽成常量,例如:
   ```java
   private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;
   ...
   if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { ... }
   ```
   后续如果再加字段也只改一处。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to