CRZbulabula commented on code in PR #17279: URL: https://github.com/apache/iotdb/pull/17279#discussion_r2944736652
########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java: ########## @@ -0,0 +1,880 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; +import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; +import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Procedure for checking and restoring data partition table integrity. This procedure scans all + * DataNodes to detect missing data partitions and restores the DataPartitionTable on the ConfigNode + * Leader. 
+ */ +public class DataPartitionTableIntegrityCheckProcedure + extends StateMachineProcedure< + ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> { + + private static final Logger LOG = Review Comment: For all logs with level > DEBUG, you'd better append the '[DataPartitionIntegrity]' prefix; it makes debugging easier. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java: ########## @@ -3117,4 +3138,349 @@ public TSStatus writeAuditLog(TAuditLogReq req) { public void handleClientExit() { // Do nothing } + + // ==================================================== + // Data Partition Table Integrity Check Implementation + // ==================================================== + + private volatile DataPartitionTableGenerator currentGenerator; + private volatile CompletableFuture<Void> currentGeneratorFuture; + private volatile long currentTaskId = 0; + + @Override + public TGetEarliestTimeslotsResp getEarliestTimeslots() { + TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp(); + + try { + Map<String, Long> earliestTimeslots = new HashMap<>(); + + // Get data directories from configuration + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + + for (String dataDir : dataDirs) { + File dir = new File(dataDir); + if (dir.exists() && dir.isDirectory()) { + processDataDirectoryForEarliestTimeslots(dir, earliestTimeslots); + } + } + + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setDatabaseToEarliestTimeslot(earliestTimeslots); + + LOGGER.info("Retrieved earliest timeslots for {} databases", earliestTimeslots.size()); + + } catch (Exception e) { + LOGGER.error("Failed to get earliest timeslots", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GET_EARLIEST_TIMESLOTS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableResp 
generateDataPartitionTable( + TGenerateDataPartitionTableReq req) { + TGenerateDataPartitionTableResp resp = new TGenerateDataPartitionTableResp(); + + try { + // Check if there's already a task in the progress + if (currentGenerator != null + && currentGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); + resp.setMessage("DataPartitionTable generation is already in the progress"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + // Create generator for all data directories + int seriesSlotNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum(); + String seriesPartitionExecutorClass = + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(); + + final ExecutorService partitionTableRecoverExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()), + ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + currentGenerator = + new DataPartitionTableGenerator( + partitionTableRecoverExecutor, + req.getDatabases(), + seriesSlotNum, + seriesPartitionExecutorClass); + currentTaskId = System.currentTimeMillis(); + + // Start generation synchronously for now to return the data partition table immediately + currentGeneratorFuture = currentGenerator.startGeneration(); + parseGenerationStatus(resp); + } catch (Exception e) { + LOGGER.error("Failed to generate DataPartitionTable", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GENERATE_DATA_PARTITION_TABLE, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } 
+ + @Override + public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() { + TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp(); + // Set default value + resp.setDatabaseScopedDataPartitionTables(Collections.emptyList()); + try { + currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + if (currentGenerator == null) { + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setMessage("No DataPartitionTable generation task found"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + parseGenerationStatus(resp); + if (currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { + boolean success = false; + List<DatabaseScopedDataPartitionTable> databaseScopedDataPartitionTableList = new ArrayList<>(); + Map<String, DataPartitionTable> dataPartitionTableMap = currentGenerator.getDatabasePartitionTableMap(); + if (!dataPartitionTableMap.isEmpty()) { + for (Map.Entry<String, DataPartitionTable> entry : dataPartitionTableMap.entrySet()) { + String database = entry.getKey(); + DataPartitionTable dataPartitionTable = entry.getValue(); + if (!StringUtils.isEmpty(database) && dataPartitionTable != null) { + DatabaseScopedDataPartitionTable databaseScopedDataPartitionTable = new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable); + success = true; + } + } + } + + if (success) { + List<ByteBuffer> result = serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList); + resp.setDatabaseScopedDataPartitionTables(result); + + // Clear current generator + currentGenerator = null; + } + } + } catch (Exception e) { + LOGGER.error("Failed to check DataPartitionTable generation status", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.CHECK_DATA_PARTITION_TABLE_STATUS, + 
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + return resp; + } + + private <T> void parseGenerationStatus(T resp) { + if (resp instanceof TGenerateDataPartitionTableResp) { + handleResponse((TGenerateDataPartitionTableResp) resp); + } else { + handleResponse((TGenerateDataPartitionTableHeartbeatResp) resp); + } + } + + private void handleResponse(TGenerateDataPartitionTableResp resp) { + updateResponse(resp); + } + + private void handleResponse(TGenerateDataPartitionTableHeartbeatResp resp) { + updateResponse(resp); + } + + private <T> void updateResponse(T resp) { + if (currentGenerator == null) return; + + switch (currentGenerator.getStatus()) { + case IN_PROGRESS: + setResponseFields(resp, DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), String.format( + "DataPartitionTable generation in progress: %.1f%%", + currentGenerator.getProgress() * 100), RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + break; + case COMPLETED: + setResponseFields(resp, DataPartitionTableGeneratorState.SUCCESS.getCode(), "DataPartitionTable generation completed successfully", RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + LOGGER.info("DataPartitionTable generation completed with task ID: {}", currentTaskId); + break; + case FAILED: + setResponseFields(resp, DataPartitionTableGeneratorState.FAILED.getCode(), "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + LOGGER.info("DataPartitionTable generation failed with task ID: {}", currentTaskId); + break; + default: + setResponseFields(resp, DataPartitionTableGeneratorState.UNKNOWN.getCode(), "Unknown task status: " + currentGenerator.getStatus(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + LOGGER.info("DataPartitionTable generation failed with task ID: {}", currentTaskId); + break; + } + } + + private <T> void setResponseFields(T resp, int errorCode, String message, TSStatus status) { + try { + Method setErrorCode 
= resp.getClass().getMethod("setErrorCode", int.class); + Method setMessage = resp.getClass().getMethod("setMessage", String.class); + Method setStatus = resp.getClass().getMethod("setStatus", TSStatus.class); + + setErrorCode.invoke(resp, errorCode); + setMessage.invoke(resp, message); + setStatus.invoke(resp, status); + } catch (Exception e) { + LOGGER.error("Failed to set response fields", e); + } + } + + /** Process data directory to find the earliest timeslots for each database. */ + private void processDataDirectoryForEarliestTimeslots( Review Comment: Better to log your conclusion, i.e., the earliest timeslot for each database. ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java: ########## @@ -0,0 +1,880 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; +import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import 
org.apache.thrift.transport.TTransport; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Procedure for checking and restoring data partition table integrity. This procedure scans all + * DataNodes to detect missing data partitions and restores the DataPartitionTable on the ConfigNode + * Leader. + */ +public class DataPartitionTableIntegrityCheckProcedure + extends StateMachineProcedure< + ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class); + + private static final int MAX_RETRY_COUNT = 3; + private static final long HEART_BEAT_REQUEST_RATE = 60000; + + NodeManager dataNodeManager; + private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>(); + + // ============Need serialize BEGIN=============/ + /** Collected earliest timeslots from DataNodes: database -> earliest timeslot */ + private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>(); + + /** DataPartitionTables collected from DataNodes: dataNodeId -> DataPartitionTable */ + private Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables = new ConcurrentHashMap<>(); + + private Set<String> lostDataPartitionsOfDatabases = new HashSet<>(); + + /** Final merged DataPartitionTable */ + private Map<String, DataPartitionTable> finalDataPartitionTables; + + private static Set<TDataNodeConfiguration> skipDataNodes = 
Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static Set<TDataNodeConfiguration> failedDataNodes = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + // ============Need serialize END=============/ + + public DataPartitionTableIntegrityCheckProcedure() { + super(); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws InterruptedException { + try { + // Ensure to get the real-time DataNodes in the current cluster at every step + dataNodeManager = env.getConfigManager().getNodeManager(); + allDataNodes = dataNodeManager.getRegisteredDataNodes(); + + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + failedDataNodes = new HashSet<>(); + return collectEarliestTimeslots(); + case ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases = new HashSet<>(); + return analyzeMissingPartitions(env); + case REQUEST_PARTITION_TABLES: + return requestPartitionTables(); + case REQUEST_PARTITION_TABLES_HEART_BEAT: + return requestPartitionTablesHeartBeat(); + case MERGE_PARTITION_TABLES: + finalDataPartitionTables = new HashMap<>(); + return mergePartitionTables(env); + case WRITE_PARTITION_TABLE_TO_RAFT: + return writePartitionTableToRaft(env); + default: + throw new ProcedureException("Unknown state: " + state); + } + } catch (Exception e) { + LOG.error("Error executing state {}: {}", state, e.getMessage(), e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws IOException, InterruptedException, ProcedureException { + // Cleanup resources + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + earliestTimeslots.clear(); + break; + case ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases.clear(); + break; + case REQUEST_PARTITION_TABLES: + 
case REQUEST_PARTITION_TABLES_HEART_BEAT: + dataPartitionTables.clear(); + break; + case MERGE_PARTITION_TABLES: + finalDataPartitionTables.clear(); + break; + default: + allDataNodes.clear(); + earliestTimeslots.clear(); + dataPartitionTables.clear(); + finalDataPartitionTables.clear(); + throw new ProcedureException("Unknown state for rollback: " + state); + } + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getState(final int stateId) { + return DataPartitionTableIntegrityCheckProcedureState.values()[stateId]; + } + + @Override + protected int getStateId(final DataPartitionTableIntegrityCheckProcedureState state) { + return state.ordinal(); + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getInitialState() { + skipDataNodes = new HashSet<>(); + failedDataNodes = new HashSet<>(); + return DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS; + } + + /** + * Collect earliest timeslot information from all DataNodes. Each DataNode returns a Map<String, + * Long> where key is database name and value is the earliest timeslot id. 
+ */ + private Flow collectEarliestTimeslots() { + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting earliest timeslots from all DataNodes..."); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "No DataNodes registered, no way to collect earliest timeslots, waiting for them to go up"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + // Collect earliest timeslots from all DataNodes + allDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + try { + TGetEarliestTimeslotsResp resp = + (TGetEarliestTimeslotsResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "Failed to collected earliest timeslots from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + continue; + } + + Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot(); + + // Merge with existing timeslots (take minimum) + for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) { + earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + nodeTimeslots); + } + } catch (Exception e) { + LOG.error( + "Failed to collect earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + e.getMessage(), + e); + failedDataNodes.add(dataNode); + } + } + + if (LOG.isDebugEnabled()) { + LOG.info( + "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", + allDataNodes.size(), + earliestTimeslots, 
+ allDataNodes.size() - failedDataNodes.size()); + } + + if (failedDataNodes.size() == allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + } else { + setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS); + } + return Flow.HAS_MORE_STATE; + } + + /** + * Analyze which data partitions are missing based on earliest timeslots. Identify data partitions + * of databases need to be repaired. + */ + private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Analyzing missing data partitions..."); + } + + if (earliestTimeslots.isEmpty()) { + LOG.warn( + "No missing data partitions detected, nothing needs to be repaired, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + // Find all databases that have lost data partition tables + for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) { + String database = entry.getKey(); + long earliestTimeslot = entry.getValue(); + + // Get current DataPartitionTable from ConfigManager + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> + localDataPartitionTable = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTable == null + || localDataPartitionTable.isEmpty() + || localDataPartitionTable.get(database) == null + || localDataPartitionTable.get(database).isEmpty()) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "No data partition table related to database {} was found from the ConfigNode, and this issue needs to be repaired", + database); + continue; + } + + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + seriesPartitionMap = localDataPartitionTable.get(database); + for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + 
seriesPartitionEntry : seriesPartitionMap.entrySet()) { + Map<TTimePartitionSlot, List<TConsensusGroupId>> tTimePartitionSlotListMap = + seriesPartitionEntry.getValue(); + + if (tTimePartitionSlotListMap.isEmpty()) { + continue; + } + + TTimePartitionSlot localEarliestSlot = tTimePartitionSlotListMap.keySet() + .stream() + .min(Comparator.comparingLong(TTimePartitionSlot::getStartTime)) + .orElse(null); + + if (!TimePartitionUtils.satisfyPartitionId(localEarliestSlot.getStartTime(), earliestTimeslot)) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired", + database, + earliestTimeslot); + } + } + } + + if (lostDataPartitionsOfDatabases.isEmpty()) { + LOG.info("No databases have lost data partitions, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + LOG.info( + "Identified {} databases have lost data partitions, will request DataPartitionTable generation from {} DataNodes", + lostDataPartitionsOfDatabases.size(), + allDataNodes.size() - failedDataNodes.size()); + setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + private Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> + getLocalDataPartitionTable(final ConfigNodeProcedureEnv env, final String database) { + Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable = + env.getConfigManager() + .getSchemaPartition(Collections.singletonMap(database, Collections.emptyList())) + .getSchemaPartitionTable(); + + // Construct request for getting data partition + final Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>(); + schemaPartitionTable.forEach( + (key, value) -> { + Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new HashMap<>(); + value + .keySet() + .forEach( + slot -> + slotListMap.put( + slot, new 
TTimeSlotList(Collections.emptyList(), true, true))); + partitionSlotsMap.put(key, slotListMap); + }); + final GetDataPartitionPlan getDataPartitionPlan = new GetDataPartitionPlan(partitionSlotsMap); + return env.getConfigManager().getDataPartition(getDataPartitionPlan).getDataPartitionTable(); + } + + /** + * Request DataPartitionTable generation from target DataNodes. Each DataNode scans its tsfile + * resources and generates a DataPartitionTable. + */ + private Flow requestPartitionTables() { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Requesting DataPartitionTable generation from {} DataNodes...", allDataNodes.size()); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "No DataNodes registered, no way to requested DataPartitionTable generation, terminating procedure"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + allDataNodes.removeAll(skipDataNodes); + allDataNodes.removeAll(failedDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + int dataNodeId = dataNode.getLocation().getDataNodeId(); + if (!dataPartitionTables.containsKey(dataNodeId)) { + try { + TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq(); + req.setDatabases(lostDataPartitionsOfDatabases); + TGenerateDataPartitionTableResp resp = + (TGenerateDataPartitionTableResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + req, + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "Failed to request DataPartitionTable generation from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + } + } catch (Exception e) { + failedDataNodes.add(dataNode); + LOG.error( + "Failed to request 
DataPartitionTable generation from DataNode[id={}]: {}", + dataNodeId, + e.getMessage(), + e); + } + } + } + + if (failedDataNodes.size() == allDataNodes.size() + && new HashSet<>(allDataNodes).containsAll(failedDataNodes)) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); + return Flow.HAS_MORE_STATE; + } + + private Flow requestPartitionTablesHeartBeat() { + if (LOG.isDebugEnabled()) { + LOG.info("Checking DataPartitionTable generation completion status..."); + } + + int completeCount = 0; + for (TDataNodeConfiguration dataNode : allDataNodes) { + int dataNodeId = dataNode.getLocation().getDataNodeId(); + + if (!dataPartitionTables.containsKey(dataNodeId)) { + try { + TGenerateDataPartitionTableHeartbeatResp resp = + (TGenerateDataPartitionTableHeartbeatResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + MAX_RETRY_COUNT); + DataPartitionTableGeneratorState state = + DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode()); + + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.error( + "Failed to request DataPartitionTable generation heart beat from the DataNode[id={}], state is {}, response status is {}", + dataNode.getLocation().getDataNodeId(), + state, + resp.getStatus()); + continue; + } + + switch (state) { + case SUCCESS: + List<ByteBuffer> byteBufferList = resp.getDatabaseScopedDataPartitionTables(); + List<DatabaseScopedDataPartitionTable> databaseScopedDataPartitionTableList = deserializeDatabaseScopedTableList(byteBufferList); + dataPartitionTables.put(dataNodeId, databaseScopedDataPartitionTableList); + LOG.info( + "DataNode {} completed DataPartitionTable generation, 
terminating heart beat", + dataNodeId); + completeCount++; + break; + case IN_PROGRESS: + LOG.info("DataNode {} still generating DataPartitionTable", dataNodeId); + break; + default: + LOG.error( + "DataNode {} returned unknown error code: {}", dataNodeId, resp.getErrorCode()); + break; + } + } catch (Exception e) { + LOG.error( + "Error checking DataPartitionTable status from DataNode {}: {}, terminating heart beat", + dataNodeId, + e.getMessage(), + e); + completeCount++; + } + } else { + completeCount++; + } + } + + if (completeCount >= allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + try { + Thread.sleep(HEART_BEAT_REQUEST_RATE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Error checking DataPartitionTable status due to thread interruption."); + } + setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; Review Comment: ```suggestion setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); return Flow.HAS_MORE_STATE; ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java: ########## @@ -1219,6 +1219,11 @@ public class IoTDBConfig { private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L; + /* Need use these parameters when repair data partition table */ + private int partitionTableRecoverWorkerNum = 10; + // Rate limit set to 10 MB/s + private int partitionTableRecoverMaxReadBytesPerSecond = 10; Review Comment: MaxReadMBs instead of `Bytes` ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java: ########## @@ -0,0 +1,880 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; +import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import 
org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Procedure for checking and restoring data partition table integrity. This procedure scans all + * DataNodes to detect missing data partitions and restores the DataPartitionTable on the ConfigNode + * Leader. 
+ */ +public class DataPartitionTableIntegrityCheckProcedure + extends StateMachineProcedure< + ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class); + + private static final int MAX_RETRY_COUNT = 3; + private static final long HEART_BEAT_REQUEST_RATE = 60000; + + NodeManager dataNodeManager; + private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>(); + + // ============Need serialize BEGIN=============/ + /** Collected earliest timeslots from DataNodes: database -> earliest timeslot */ + private Map<String, Long> earliestTimeslots = new ConcurrentHashMap<>(); + + /** DataPartitionTables collected from DataNodes: dataNodeId -> DataPartitionTable */ + private Map<Integer, List<DatabaseScopedDataPartitionTable>> dataPartitionTables = new ConcurrentHashMap<>(); + + private Set<String> lostDataPartitionsOfDatabases = new HashSet<>(); + + /** Final merged DataPartitionTable */ + private Map<String, DataPartitionTable> finalDataPartitionTables; + + private static Set<TDataNodeConfiguration> skipDataNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static Set<TDataNodeConfiguration> failedDataNodes = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + // ============Need serialize END=============/ + + public DataPartitionTableIntegrityCheckProcedure() { + super(); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws InterruptedException { + try { + // Ensure to get the real-time DataNodes in the current cluster at every step + dataNodeManager = env.getConfigManager().getNodeManager(); + allDataNodes = dataNodeManager.getRegisteredDataNodes(); + + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + failedDataNodes = new HashSet<>(); + return collectEarliestTimeslots(); + case 
ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases = new HashSet<>(); + return analyzeMissingPartitions(env); + case REQUEST_PARTITION_TABLES: + return requestPartitionTables(); + case REQUEST_PARTITION_TABLES_HEART_BEAT: + return requestPartitionTablesHeartBeat(); + case MERGE_PARTITION_TABLES: + finalDataPartitionTables = new HashMap<>(); + return mergePartitionTables(env); + case WRITE_PARTITION_TABLE_TO_RAFT: + return writePartitionTableToRaft(env); + default: + throw new ProcedureException("Unknown state: " + state); + } + } catch (Exception e) { + LOG.error("Error executing state {}: {}", state, e.getMessage(), e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws IOException, InterruptedException, ProcedureException { + // Cleanup resources + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + earliestTimeslots.clear(); + break; + case ANALYZE_MISSING_PARTITIONS: + lostDataPartitionsOfDatabases.clear(); + break; + case REQUEST_PARTITION_TABLES: + case REQUEST_PARTITION_TABLES_HEART_BEAT: + dataPartitionTables.clear(); + break; + case MERGE_PARTITION_TABLES: + finalDataPartitionTables.clear(); + break; + default: + allDataNodes.clear(); + earliestTimeslots.clear(); + dataPartitionTables.clear(); + finalDataPartitionTables.clear(); + throw new ProcedureException("Unknown state for rollback: " + state); + } + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getState(final int stateId) { + return DataPartitionTableIntegrityCheckProcedureState.values()[stateId]; + } + + @Override + protected int getStateId(final DataPartitionTableIntegrityCheckProcedureState state) { + return state.ordinal(); + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getInitialState() { + skipDataNodes = new HashSet<>(); + 
failedDataNodes = new HashSet<>(); + return DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS; + } + + /** + * Collect earliest timeslot information from all DataNodes. Each DataNode returns a Map<String, + * Long> where key is database name and value is the earliest timeslot id. + */ + private Flow collectEarliestTimeslots() { + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting earliest timeslots from all DataNodes..."); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "No DataNodes registered, no way to collect earliest timeslots, waiting for them to go up"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + // Collect earliest timeslots from all DataNodes + allDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + try { + TGetEarliestTimeslotsResp resp = + (TGetEarliestTimeslotsResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "Failed to collected earliest timeslots from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + continue; + } + + Map<String, Long> nodeTimeslots = resp.getDatabaseToEarliestTimeslot(); + + // Merge with existing timeslots (take minimum) + for (Map.Entry<String, Long> entry : nodeTimeslots.entrySet()) { + earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + nodeTimeslots); + } + } catch (Exception e) { + LOG.error( + "Failed to collect earliest timeslots from the 
DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + e.getMessage(), + e); + failedDataNodes.add(dataNode); + } + } + + if (LOG.isDebugEnabled()) { + LOG.info( + "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", + allDataNodes.size(), + earliestTimeslots, + allDataNodes.size() - failedDataNodes.size()); + } + + if (failedDataNodes.size() == allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + } else { + setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS); + } + return Flow.HAS_MORE_STATE; + } + + /** + * Analyze which data partitions are missing based on earliest timeslots. Identify data partitions + * of databases need to be repaired. + */ + private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Analyzing missing data partitions..."); + } + + if (earliestTimeslots.isEmpty()) { + LOG.warn( + "No missing data partitions detected, nothing needs to be repaired, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + // Find all databases that have lost data partition tables + for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) { + String database = entry.getKey(); + long earliestTimeslot = entry.getValue(); + + // Get current DataPartitionTable from ConfigManager + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>> + localDataPartitionTable = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTable == null + || localDataPartitionTable.isEmpty() + || localDataPartitionTable.get(database) == null + || localDataPartitionTable.get(database).isEmpty()) { + lostDataPartitionsOfDatabases.add(database); + LOG.warn( + "No data partition table related to database {} was found from the ConfigNode, 
and this issue needs to be repaired", + database); + continue; + } + + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + seriesPartitionMap = localDataPartitionTable.get(database); + for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> + seriesPartitionEntry : seriesPartitionMap.entrySet()) { + Map<TTimePartitionSlot, List<TConsensusGroupId>> tTimePartitionSlotListMap = + seriesPartitionEntry.getValue(); + + if (tTimePartitionSlotListMap.isEmpty()) { + continue; + } + + TTimePartitionSlot localEarliestSlot = tTimePartitionSlotListMap.keySet() + .stream() + .min(Comparator.comparingLong(TTimePartitionSlot::getStartTime)) + .orElse(null); + + if (!TimePartitionUtils.satisfyPartitionId(localEarliestSlot.getStartTime(), earliestTimeslot)) { Review Comment: Judge whether earliestTimeSlot.startTime < localEarliestSlot.startTime ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java: ########## @@ -134,6 +157,14 @@ public static boolean satisfyPartitionId(long startTime, long endTime, long part return startPartition <= partitionId && endPartition >= partitionId; } + public static boolean satisfyPartitionId(long startTime, long partitionId) { Review Comment: No need to update this class in our current implementation. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java: ########## @@ -414,6 +430,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private static final String SYSTEM = "system"; + private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>(); + + // Must be lower than the RPC request timeout, in milliseconds + private static final long timeoutMs = 50000; Review Comment: DO NOT append member variables easily. 
########## iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java: ########## @@ -81,6 +83,8 @@ public class DataNodeInternalRPCServiceImplTest { + private static final Logger LOG = Review Comment: Why change this file? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java: ########## @@ -414,6 +427,34 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private static final String SYSTEM = "system"; + private final ExecutorService findEarliestTimeSlotExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName()), + ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + private final ExecutorService partitionTableRecoverExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()), + ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>(); Review Comment: Change this map into a local variable. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java: ########## @@ -3117,4 +3138,349 @@ public TSStatus writeAuditLog(TAuditLogReq req) { public void handleClientExit() { // Do nothing } + + // ==================================================== + // Data Partition Table Integrity Check Implementation + // ==================================================== + + private volatile DataPartitionTableGenerator currentGenerator; + private volatile CompletableFuture<Void> currentGeneratorFuture; + private volatile long currentTaskId = 0; + + @Override + public TGetEarliestTimeslotsResp getEarliestTimeslots() { + TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp(); + + try { + Map<String, Long> earliestTimeslots = new HashMap<>(); + + // Get data directories from configuration + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + + for (String dataDir : dataDirs) { + File dir = new File(dataDir); + if (dir.exists() && dir.isDirectory()) { + processDataDirectoryForEarliestTimeslots(dir, earliestTimeslots); + } + } + + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setDatabaseToEarliestTimeslot(earliestTimeslots); + + LOGGER.info("Retrieved earliest timeslots for {} databases", earliestTimeslots.size()); + + } catch (Exception e) { + LOGGER.error("Failed to get earliest timeslots", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GET_EARLIEST_TIMESLOTS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableResp generateDataPartitionTable( + TGenerateDataPartitionTableReq req) { + TGenerateDataPartitionTableResp resp = new TGenerateDataPartitionTableResp(); + + try { + // Check if there's already a task in the progress + if (currentGenerator != null + && currentGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + 
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); + resp.setMessage("DataPartitionTable generation is already in the progress"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + // Create generator for all data directories + int seriesSlotNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum(); + String seriesPartitionExecutorClass = + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(); + + final ExecutorService partitionTableRecoverExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()), + ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + currentGenerator = + new DataPartitionTableGenerator( + partitionTableRecoverExecutor, + req.getDatabases(), + seriesSlotNum, + seriesPartitionExecutorClass); + currentTaskId = System.currentTimeMillis(); + + // Start generation synchronously for now to return the data partition table immediately + currentGeneratorFuture = currentGenerator.startGeneration(); + parseGenerationStatus(resp); + } catch (Exception e) { + LOGGER.error("Failed to generate DataPartitionTable", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GENERATE_DATA_PARTITION_TABLE, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() { + TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp(); + // Set default value + resp.setDatabaseScopedDataPartitionTables(Collections.emptyList()); + try { + 
currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + if (currentGenerator == null) { + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setMessage("No DataPartitionTable generation task found"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + parseGenerationStatus(resp); + if (currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { + boolean success = false; + List<DatabaseScopedDataPartitionTable> databaseScopedDataPartitionTableList = new ArrayList<>(); + Map<String, DataPartitionTable> dataPartitionTableMap = currentGenerator.getDatabasePartitionTableMap(); + if (!dataPartitionTableMap.isEmpty()) { + for (Map.Entry<String, DataPartitionTable> entry : dataPartitionTableMap.entrySet()) { + String database = entry.getKey(); + DataPartitionTable dataPartitionTable = entry.getValue(); + if (!StringUtils.isEmpty(database) && dataPartitionTable != null) { + DatabaseScopedDataPartitionTable databaseScopedDataPartitionTable = new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable); + success = true; + } + } + } + + if (success) { + List<ByteBuffer> result = serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList); + resp.setDatabaseScopedDataPartitionTables(result); + + // Clear current generator + currentGenerator = null; + } + } + } catch (Exception e) { + LOGGER.error("Failed to check DataPartitionTable generation status", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.CHECK_DATA_PARTITION_TABLE_STATUS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + return resp; + } + + private <T> void parseGenerationStatus(T resp) { + if (resp instanceof TGenerateDataPartitionTableResp) { + handleResponse((TGenerateDataPartitionTableResp) resp); + } else { + handleResponse((TGenerateDataPartitionTableHeartbeatResp) resp); + } + } + 
+ private void handleResponse(TGenerateDataPartitionTableResp resp) { + updateResponse(resp); + } + + private void handleResponse(TGenerateDataPartitionTableHeartbeatResp resp) { + updateResponse(resp); + } + + private <T> void updateResponse(T resp) { + if (currentGenerator == null) return; + + switch (currentGenerator.getStatus()) { + case IN_PROGRESS: + setResponseFields(resp, DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), String.format( + "DataPartitionTable generation in progress: %.1f%%", Review Comment: It is good to keep a progress record. You should also log it so the progress can be retrieved later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
