CRZbulabula commented on code in PR #17279:
URL: https://github.com/apache/iotdb/pull/17279#discussion_r2929683212


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java:
##########
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import 
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import 
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Procedure for checking and restoring data partition table integrity. This 
procedure scans all
+ * DataNodes to detect missing data partitions and restores the 
DataPartitionTable on the ConfigNode
+ * Leader.
+ */
+public class DataPartitionTableIntegrityCheckProcedure
+    extends StateMachineProcedure<
+        ConfigNodeProcedureEnv, 
DataPartitionTableIntegrityCheckProcedureState> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class);
+
+  private static final int MAX_RETRY_COUNT = 3;
+  private static final long HEART_BEAT_REQUEST_RATE = 60000;
+
+  NodeManager dataNodeManager;
+  private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
+
+  // ============Need serialize BEGIN=============/
+  /** Collected earliest timeslots from DataNodes: database -> earliest 
timeslot */
+  private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+
+  /** DataPartitionTables collected from DataNodes: dataNodeId -> 
DataPartitionTable */
+  private Map<Integer, DataPartitionTable> dataPartitionTables = new 
ConcurrentHashMap<>();
+
+  private Set<String> lostDataPartitionsOfDatabases = new HashSet<>();
+
+  /** Final merged DataPartitionTable */
+  private DataPartitionTable finalDataPartitionTable;
+
+  private static Set<TDataNodeConfiguration> skipDataNodes = new HashSet<>();
+  private static Set<TDataNodeConfiguration> failedDataNodes = new HashSet<>();
+
+  private static ScheduledExecutorService heartBeatExecutor;
+
+  // ============Need serialize END=============/
+
+  public DataPartitionTableIntegrityCheckProcedure() {
+    super();
+  }
+
+  @Override
+  protected Flow executeFromState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws InterruptedException {
+    try {
+      // Ensure to get the real-time DataNodes in the current cluster at every 
step
+      dataNodeManager = env.getConfigManager().getNodeManager();
+      allDataNodes = dataNodeManager.getRegisteredDataNodes();
+
+      switch (state) {
+        case COLLECT_EARLIEST_TIMESLOTS:
+          failedDataNodes = new HashSet<>();
+          return collectEarliestTimeslots();
+        case ANALYZE_MISSING_PARTITIONS:
+          lostDataPartitionsOfDatabases = new HashSet<>();
+          return analyzeMissingPartitions(env);
+        case REQUEST_PARTITION_TABLES:
+          heartBeatExecutor = Executors.newScheduledThreadPool(1);
+          return requestPartitionTables(env);
+        case MERGE_PARTITION_TABLES:
+          return mergePartitionTables(env);
+        case WRITE_PARTITION_TABLE_TO_RAFT:
+          return writePartitionTableToRaft(env);
+        default:
+          throw new ProcedureException("Unknown state: " + state);
+      }
+    } catch (Exception e) {
+      LOG.error("Error executing state {}: {}", state, e.getMessage(), e);
+      setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case COLLECT_EARLIEST_TIMESLOTS:
+      case ANALYZE_MISSING_PARTITIONS:
+      case REQUEST_PARTITION_TABLES:
+      case MERGE_PARTITION_TABLES:
+      case WRITE_PARTITION_TABLE_TO_RAFT:
+        // Cleanup resources
+        earliestTimeslots.clear();
+        dataPartitionTables.clear();
+        allDataNodes.clear();
+        finalDataPartitionTable = null;
+        break;
+      default:
+        throw new ProcedureException("Unknown state for rollback: " + state);
+    }
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getState(final int 
stateId) {
+    return DataPartitionTableIntegrityCheckProcedureState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(final 
DataPartitionTableIntegrityCheckProcedureState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
+    skipDataNodes = new HashSet<>();
+    failedDataNodes = new HashSet<>();
+    return 
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
+  }
+
+  /**
+   * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
+   * Long> where key is database name and value is the earliest timeslot id.
+   */
+  private Flow collectEarliestTimeslots() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Collecting earliest timeslots from all DataNodes...");
+    }
+
+    if (allDataNodes.isEmpty()) {
+      LOG.error(
+          "No DataNodes registered, no way to collect earliest timeslots, 
terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // Collect earliest timeslots from all DataNodes
+    allDataNodes.removeAll(skipDataNodes);
+    for (TDataNodeConfiguration dataNode : allDataNodes) {
+      try {
+        TGetEarliestTimeslotsResp resp =
+            (TGetEarliestTimeslotsResp)
+                SyncDataNodeClientPool.getInstance()
+                    .sendSyncRequestToDataNodeWithGivenRetry(

Review Comment:
   Should be processed asynchronously.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -414,6 +427,34 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   private static final String SYSTEM = "system";
 
+  private final ExecutorService findEarliestTimeSlotExecutor =
+      new WrappedThreadPoolExecutor(
+          0,
+          
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+          0L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(
+              
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+          new 
IoTThreadFactory(ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName()),
+          ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName(),
+          new ThreadPoolExecutor.CallerRunsPolicy());
+
+  private final ExecutorService partitionTableRecoverExecutor =
+      new WrappedThreadPoolExecutor(
+          0,
+          
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+          0L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(
+              
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+          new 
IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()),
+          ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(),
+          new ThreadPoolExecutor.CallerRunsPolicy());
+
+  private Map<String, Long> databaseEarliestRegionMap = new 
ConcurrentHashMap<>();

Review Comment:
   This could be a local variable instead of a field.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -110,6 +116,13 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
 
   private int exitStatusCode = 0;
 
+  private Future<Void> dataPartitionTableCheckFuture;
+
+  private ExecutorService dataPartitionTableCheckExecutor =
+      
IoTDBThreadPoolFactory.newSingleThreadExecutor("DATA_PARTITION_TABLE_CHECK");
+
+  private final CountDownLatch latch = new CountDownLatch(1);

Review Comment:
   This CountDownLatch is unnecessary.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -203,6 +221,34 @@ public void active() {
         }
         loadSecretKey();
         loadHardwareCode();
+
+        dataPartitionTableCheckFuture =
+            dataPartitionTableCheckExecutor.submit(
+                () -> {
+                  LOGGER.info(
+                      "Prepare to start dataPartitionTableIntegrityCheck after 
all datanodes are started up");
+                  //          
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeout());
+
+                  while (latch.getCount() > 0) {
+                    List<Integer> dnList =
+                        configManager
+                            .getLoadManager()
+                            .filterDataNodeThroughStatus(NodeStatus.Running);
+                    if (dnList != null && !dnList.isEmpty()) {
+                      LOGGER.info("Starting 
dataPartitionTableIntegrityCheck...");
+                      TSStatus status =
+                          
configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
+                      if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                        LOGGER.error("Data partition table integrity check 
failed!");
+                      }
+                      latch.countDown();
+                    } else {
+                      LOGGER.info("No running datanodes found, waiting...");
+                      Thread.sleep(5000); // 等待5秒后重新检查

Review Comment:
   ```suggestion
                         Thread.sleep(5000);
   ```



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -352,6 +352,9 @@ public DataSet registerDataNode(TDataNodeRegisterReq req) {
     // Adjust the maximum RegionGroup number of each Database
     getClusterSchemaManager().adjustMaxRegionGroupNum();
 
+    // Check if all DataNodes are registered and trigger integrity check if 
needed
+    checkAndTriggerIntegrityCheck();

Review Comment:
   Remove this function.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -203,6 +221,34 @@ public void active() {
         }
         loadSecretKey();
         loadHardwareCode();
+
+        dataPartitionTableCheckFuture =
+            dataPartitionTableCheckExecutor.submit(
+                () -> {
+                  LOGGER.info(
+                      "Prepare to start dataPartitionTableIntegrityCheck after 
all datanodes are started up");
+                  //          
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeout());

Review Comment:
   Remove this.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java:
##########
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import 
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import 
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Procedure for checking and restoring data partition table integrity. This 
procedure scans all
+ * DataNodes to detect missing data partitions and restores the 
DataPartitionTable on the ConfigNode
+ * Leader.
+ */
+public class DataPartitionTableIntegrityCheckProcedure
+    extends StateMachineProcedure<
+        ConfigNodeProcedureEnv, 
DataPartitionTableIntegrityCheckProcedureState> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class);
+
+  private static final int MAX_RETRY_COUNT = 3;
+  private static final long HEART_BEAT_REQUEST_RATE = 60000;
+
+  NodeManager dataNodeManager;
+  private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
+
+  // ============Need serialize BEGIN=============/
+  /** Collected earliest timeslots from DataNodes: database -> earliest 
timeslot */
+  private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+
+  /** DataPartitionTables collected from DataNodes: dataNodeId -> 
DataPartitionTable */
+  private Map<Integer, DataPartitionTable> dataPartitionTables = new 
ConcurrentHashMap<>();
+
+  private Set<String> lostDataPartitionsOfDatabases = new HashSet<>();
+
+  /** Final merged DataPartitionTable */
+  private DataPartitionTable finalDataPartitionTable;
+
+  private static Set<TDataNodeConfiguration> skipDataNodes = new HashSet<>();
+  private static Set<TDataNodeConfiguration> failedDataNodes = new HashSet<>();
+
+  private static ScheduledExecutorService heartBeatExecutor;
+
+  // ============Need serialize END=============/
+
+  public DataPartitionTableIntegrityCheckProcedure() {
+    super();
+  }
+
+  @Override
+  protected Flow executeFromState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws InterruptedException {
+    try {
+      // Ensure to get the real-time DataNodes in the current cluster at every 
step
+      dataNodeManager = env.getConfigManager().getNodeManager();
+      allDataNodes = dataNodeManager.getRegisteredDataNodes();
+
+      switch (state) {
+        case COLLECT_EARLIEST_TIMESLOTS:
+          failedDataNodes = new HashSet<>();
+          return collectEarliestTimeslots();
+        case ANALYZE_MISSING_PARTITIONS:
+          lostDataPartitionsOfDatabases = new HashSet<>();
+          return analyzeMissingPartitions(env);
+        case REQUEST_PARTITION_TABLES:
+          heartBeatExecutor = Executors.newScheduledThreadPool(1);
+          return requestPartitionTables(env);
+        case MERGE_PARTITION_TABLES:
+          return mergePartitionTables(env);
+        case WRITE_PARTITION_TABLE_TO_RAFT:
+          return writePartitionTableToRaft(env);
+        default:
+          throw new ProcedureException("Unknown state: " + state);
+      }
+    } catch (Exception e) {
+      LOG.error("Error executing state {}: {}", state, e.getMessage(), e);
+      setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case COLLECT_EARLIEST_TIMESLOTS:
+      case ANALYZE_MISSING_PARTITIONS:
+      case REQUEST_PARTITION_TABLES:
+      case MERGE_PARTITION_TABLES:
+      case WRITE_PARTITION_TABLE_TO_RAFT:
+        // Cleanup resources
+        earliestTimeslots.clear();
+        dataPartitionTables.clear();
+        allDataNodes.clear();
+        finalDataPartitionTable = null;
+        break;
+      default:
+        throw new ProcedureException("Unknown state for rollback: " + state);
+    }
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getState(final int 
stateId) {
+    return DataPartitionTableIntegrityCheckProcedureState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(final 
DataPartitionTableIntegrityCheckProcedureState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
+    skipDataNodes = new HashSet<>();
+    failedDataNodes = new HashSet<>();
+    return 
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
+  }
+
+  /**
+   * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
+   * Long> where key is database name and value is the earliest timeslot id.
+   */
+  private Flow collectEarliestTimeslots() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Collecting earliest timeslots from all DataNodes...");
+    }
+
+    if (allDataNodes.isEmpty()) {
+      LOG.error(
+          "No DataNodes registered, no way to collect earliest timeslots, 
terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // Collect earliest timeslots from all DataNodes
+    allDataNodes.removeAll(skipDataNodes);
+    for (TDataNodeConfiguration dataNode : allDataNodes) {
+      try {
+        TGetEarliestTimeslotsResp resp =
+            (TGetEarliestTimeslotsResp)
+                SyncDataNodeClientPool.getInstance()
+                    .sendSyncRequestToDataNodeWithGivenRetry(
+                        dataNode.getLocation().getInternalEndPoint(),
+                        null,
+                        CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+                        MAX_RETRY_COUNT);
+        if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failedDataNodes.add(dataNode);
+          LOG.error(
+              "Failed to collected earliest timeslots from the 
DataNode[id={}], response status is {}",
+              dataNode.getLocation().getDataNodeId(),
+              resp.getStatus());
+          continue;
+        }
+
+        Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot();
+
+        // Merge with existing timeslots (take minimum)
+        for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) {
+          earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Collected earliest timeslots from the DataNode[id={}]: {}",
+              dataNode.getLocation().getDataNodeId(),
+              nodeTimeslots);
+        }
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to collect earliest timeslots from the DataNode[id={}]: 
{}",
+            dataNode.getLocation().getDataNodeId(),
+            e.getMessage(),
+            e);
+        failedDataNodes.add(dataNode);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info(
+          "Collected earliest timeslots from {} DataNodes: {}, the number of 
successful DataNodes is {}",
+          allDataNodes.size(),
+          earliestTimeslots,
+          allDataNodes.size() - failedDataNodes.size());
+    }
+
+    if (failedDataNodes.size() == allDataNodes.size()
+        && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) {
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+    } else {
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Analyze which data partitions are missing based on earliest timeslots. 
Identify data partitions
+   * of databases need to be repaired.
+   */
+  private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Analyzing missing data partitions...");
+    }
+
+    if (earliestTimeslots.isEmpty()) {
+      LOG.error(
+          "No missing data partitions detected, nothing needs to be repaired, 
terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // Find all databases that have lost data partition tables
+    for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) {
+      String database = entry.getKey();
+      long earliestTimeslot = entry.getValue();
+
+      // Get current DataPartitionTable from ConfigManager
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>>
+          localDataPartitionTable = getLocalDataPartitionTable(env, database);
+
+      // Check if ConfigNode has a data partition that is associated with the 
earliestTimeslot
+      if (localDataPartitionTable == null
+          || localDataPartitionTable.isEmpty()
+          || localDataPartitionTable.get(database) == null
+          || localDataPartitionTable.get(database).isEmpty()) {
+        lostDataPartitionsOfDatabases.add(database);
+        LOG.warn(
+            "No data partition table related to database {} was found from the 
ConfigNode, and this issue needs to be repaired",
+            database);
+        continue;
+      }
+
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>
+          seriesPartitionMap = localDataPartitionTable.get(database);
+      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>
+          seriesPartitionEntry : seriesPartitionMap.entrySet()) {
+        Map<TTimePartitionSlot, List<TConsensusGroupId>> 
tTimePartitionSlotListMap =
+            seriesPartitionEntry.getValue();
+        tTimePartitionSlotListMap
+            .keySet()
+            .forEach(
+                slot -> {
+                  if (!TimePartitionUtils.satisfyPartitionId(
+                      slot.getStartTime(), earliestTimeslot)) {
+                    lostDataPartitionsOfDatabases.add(database);
+                    LOG.warn(
+                        "Database {} has lost timeslot {} in its data table 
partition, and this issue needs to be repaired",
+                        database,
+                        earliestTimeslot);
+                  }

Review Comment:
   Rectify this code. The only thing you need to check is whether the 
database's earliest timeslot is greater than the DataNode's.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java:
##########
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import 
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import 
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Procedure for checking and restoring data partition table integrity. This 
procedure scans all
+ * DataNodes to detect missing data partitions and restores the 
DataPartitionTable on the ConfigNode
+ * Leader.
+ */
+public class DataPartitionTableIntegrityCheckProcedure
+    extends StateMachineProcedure<
+        ConfigNodeProcedureEnv, 
DataPartitionTableIntegrityCheckProcedureState> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class);
+
+  private static final int MAX_RETRY_COUNT = 3;
+  private static final long HEART_BEAT_REQUEST_RATE = 60000;
+
+  NodeManager dataNodeManager;
+  private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
+
+  // ============Need serialize BEGIN=============/
+  /** Collected earliest timeslots from DataNodes: database -> earliest 
timeslot */
+  private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+
+  /** DataPartitionTables collected from DataNodes: dataNodeId -> 
DataPartitionTable */
+  private Map<Integer, DataPartitionTable> dataPartitionTables = new 
ConcurrentHashMap<>();
+
+  private Set<String> lostDataPartitionsOfDatabases = new HashSet<>();
+
+  /** Final merged DataPartitionTable */
+  private DataPartitionTable finalDataPartitionTable;
+
+  private static Set<TDataNodeConfiguration> skipDataNodes = new HashSet<>();
+  private static Set<TDataNodeConfiguration> failedDataNodes = new HashSet<>();
+
+  private static ScheduledExecutorService heartBeatExecutor;
+
+  // ============Need serialize END=============/
+
+  public DataPartitionTableIntegrityCheckProcedure() {
+    super();
+  }
+
+  @Override
+  protected Flow executeFromState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws InterruptedException {
+    try {
+      // Ensure to get the real-time DataNodes in the current cluster at every 
step
+      dataNodeManager = env.getConfigManager().getNodeManager();
+      allDataNodes = dataNodeManager.getRegisteredDataNodes();
+
+      switch (state) {
+        case COLLECT_EARLIEST_TIMESLOTS:
+          failedDataNodes = new HashSet<>();
+          return collectEarliestTimeslots();
+        case ANALYZE_MISSING_PARTITIONS:
+          lostDataPartitionsOfDatabases = new HashSet<>();
+          return analyzeMissingPartitions(env);
+        case REQUEST_PARTITION_TABLES:
+          heartBeatExecutor = Executors.newScheduledThreadPool(1);
+          return requestPartitionTables(env);
+        case MERGE_PARTITION_TABLES:
+          return mergePartitionTables(env);
+        case WRITE_PARTITION_TABLE_TO_RAFT:
+          return writePartitionTableToRaft(env);
+        default:
+          throw new ProcedureException("Unknown state: " + state);
+      }
+    } catch (Exception e) {
+      LOG.error("Error executing state {}: {}", state, e.getMessage(), e);
+      setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case COLLECT_EARLIEST_TIMESLOTS:
+      case ANALYZE_MISSING_PARTITIONS:
+      case REQUEST_PARTITION_TABLES:
+      case MERGE_PARTITION_TABLES:
+      case WRITE_PARTITION_TABLE_TO_RAFT:
+        // Cleanup resources
+        earliestTimeslots.clear();
+        dataPartitionTables.clear();
+        allDataNodes.clear();
+        finalDataPartitionTable = null;
+        break;
+      default:
+        throw new ProcedureException("Unknown state for rollback: " + state);
+    }
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getState(final int 
stateId) {
+    return DataPartitionTableIntegrityCheckProcedureState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(final 
DataPartitionTableIntegrityCheckProcedureState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
+    skipDataNodes = new HashSet<>();
+    failedDataNodes = new HashSet<>();
+    return 
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
+  }
+
+  /**
+   * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
+   * Long> where key is database name and value is the earliest timeslot id.
+   */
+  private Flow collectEarliestTimeslots() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Collecting earliest timeslots from all DataNodes...");
+    }
+
+    if (allDataNodes.isEmpty()) {
+      LOG.error(
+          "No DataNodes registered, no way to collect earliest timeslots, 
terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // Collect earliest timeslots from all DataNodes
+    allDataNodes.removeAll(skipDataNodes);
+    for (TDataNodeConfiguration dataNode : allDataNodes) {
+      try {
+        TGetEarliestTimeslotsResp resp =
+            (TGetEarliestTimeslotsResp)
+                SyncDataNodeClientPool.getInstance()
+                    .sendSyncRequestToDataNodeWithGivenRetry(
+                        dataNode.getLocation().getInternalEndPoint(),
+                        null,
+                        CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+                        MAX_RETRY_COUNT);
+        if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failedDataNodes.add(dataNode);
+          LOG.error(
+              "Failed to collected earliest timeslots from the 
DataNode[id={}], response status is {}",
+              dataNode.getLocation().getDataNodeId(),
+              resp.getStatus());
+          continue;
+        }
+
+        Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot();
+
+        // Merge with existing timeslots (take minimum)
+        for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) {
+          earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Collected earliest timeslots from the DataNode[id={}]: {}",
+              dataNode.getLocation().getDataNodeId(),
+              nodeTimeslots);
+        }
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to collect earliest timeslots from the DataNode[id={}]: 
{}",
+            dataNode.getLocation().getDataNodeId(),
+            e.getMessage(),
+            e);
+        failedDataNodes.add(dataNode);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info(
+          "Collected earliest timeslots from {} DataNodes: {}, the number of 
successful DataNodes is {}",
+          allDataNodes.size(),
+          earliestTimeslots,
+          allDataNodes.size() - failedDataNodes.size());
+    }
+
+    if (failedDataNodes.size() == allDataNodes.size()
+        && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) {
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+    } else {
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Analyze which data partitions are missing based on earliest timeslots. 
Identify data partitions
+   * of databases need to be repaired.
+   */
+  private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Analyzing missing data partitions...");
+    }
+
+    if (earliestTimeslots.isEmpty()) {
+      LOG.error(
+          "No missing data partitions detected, nothing needs to be repaired, 
terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // Find all databases that have lost data partition tables
+    for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) {
+      String database = entry.getKey();
+      long earliestTimeslot = entry.getValue();
+
+      // Get current DataPartitionTable from ConfigManager
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>>
+          localDataPartitionTable = getLocalDataPartitionTable(env, database);
+
+      // Check if ConfigNode has a data partition that is associated with the 
earliestTimeslot
+      if (localDataPartitionTable == null
+          || localDataPartitionTable.isEmpty()
+          || localDataPartitionTable.get(database) == null
+          || localDataPartitionTable.get(database).isEmpty()) {
+        lostDataPartitionsOfDatabases.add(database);
+        LOG.warn(
+            "No data partition table related to database {} was found from the 
ConfigNode, and this issue needs to be repaired",
+            database);
+        continue;
+      }
+
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>
+          seriesPartitionMap = localDataPartitionTable.get(database);
+      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>
+          seriesPartitionEntry : seriesPartitionMap.entrySet()) {
+        Map<TTimePartitionSlot, List<TConsensusGroupId>> 
tTimePartitionSlotListMap =
+            seriesPartitionEntry.getValue();
+        tTimePartitionSlotListMap
+            .keySet()
+            .forEach(
+                slot -> {
+                  if (!TimePartitionUtils.satisfyPartitionId(
+                      slot.getStartTime(), earliestTimeslot)) {
+                    lostDataPartitionsOfDatabases.add(database);
+                    LOG.warn(
+                        "Database {} has lost timeslot {} in its data table 
partition, and this issue needs to be repaired",
+                        database,
+                        earliestTimeslot);
+                  }
+                });
+      }
+    }
+
+    if (lostDataPartitionsOfDatabases.isEmpty()) {
+      LOG.info("No databases have lost data partitions, terminating 
procedure");
+      return Flow.NO_MORE_STATE;
+    }
+
+    LOG.info(
+        "Identified {} databases have lost data partitions, will request 
DataPartitionTable generation from {} DataNodes",
+        lostDataPartitionsOfDatabases.size(),
+        allDataNodes.size() - failedDataNodes.size());
+    
setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  private Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TConsensusGroupId>>>>
+      getLocalDataPartitionTable(final ConfigNodeProcedureEnv env, final 
String database) {
+    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> 
schemaPartitionTable =
+        env.getConfigManager()
+            .getSchemaPartition(Collections.singletonMap(database, 
Collections.emptyList()))
+            .getSchemaPartitionTable();
+
+    // Construct request for getting data partition
+    final Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
partitionSlotsMap = new HashMap<>();
+    schemaPartitionTable.forEach(
+        (key, value) -> {
+          Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new 
HashMap<>();
+          value
+              .keySet()
+              .forEach(
+                  slot ->
+                      slotListMap.put(
+                          slot, new TTimeSlotList(Collections.emptyList(), 
true, true)));
+          partitionSlotsMap.put(key, slotListMap);
+        });
+    final GetDataPartitionPlan getDataPartitionPlan = new 
GetDataPartitionPlan(partitionSlotsMap);
+    return 
env.getConfigManager().getDataPartition(getDataPartitionPlan).getDataPartitionTable();
+  }
+
+  /**
+   * Request DataPartitionTable generation from target DataNodes. Each 
DataNode scans its tsfile
+   * resources and generates a DataPartitionTable.
+   */
+  private Flow requestPartitionTables(final ConfigNodeProcedureEnv env) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Requesting DataPartitionTable generation from {} DataNodes...", 
allDataNodes.size());
+    }
+
+    if (allDataNodes.isEmpty()) {
+      LOG.error(
+          "No DataNodes registered, no way to requested DataPartitionTable 
generation, terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        heartBeatExecutor,
+        this::checkPartitionTableGenerationStatus,
+        0,
+        HEART_BEAT_REQUEST_RATE,
+        TimeUnit.MILLISECONDS);
+
+    allDataNodes.removeAll(skipDataNodes);
+    allDataNodes.removeAll(failedDataNodes);
+    for (TDataNodeConfiguration dataNode : allDataNodes) {
+      int dataNodeId = dataNode.getLocation().getDataNodeId();
+      if (!dataPartitionTables.containsKey(dataNodeId)) {
+        try {
+          TGenerateDataPartitionTableReq req = new 
TGenerateDataPartitionTableReq();
+          req.setDatabases(lostDataPartitionsOfDatabases);
+          TGenerateDataPartitionTableResp resp =
+              (TGenerateDataPartitionTableResp)
+                  SyncDataNodeClientPool.getInstance()
+                      .sendSyncRequestToDataNodeWithGivenRetry(
+                          dataNode.getLocation().getInternalEndPoint(),
+                          req,
+                          CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
+                          MAX_RETRY_COUNT);
+          if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            failedDataNodes.add(dataNode);
+            LOG.error(
+                "Failed to request DataPartitionTable generation from the 
DataNode[id={}], response status is {}",
+                dataNode.getLocation().getDataNodeId(),
+                resp.getStatus());
+            continue;
+          }
+
+          byte[] bytes = resp.getDataPartitionTable();
+          DataPartitionTable dataPartitionTable = new DataPartitionTable();
+          dataPartitionTable.deserialize(ByteBuffer.wrap(bytes));
+          dataPartitionTables.put(dataNodeId, dataPartitionTable);
+        } catch (Exception e) {
+          failedDataNodes.add(dataNode);
+          LOG.error(
+              "Failed to request DataPartitionTable generation from 
DataNode[id={}]: {}",
+              dataNodeId,
+              e.getMessage(),
+              e);
+        }
+      }
+    }
+
+    if (failedDataNodes.size() == allDataNodes.size()
+        && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) {
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    
setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /** Check completion status of DataPartitionTable generation tasks. */
+  private void checkPartitionTableGenerationStatus() {
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Checking DataPartitionTable generation completion status...");
+    }
+
+    int completeCount = 0;
+    for (TDataNodeConfiguration dataNode : allDataNodes) {
+      int dataNodeId = dataNode.getLocation().getDataNodeId();
+
+      if (!dataPartitionTables.containsKey(dataNodeId)) {
+        try {
+          TGenerateDataPartitionTableHeartbeatResp resp =
+              (TGenerateDataPartitionTableHeartbeatResp)
+                  SyncDataNodeClientPool.getInstance()
+                      .sendSyncRequestToDataNodeWithGivenRetry(
+                          dataNode.getLocation().getInternalEndPoint(),
+                          null,
+                          
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
+                          MAX_RETRY_COUNT);
+          DataPartitionTableGeneratorState state =
+              
DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode());
+
+          if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            LOG.error(
+                "Failed to request DataPartitionTable generation heart beat 
from the DataNode[id={}], state is {}, response status is {}",
+                dataNode.getLocation().getDataNodeId(),
+                state,
+                resp.getStatus());
+            continue;
+          }
+          switch (state) {
+            case SUCCESS:
+              LOG.info(
+                  "DataNode {} completed DataPartitionTable generation, 
terminating heart beat",
+                  dataNodeId);
+              completeCount++;
+              break;
+            case IN_PROGRESS:
+              LOG.info("DataNode {} still generating DataPartitionTable", 
dataNodeId);
+              break;
+            case FAILED:
+              LOG.error(
+                  "DataNode {} failed to generate DataPartitionTable, 
terminating heart beat",
+                  dataNodeId);
+              completeCount++;
+              break;
+            default:
+              LOG.error(
+                  "DataNode {} returned unknown error code: {}", dataNodeId, 
resp.getErrorCode());
+              break;
+          }
+        } catch (Exception e) {
+          LOG.error(
+              "Error checking DataPartitionTable status from DataNode {}: {}, 
terminating heart beat",
+              dataNodeId,
+              e.getMessage(),
+              e);
+          completeCount++;
+        }
+      } else {
+        completeCount++;
+      }
+    }
+
+    if (completeCount >= allDataNodes.size()) {
+      heartBeatExecutor.shutdown();
+    }
+  }
+
+  /** Merge DataPartitionTables from all DataNodes into a final table. */
+  private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) {

Review Comment:
   Common helper functions should be extracted for DataPartitionTable and 
   SeriesPartitionTable.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -110,6 +116,13 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
 
   private int exitStatusCode = 0;
 
+  private Future<Void> dataPartitionTableCheckFuture;
+
+  private ExecutorService dataPartitionTableCheckExecutor =
+      
IoTDBThreadPoolFactory.newSingleThreadExecutor("DATA_PARTITION_TABLE_CHECK");

Review Comment:
   Verify the lifecycle of this variable; it should be cleaned up after your 
   check has passed.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java:
##########
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import 
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import 
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Procedure for checking and restoring data partition table integrity. This 
procedure scans all
+ * DataNodes to detect missing data partitions and restores the 
DataPartitionTable on the ConfigNode
+ * Leader.
+ */
+public class DataPartitionTableIntegrityCheckProcedure
+    extends StateMachineProcedure<
+        ConfigNodeProcedureEnv, 
DataPartitionTableIntegrityCheckProcedureState> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class);
+
+  private static final int MAX_RETRY_COUNT = 3;
+  private static final long HEART_BEAT_REQUEST_RATE = 60000;
+
+  NodeManager dataNodeManager;
+  private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
+
+  // ============Need serialize BEGIN=============/
+  /** Collected earliest timeslots from DataNodes: database -> earliest 
timeslot */
+  private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+
+  /** DataPartitionTables collected from DataNodes: dataNodeId -> 
DataPartitionTable */
+  private Map<Integer, DataPartitionTable> dataPartitionTables = new 
ConcurrentHashMap<>();
+
+  private Set<String> lostDataPartitionsOfDatabases = new HashSet<>();
+
+  /** Final merged DataPartitionTable */
+  private DataPartitionTable finalDataPartitionTable;
+
+  private static Set<TDataNodeConfiguration> skipDataNodes = new HashSet<>();
+  private static Set<TDataNodeConfiguration> failedDataNodes = new HashSet<>();
+
+  private static ScheduledExecutorService heartBeatExecutor;
+
+  // ============Need serialize END=============/
+
+  public DataPartitionTableIntegrityCheckProcedure() {
+    super();
+  }
+
+  @Override
+  protected Flow executeFromState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws InterruptedException {
+    try {
+      // Ensure to get the real-time DataNodes in the current cluster at every 
step
+      dataNodeManager = env.getConfigManager().getNodeManager();
+      allDataNodes = dataNodeManager.getRegisteredDataNodes();
+
+      switch (state) {
+        case COLLECT_EARLIEST_TIMESLOTS:
+          failedDataNodes = new HashSet<>();
+          return collectEarliestTimeslots();
+        case ANALYZE_MISSING_PARTITIONS:
+          lostDataPartitionsOfDatabases = new HashSet<>();
+          return analyzeMissingPartitions(env);
+        case REQUEST_PARTITION_TABLES:
+          heartBeatExecutor = Executors.newScheduledThreadPool(1);
+          return requestPartitionTables(env);
+        case MERGE_PARTITION_TABLES:
+          return mergePartitionTables(env);
+        case WRITE_PARTITION_TABLE_TO_RAFT:
+          return writePartitionTableToRaft(env);
+        default:
+          throw new ProcedureException("Unknown state: " + state);
+      }
+    } catch (Exception e) {
+      LOG.error("Error executing state {}: {}", state, e.getMessage(), e);
+      setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case COLLECT_EARLIEST_TIMESLOTS:
+      case ANALYZE_MISSING_PARTITIONS:
+      case REQUEST_PARTITION_TABLES:
+      case MERGE_PARTITION_TABLES:
+      case WRITE_PARTITION_TABLE_TO_RAFT:
+        // Cleanup resources
+        earliestTimeslots.clear();
+        dataPartitionTables.clear();
+        allDataNodes.clear();
+        finalDataPartitionTable = null;
+        break;
+      default:
+        throw new ProcedureException("Unknown state for rollback: " + state);
+    }
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getState(final int 
stateId) {
+    return DataPartitionTableIntegrityCheckProcedureState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(final 
DataPartitionTableIntegrityCheckProcedureState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
+    skipDataNodes = new HashSet<>();
+    failedDataNodes = new HashSet<>();
+    return 
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
+  }
+
+  /**
+   * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
+   * Long> where key is database name and value is the earliest timeslot id.
+   */
+  private Flow collectEarliestTimeslots() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Collecting earliest timeslots from all DataNodes...");
+    }
+
+    if (allDataNodes.isEmpty()) {
+      LOG.error(
+          "No DataNodes registered, no way to collect earliest timeslots, 
terminating procedure");

Review Comment:
   DO NOT use _terminating_; it is ambiguous here.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java:
##########
@@ -203,6 +221,34 @@ public void active() {
         }
         loadSecretKey();
         loadHardwareCode();
+
+        dataPartitionTableCheckFuture =
+            dataPartitionTableCheckExecutor.submit(
+                () -> {
+                  LOGGER.info(
+                      "Prepare to start dataPartitionTableIntegrityCheck after 
all datanodes are started up");

Review Comment:
   Better to append a prefix for all of your logs appended, e.g.,
   ```suggestion
                         "[DataPartitionIntegrity] Prepare to start 
dataPartitionTableIntegrityCheck after all datanodes are started up");
   ```



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java:
##########
@@ -319,6 +319,8 @@ public class ConfigNodeConfig {
 
   private long forceWalPeriodForConfigNodeSimpleInMs = 100;
 
+  private long partitionTableRecoverWaitAllDnUpTimeout = 60000;

Review Comment:
   Rename this parameter so that its time unit is explicit
   ```suggestion
     private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000;
   ```



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java:
##########
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
+import org.apache.iotdb.commons.partition.DataPartitionTable;
+import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
+import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import 
org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import 
org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
+import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
+import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Procedure for checking and restoring data partition table integrity. This 
procedure scans all
+ * DataNodes to detect missing data partitions and restores the 
DataPartitionTable on the ConfigNode
+ * Leader.
+ */
+public class DataPartitionTableIntegrityCheckProcedure
+    extends StateMachineProcedure<
+        ConfigNodeProcedureEnv, 
DataPartitionTableIntegrityCheckProcedureState> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class);
+
+  private static final int MAX_RETRY_COUNT = 3;
+  private static final long HEART_BEAT_REQUEST_RATE = 60000;
+
+  NodeManager dataNodeManager;
+  private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
+
+  // ============Need serialize BEGIN=============/
+  /** Collected earliest timeslots from DataNodes: database -> earliest 
timeslot */
+  private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>();
+
+  /** DataPartitionTables collected from DataNodes: dataNodeId -> 
DataPartitionTable */
+  private Map<Integer, DataPartitionTable> dataPartitionTables = new 
ConcurrentHashMap<>();
+
+  private Set<String> lostDataPartitionsOfDatabases = new HashSet<>();
+
+  /** Final merged DataPartitionTable */
+  private DataPartitionTable finalDataPartitionTable;
+
+  private static Set<TDataNodeConfiguration> skipDataNodes = new HashSet<>();
+  private static Set<TDataNodeConfiguration> failedDataNodes = new HashSet<>();
+
+  private static ScheduledExecutorService heartBeatExecutor;
+
+  // ============Need serialize END=============/
+
+  public DataPartitionTableIntegrityCheckProcedure() {
+    super();
+  }
+
+  @Override
+  protected Flow executeFromState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws InterruptedException {
+    try {
+      // Ensure to get the real-time DataNodes in the current cluster at every 
step
+      dataNodeManager = env.getConfigManager().getNodeManager();
+      allDataNodes = dataNodeManager.getRegisteredDataNodes();
+
+      switch (state) {
+        case COLLECT_EARLIEST_TIMESLOTS:
+          failedDataNodes = new HashSet<>();
+          return collectEarliestTimeslots();
+        case ANALYZE_MISSING_PARTITIONS:
+          lostDataPartitionsOfDatabases = new HashSet<>();
+          return analyzeMissingPartitions(env);
+        case REQUEST_PARTITION_TABLES:
+          heartBeatExecutor = Executors.newScheduledThreadPool(1);
+          return requestPartitionTables(env);
+        case MERGE_PARTITION_TABLES:
+          return mergePartitionTables(env);
+        case WRITE_PARTITION_TABLE_TO_RAFT:
+          return writePartitionTableToRaft(env);
+        default:
+          throw new ProcedureException("Unknown state: " + state);
+      }
+    } catch (Exception e) {
+      LOG.error("Error executing state {}: {}", state, e.getMessage(), e);
+      setFailure("DataPartitionTableIntegrityCheckProcedure", e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(
+      final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case COLLECT_EARLIEST_TIMESLOTS:
+      case ANALYZE_MISSING_PARTITIONS:
+      case REQUEST_PARTITION_TABLES:
+      case MERGE_PARTITION_TABLES:
+      case WRITE_PARTITION_TABLE_TO_RAFT:
+        // Cleanup resources
+        earliestTimeslots.clear();
+        dataPartitionTables.clear();
+        allDataNodes.clear();
+        finalDataPartitionTable = null;
+        break;
+      default:
+        throw new ProcedureException("Unknown state for rollback: " + state);
+    }
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getState(final int 
stateId) {
+    return DataPartitionTableIntegrityCheckProcedureState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(final 
DataPartitionTableIntegrityCheckProcedureState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
+    skipDataNodes = new HashSet<>();
+    failedDataNodes = new HashSet<>();
+    return 
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
+  }
+
+  /**
+   * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
+   * Long> where key is database name and value is the earliest timeslot id.
+   */
+  private Flow collectEarliestTimeslots() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Collecting earliest timeslots from all DataNodes...");
+    }
+
+    if (allDataNodes.isEmpty()) {
+      LOG.error(
+          "No DataNodes registered, no way to collect earliest timeslots, 
terminating procedure");
+      
setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    // Collect earliest timeslots from all DataNodes
+    allDataNodes.removeAll(skipDataNodes);
+    for (TDataNodeConfiguration dataNode : allDataNodes) {
+      try {
+        TGetEarliestTimeslotsResp resp =
+            (TGetEarliestTimeslotsResp)
+                SyncDataNodeClientPool.getInstance()
+                    .sendSyncRequestToDataNodeWithGivenRetry(
+                        dataNode.getLocation().getInternalEndPoint(),
+                        null,
+                        CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+                        MAX_RETRY_COUNT);
+        if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failedDataNodes.add(dataNode);
+          LOG.error(
+              "Failed to collected earliest timeslots from the 
DataNode[id={}], response status is {}",
+              dataNode.getLocation().getDataNodeId(),
+              resp.getStatus());
+          continue;
+        }
+
+        Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot();
+
+        // Merge with existing timeslots (take minimum)
+        for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) {
+          earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Collected earliest timeslots from the DataNode[id={}]: {}",
+              dataNode.getLocation().getDataNodeId(),
+              nodeTimeslots);
+        }
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to collect earliest timeslots from the DataNode[id={}]: 
{}",
+            dataNode.getLocation().getDataNodeId(),
+            e.getMessage(),
+            e);
+        failedDataNodes.add(dataNode);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info(
+          "Collected earliest timeslots from {} DataNodes: {}, the number of 
successful DataNodes is {}",
+          allDataNodes.size(),
+          earliestTimeslots,
+          allDataNodes.size() - failedDataNodes.size());
+    }
+
+    if (failedDataNodes.size() == allDataNodes.size()
+        && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) {

Review Comment:
   `new HashSet<>(allDataNodes).containsAll(failedDataNodes)` is always true here, since `failedDataNodes` only ever receives elements of `allDataNodes`; the size comparison alone suffices.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to