zerolbsony commented on code in PR #17279:
URL: https://github.com/apache/iotdb/pull/17279#discussion_r2957696711
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -3117,4 +3136,350 @@ public TSStatus writeAuditLog(TAuditLogReq req) {
  // Invoked by the RPC framework when a client connection terminates.
  // This service keeps no per-client state, so there is nothing to release here.
  public void handleClientExit() {
    // Do nothing
  }
+
+ // ====================================================
+ // Data Partition Table Integrity Check Implementation
+ // ====================================================
+
  // Generator of the currently running DataPartitionTable reconstruction task;
  // volatile because it is read/written by concurrent RPC handler threads.
  private volatile DataPartitionTableGenerator currentGenerator;
  // Future that completes when the current generation task finishes; awaited by
  // generateDataPartitionTableHeartbeat().
  private volatile CompletableFuture<Void> currentGeneratorFuture;
  // Identifier of the current task (epoch millis at submission); used only for logging.
  private volatile long currentTaskId = 0;
+
+ @Override
+ public TGetEarliestTimeslotsResp getEarliestTimeslots() {
+ TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp();
+
+ try {
+ Map<String, Long> earliestTimeslots = new HashMap<>();
+
+ // Get data directories from configuration
+ String[] dataDirs =
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+
+ for (String dataDir : dataDirs) {
+ File dir = new File(dataDir);
+ if (dir.exists() && dir.isDirectory()) {
+ processDataDirectoryForEarliestTimeslots(dir, earliestTimeslots);
+ }
+ }
+
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ resp.setDatabaseToEarliestTimeslot(earliestTimeslots);
+
+ LOGGER.info("Retrieved earliest timeslots for {} databases",
earliestTimeslots.size());
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to get earliest timeslots", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.GET_EARLIEST_TIMESLOTS,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ @Override
+ public TGenerateDataPartitionTableResp generateDataPartitionTable(
+ TGenerateDataPartitionTableReq req) {
+ TGenerateDataPartitionTableResp resp = new
TGenerateDataPartitionTableResp();
+
+ try {
+ // Check if there's already a task in the progress
+ if (currentGenerator != null
+ && currentGenerator.getStatus() ==
DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) {
+
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
+ resp.setMessage("DataPartitionTable generation is already in the
progress");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
+
+ // Create generator for all data directories
+ int seriesSlotNum =
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
+ String seriesPartitionExecutorClass =
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass();
+
+ final ExecutorService partitionTableRecoverExecutor =
+ new WrappedThreadPoolExecutor(
+ 0,
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+ 0L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+ new
IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()),
+ ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ currentGenerator =
+ new DataPartitionTableGenerator(
+ partitionTableRecoverExecutor,
+ req.getDatabases(),
+ seriesSlotNum,
+ seriesPartitionExecutorClass);
+ currentTaskId = System.currentTimeMillis();
+
+ // Start generation synchronously for now to return the data partition
table immediately
+ currentGeneratorFuture = currentGenerator.startGeneration();
+ parseGenerationStatus(resp);
+ } catch (Exception e) {
+ LOGGER.error("Failed to generate DataPartitionTable", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.GENERATE_DATA_PARTITION_TABLE,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ @Override
+ public TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat() {
+ TGenerateDataPartitionTableHeartbeatResp resp = new
TGenerateDataPartitionTableHeartbeatResp();
+ // Set default value
+ resp.setDatabaseScopedDataPartitionTables(Collections.emptyList());
+ try {
+ currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
+ if (currentGenerator == null) {
+ resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+ resp.setMessage("No DataPartitionTable generation task found");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
+
+ parseGenerationStatus(resp);
+ if
(currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED))
{
+ boolean success = false;
+ List<DatabaseScopedDataPartitionTable>
databaseScopedDataPartitionTableList = new ArrayList<>();
+ Map<String, DataPartitionTable> dataPartitionTableMap =
currentGenerator.getDatabasePartitionTableMap();
+ if (!dataPartitionTableMap.isEmpty()) {
+ for (Map.Entry<String, DataPartitionTable> entry :
dataPartitionTableMap.entrySet()) {
+ String database = entry.getKey();
+ DataPartitionTable dataPartitionTable = entry.getValue();
+ if (!StringUtils.isEmpty(database) && dataPartitionTable != null) {
+ DatabaseScopedDataPartitionTable
databaseScopedDataPartitionTable = new
DatabaseScopedDataPartitionTable(database, dataPartitionTable);
+
databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable);
+ success = true;
+ }
+ }
+ }
+
+ if (success) {
+ List<ByteBuffer> result =
serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList);
+ resp.setDatabaseScopedDataPartitionTables(result);
+
+ // Clear current generator
+ currentGenerator = null;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to check DataPartitionTable generation status", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.CHECK_DATA_PARTITION_TABLE_STATUS,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+ return resp;
+ }
+
+ private <T> void parseGenerationStatus(T resp) {
+ if (resp instanceof TGenerateDataPartitionTableResp) {
+ handleResponse((TGenerateDataPartitionTableResp) resp);
+ } else {
+ handleResponse((TGenerateDataPartitionTableHeartbeatResp) resp);
+ }
+ }
+
  /** Applies the current generation status to a synchronous generation response. */
  private void handleResponse(TGenerateDataPartitionTableResp resp) {
    updateResponse(resp);
  }
+
  /** Applies the current generation status to a heartbeat (polling) response. */
  private void handleResponse(TGenerateDataPartitionTableHeartbeatResp resp) {
    updateResponse(resp);
  }
+
+ private <T> void updateResponse(T resp) {
+ if (currentGenerator == null) return;
+
+ switch (currentGenerator.getStatus()) {
+ case IN_PROGRESS:
+ setResponseFields(resp,
DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), String.format(
+ "DataPartitionTable generation in progress: %.1f%%",
+ currentGenerator.getProgress() * 100),
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ break;
+ case COMPLETED:
+ setResponseFields(resp,
DataPartitionTableGeneratorState.SUCCESS.getCode(), "DataPartitionTable
generation completed successfully",
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ LOGGER.info("DataPartitionTable generation completed with task ID:
{}", currentTaskId);
+ break;
+ case FAILED:
+ setResponseFields(resp,
DataPartitionTableGeneratorState.FAILED.getCode(), "DataPartitionTable
generation failed: " + currentGenerator.getErrorMessage(),
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ LOGGER.info("DataPartitionTable generation failed with task ID: {}",
currentTaskId);
+ break;
+ default:
+ setResponseFields(resp,
DataPartitionTableGeneratorState.UNKNOWN.getCode(), "Unknown task status: " +
currentGenerator.getStatus(),
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ LOGGER.info("DataPartitionTable generation failed with task ID: {}",
currentTaskId);
+ break;
+ }
+ }
+
+ private <T> void setResponseFields(T resp, int errorCode, String message,
TSStatus status) {
+ try {
+ Method setErrorCode = resp.getClass().getMethod("setErrorCode",
int.class);
+ Method setMessage = resp.getClass().getMethod("setMessage",
String.class);
+ Method setStatus = resp.getClass().getMethod("setStatus",
TSStatus.class);
+
+ setErrorCode.invoke(resp, errorCode);
+ setMessage.invoke(resp, message);
+ setStatus.invoke(resp, status);
+ } catch (Exception e) {
+ LOGGER.error("Failed to set response fields", e);
+ }
+ }
+
  /**
   * Process data directory to find the earliest timeslots for each database.
   *
   * <p>Walks one level of subdirectories of {@code dataDir}, then one level of
   * database directories beneath each, delegating the per-database scan to
   * {@link #findEarliestTimeslotInDatabase}. Results are merged (minimum wins)
   * into {@code earliestTimeslots}.
   *
   * <p>NOTE(review): assumes the on-disk layout
   * {@code <dataDir>/<seq-or-unseq>/<database>/...} — TODO confirm against the
   * storage engine's directory scheme.
   */
  private void processDataDirectoryForEarliestTimeslots(
      File dataDir, Map<String, Long> earliestTimeslots) {
    // Shared accumulator for the per-region scans; concurrent because
    // findEarliestTimeslotInDatabase updates it from worker threads.
    Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>();
    try (Stream<Path> sequenceTypePaths = Files.list(dataDir.toPath())) {
      sequenceTypePaths
          .filter(Files::isDirectory)
          .forEach(
              sequenceTypePath -> {
                try (Stream<Path> dbPaths = Files.list(sequenceTypePath)) {
                  dbPaths
                      .filter(Files::isDirectory)
                      .forEach(
                          dbPath -> {
                            String databaseName = dbPath.getFileName().toString();
                            // Skip internal/system databases that must not be scanned.
                            if (DataPartitionTableGenerator.IGNORE_DATABASE.contains(
                                databaseName)) {
                              return;
                            }
                            // Seed with MAX_VALUE so the min-merge below has a
                            // well-defined starting point and the helper never
                            // reads an absent key.
                            databaseEarliestRegionMap.computeIfAbsent(
                                databaseName, key -> Long.MAX_VALUE);
                            long earliestTimeslot =
                                findEarliestTimeslotInDatabase(
                                    dbPath.toFile(), databaseEarliestRegionMap);

                            // MAX_VALUE means "nothing found" — don't report it.
                            if (earliestTimeslot != Long.MAX_VALUE) {
                              earliestTimeslots.merge(
                                  databaseName, earliestTimeslot, Math::min);
                            }
                          });
                } catch (IOException e) {
                  LOGGER.error(
                      "Failed to process data directory: {}", sequenceTypePath.toFile(), e);
                }
              });
    } catch (IOException e) {
      LOGGER.error("Failed to process data directory: {}", dataDir, e);
    }
  }
+
+ /** Find the earliest timeslot in a database directory. */
+ private long findEarliestTimeslotInDatabase(File databaseDir, Map<String,
Long> databaseEarliestRegionMap) {
+ String databaseName = databaseDir.getName();
+ List<Future<?>> futureList = new ArrayList<>();
+
+ final ExecutorService findEarliestTimeSlotExecutor =
+ new WrappedThreadPoolExecutor(
+ 0,
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(),
+ 0L,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(
+
IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()),
+ new
IoTThreadFactory(ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName()),
+ ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ try (Stream<Path> databasePaths = Files.list(databaseDir.toPath())) {
+ databasePaths
+ .filter(Files::isDirectory)
+ .forEach(
+ regionPath -> {
+ Future<?> future =
+ findEarliestTimeSlotExecutor.submit(
+ () -> {
+ try (Stream<Path> regionPaths =
Files.list(regionPath)) {
+ regionPaths
+ .filter(Files::isDirectory)
+ .forEach(
+ timeSlotPath -> {
+ try {
+ Optional<Path> matchedFile =
+ Files.find(
+ timeSlotPath,
+ 1,
+ (path, attrs) ->
+ attrs.isRegularFile()
+ && path.toString()
+ .endsWith(
+
DataPartitionTableGenerator
+
.SCAN_FILE_SUFFIX_NAME))
+ .findFirst();
+ if (!matchedFile.isPresent()) {
+ return;
+ }
+ String timeSlotName =
timeSlotPath.getFileName().toString();
+ long timeslot =
Long.parseLong(timeSlotName);
+ databaseEarliestRegionMap.compute(
+ databaseName,
+ (k, v) ->
+ v == null ? timeslot :
Math.min(v, timeslot));
+ } catch (IOException e) {
+ LOGGER.error(
+ "Failed to find any {} files in
the {} directory",
+
DataPartitionTableGenerator.SCAN_FILE_SUFFIX_NAME,
+ timeSlotPath,
+ e);
+ }
+ });
+ } catch (IOException e) {
+ LOGGER.error("Failed to scan {}", regionPath, e);
+ }
+ });
+ futureList.add(future);
+ });
+ } catch (IOException e) {
+ LOGGER.error("Failed to walk database directory: {}", databaseDir, e);
+ }
+
+ for (Future<?> future : futureList) {
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Failed to wait for task completion", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ findEarliestTimeSlotExecutor.shutdownNow();
+ return databaseEarliestRegionMap.get(databaseName);
+ }
+
+ private List<ByteBuffer>
serializeDatabaseScopedTableList(List<DatabaseScopedDataPartitionTable> list) {
+ if (list == null || list.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<ByteBuffer> result = new ArrayList<>(list.size());
+
+ for (DatabaseScopedDataPartitionTable table : list) {
+ try (PublicBAOS baos = new PublicBAOS();
+ DataOutputStream oos = new DataOutputStream(baos)) {
+
+ TTransport transport = new TIOStreamTransport(oos);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+
+ table.serialize(oos, protocol);
+
+ result.add(ByteBuffer.wrap(baos.toByteArray()));
Review Comment:
Changed yet
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]