zerolbsony commented on code in PR #17279:
URL: https://github.com/apache/iotdb/pull/17279#discussion_r2944096902
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -3117,4 +3158,292 @@ public TSStatus writeAuditLog(TAuditLogReq req) {
public void handleClientExit() {
// Do nothing
}
+
+ // ====================================================
+ // Data Partition Table Integrity Check Implementation
+ // ====================================================
+
+ private volatile DataPartitionTableGenerator currentGenerator;
+ private volatile long currentTaskId = 0;
+
+ @Override
+ public TGetEarliestTimeslotsResp getEarliestTimeslots() {
+ TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp();
+
+ try {
+ Map<String, Long> earliestTimeslots = new HashMap<>();
+
+ // Get data directories from configuration
+ String[] dataDirs =
IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+
+ for (String dataDir : dataDirs) {
+ File dir = new File(dataDir);
+ if (dir.exists() && dir.isDirectory()) {
+ processDataDirectoryForEarliestTimeslots(dir, earliestTimeslots);
+ }
+ }
+
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ resp.setDatabaseToEarliestTimeslot(earliestTimeslots);
+
+ LOGGER.info("Retrieved earliest timeslots for {} databases",
earliestTimeslots.size());
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to get earliest timeslots", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.GET_EARLIEST_TIMESLOTS,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ @Override
+ public TGenerateDataPartitionTableResp generateDataPartitionTable(
+ TGenerateDataPartitionTableReq req) {
+ TGenerateDataPartitionTableResp resp = new
TGenerateDataPartitionTableResp();
+ byte[] empty = new byte[0];
+
+ try {
+ // Check if there's already a task in the progress
+ if (currentGenerator != null
+ && currentGenerator.getStatus() ==
DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) {
+ resp.setDataPartitionTable(empty);
+
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
+ resp.setMessage("DataPartitionTable generation is already in the
progress");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
+
+ // Create generator for all data directories
+ int seriesSlotNum =
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
+ String seriesPartitionExecutorClass =
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass();
+
+ currentGenerator =
+ new DataPartitionTableGenerator(
+ partitionTableRecoverExecutor,
+ req.getDatabases(),
+ seriesSlotNum,
+ seriesPartitionExecutorClass);
+ currentTaskId = System.currentTimeMillis();
+
+ // Start generation synchronously for now to return the data partition
table immediately
+ currentGenerator.startGeneration().get(timeoutMs, TimeUnit.MILLISECONDS);
+
+ if (currentGenerator != null) {
+ switch (currentGenerator.getStatus()) {
+ case IN_PROGRESS:
+ resp.setDataPartitionTable(empty);
+
resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode());
+ resp.setMessage("DataPartitionTable generation interrupted");
+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ break;
+ case COMPLETED:
+ DataPartitionTable dataPartitionTable =
currentGenerator.getDataPartitionTable();
+ if (dataPartitionTable != null) {
+ byte[] result = serializeDataPartitionTable(dataPartitionTable);
+ resp.setDataPartitionTable(result);
+ }
+
+
resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode());
+ resp.setMessage("DataPartitionTable generation completed
successfully");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ LOGGER.info("DataPartitionTable generation completed with task ID:
{}", currentTaskId);
+ break;
+ default:
+ resp.setDataPartitionTable(empty);
+
resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode());
+ resp.setMessage(
+ "DataPartitionTable generation failed: " +
currentGenerator.getErrorMessage());
+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ break;
+ }
+ }
+
+ // Clear current generator
+ currentGenerator = null;
+ } catch (Exception e) {
+ LOGGER.error("Failed to generate DataPartitionTable", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.GENERATE_DATA_PARTITION_TABLE,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ @Override
+ public TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat() {
+ TGenerateDataPartitionTableHeartbeatResp resp = new
TGenerateDataPartitionTableHeartbeatResp();
+
+ try {
+ if (currentGenerator == null) {
+ resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+ resp.setMessage("No DataPartitionTable generation task found");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
+
+ DataPartitionTableGenerator.TaskStatus status =
currentGenerator.getStatus();
+
+ switch (status) {
+ case IN_PROGRESS:
+
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
+ resp.setMessage(
+ String.format(
+ "DataPartitionTable generation in progress: %.1f%%",
+ currentGenerator.getProgress() * 100));
+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ break;
+ case COMPLETED:
+
resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode());
+ resp.setMessage("DataPartitionTable generation completed
successfully");
+ resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ break;
+ case FAILED:
+ resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode());
+ resp.setMessage(
+ "DataPartitionTable generation failed: " +
currentGenerator.getErrorMessage());
+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ break;
+ default:
+
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+ resp.setMessage("Unknown task status: " + status);
+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ break;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to check DataPartitionTable generation status", e);
+ resp.setStatus(
+ onIoTDBException(
+ e,
+ OperationType.CHECK_DATA_PARTITION_TABLE_STATUS,
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()));
+ }
+
+ return resp;
+ }
+
+ /** Process data directory to find the earliest timeslots for each database.
*/
+ private void processDataDirectoryForEarliestTimeslots(
+ File dataDir, Map<String, Long> earliestTimeslots) {
+ try {
+ Files.list(dataDir.toPath())
+ .filter(Files::isDirectory)
+ .forEach(
+ sequenceTypePath -> {
+ try {
+ Files.list(sequenceTypePath)
Review Comment:
Already changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]