zerolbsony commented on code in PR #17279: URL: https://github.com/apache/iotdb/pull/17279#discussion_r2944133288
########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java: ########## @@ -0,0 +1,878 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; +import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Procedure for checking and restoring data partition table integrity. This procedure scans all + * DataNodes to detect missing data partitions and restores the DataPartitionTable on the ConfigNode + * Leader. + */ +public class DataPartitionTableIntegrityCheckProcedure + extends StateMachineProcedure< + ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class); + + private static final int MAX_RETRY_COUNT = 3; + private static final long HEART_BEAT_REQUEST_RATE = 60000; + + NodeManager dataNodeManager; + private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>(); + + // ============Need serialize BEGIN=============/ + /** Collected earliest timeslots from DataNodes: database -> earliest timeslot */ + private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>(); + + /** DataPartitionTables collected from DataNodes: dataNodeId -> DataPartitionTable */ + private Map<Integer, DataPartitionTable> dataPartitionTables = new ConcurrentHashMap<>(); + + private Set<String> lostDataPartitionsOfDatabases = new HashSet<>(); + + /** Final merged DataPartitionTable */ + private DataPartitionTable finalDataPartitionTable; + + private static Set<TDataNodeConfiguration> skipDataNodes = new HashSet<>(); + private static Set<TDataNodeConfiguration> failedDataNodes = new HashSet<>(); + + private static ScheduledExecutorService heartBeatExecutor; + + // ============Need serialize END=============/ + + public DataPartitionTableIntegrityCheckProcedure() { + super(); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws InterruptedException { + try { + // Ensure to get the real-time DataNodes in the current cluster at every step + dataNodeManager = env.getConfigManager().getNodeManager(); + allDataNodes = dataNodeManager.getRegisteredDataNodes(); + + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + failedDataNodes = new HashSet<>(); + return collectEarliestTimeslots(); + case ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases = new HashSet<>(); + return analyzeMissingPartitions(env); + case REQUEST_PARTITION_TABLES: + heartBeatExecutor = Executors.newScheduledThreadPool(1); + return requestPartitionTables(env); + case MERGE_PARTITION_TABLES: + return mergePartitionTables(env); + case WRITE_PARTITION_TABLE_TO_RAFT: + return writePartitionTableToRaft(env); + default: + throw new ProcedureException("Unknown state: " + state); + } + } catch (Exception e) { + LOG.error("Error executing state {}: {}", state, e.getMessage(), e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws IOException, InterruptedException, ProcedureException { + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + case ANALYZE_MISSING_PARTITIONS: + case REQUEST_PARTITION_TABLES: + case MERGE_PARTITION_TABLES: + case WRITE_PARTITION_TABLE_TO_RAFT: + // Cleanup resources + earliestTimeslots.clear(); + dataPartitionTables.clear(); + allDataNodes.clear(); + finalDataPartitionTable = null; + break; + default: + throw new ProcedureException("Unknown state for rollback: " + state); + } + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getState(final int stateId) { + return DataPartitionTableIntegrityCheckProcedureState.values()[stateId]; + } + + @Override + protected int getStateId(final DataPartitionTableIntegrityCheckProcedureState state) { + return state.ordinal(); + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getInitialState() { + skipDataNodes = new HashSet<>(); + failedDataNodes = new HashSet<>(); + return DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS; + } + + /** + * Collect earliest timeslot information from all DataNodes. Each DataNode returns a Map<String, + * Long> where key is database name and value is the earliest timeslot id. + */ + private Flow collectEarliestTimeslots() { + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting earliest timeslots from all DataNodes..."); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "No DataNodes registered, no way to collect earliest timeslots, terminating procedure"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + // Collect earliest timeslots from all DataNodes + allDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + try { + TGetEarliestTimeslotsResp resp = + (TGetEarliestTimeslotsResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "Failed to collected earliest timeslots from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + continue; + } + + Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot(); + + // Merge with existing timeslots (take minimum) + for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) { + earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + nodeTimeslots); + } + } catch (Exception e) { + LOG.error( + "Failed to collect earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + e.getMessage(), + e); + failedDataNodes.add(dataNode); + } + } + + if (LOG.isDebugEnabled()) { + LOG.info( + "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", + allDataNodes.size(), + earliestTimeslots, + allDataNodes.size() - failedDataNodes.size()); + } + + if (failedDataNodes.size() == allDataNodes.size() + && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + } else { + setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS); + } + return Flow.HAS_MORE_STATE; + } + + /** + * Analyze which data partitions are missing based on earliest timeslots. Identify data partitions + * of databases need to be repaired. + */ + private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Analyzing missing data partitions..."); + } + + if (earliestTimeslots.isEmpty()) { + LOG.error( + "No missing data partitions detected, nothing needs to be repaired, terminating procedure"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + // Find all databases that have lost data partition tables + for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) { + String database = entry.getKey(); + long earliestTimeslot = entry.getValue(); + + // Get current DataPartitionTable from ConfigManager + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> + localDataPartitionTable = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTable == null + || localDataPartitionTable.isEmpty() + || localDataPartitionTable.get(database) == null + || localDataPartitionTable.get(database).isEmpty()) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "No data partition table related to database {} was found from the ConfigNode, and this issue needs to be repaired", + database); + continue; + } + + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + seriesPartitionMap = localDataPartitionTable.get(database); + for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + seriesPartitionEntry : seriesPartitionMap.entrySet()) { + Map<TTimePartitionSlot, List<TConsensusGroupId>> tTimePartitionSlotListMap = + seriesPartitionEntry.getValue(); + tTimePartitionSlotListMap + .keySet() + .forEach( + slot -> { + if (!TimePartitionUtils.satisfyPartitionId( + slot.getStartTime(), earliestTimeslot)) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired", + database, + earliestTimeslot); + } + }); + } Review Comment: Choose another plan to change yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
