Caideyipi commented on code in PR #16540: URL: https://github.com/apache/iotdb/pull/16540#discussion_r2418386440
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java: ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util; + +import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Utility class for handling mods operations during TsFile parsing. Supports mods processing logic + * for both tree model and table model. + */ +public class ModsOperationUtil { + + private ModsOperationUtil() { + // Utility class, no instantiation allowed + } + + /** + * Load all modifications from TsFile and build PatternTreeMap + * + * @param tsFile TsFile file + * @return PatternTreeMap containing all modifications + */ + public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> + loadModificationsFromTsFile(File tsFile) { + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + + try { + ModificationFile.readAllModifications(tsFile, true) + .forEach( + modification -> modifications.append(modification.keyOfPatternTree(), modification)); + } catch (Exception e) { + throw new PipeException("Failed to load modifications from TsFile: " + tsFile.getPath(), e); + } + + return modifications; + } + + /** + * Check if data in the specified time range is completely deleted by mods Different logic for + * tree model and table model + * + * @param deviceID device ID + * @param measurementID measurement ID + * @param startTime start time + * @param endTime end time + * @param modifications modification records + * @return true if data is completely deleted, false otherwise + */ + public static boolean isAllDeletedByMods( + IDeviceID deviceID, + String measurementID, + long startTime, + long endTime, + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications) { + if (modifications == null) { + return false; + } + + final List<ModEntry> mods = modifications.getOverlapped(deviceID, measurementID); + if (mods == null || mods.isEmpty()) { + return false; + } + + // Different logic for tree model and table model + if (deviceID.isTableModel()) { + // For table model: check if any modification affects the device and covers the time range + return mods.stream() + .anyMatch( + modification -> + modification.getTimeRange().contains(startTime, endTime) + && modification.affects(deviceID) + && modification.affects(measurementID)); + } else { + // For tree model: check if any modification covers the time range + return mods.stream() + .anyMatch(modification -> modification.getTimeRange().contains(startTime, endTime)); + } + } + + /** + * Initialize mods mapping for specified measurement list + * + * @param deviceID device ID + * @param measurements measurement list + * @param modifications modification records + * @return mapping from measurement ID to mods list and index + */ + public static List<ModsInfo> initializeMeasurementMods( + IDeviceID deviceID, + List<String> measurements, + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications) { + + List<ModsInfo> modsInfos = new ArrayList<>(); + + for (final String measurement : measurements) { + final List<ModEntry> mods = modifications.getOverlapped(deviceID, measurement); + if (mods == null || mods.isEmpty()) { + // No mods, use empty list and index 0 + modsInfos.add(new ModsInfo(Collections.emptyList(), 0)); + continue; + } + + // Sort by time range for efficient lookup + // Different filtering logic for tree model and table model + final List<ModEntry> sortedMods; + if (deviceID.isTableModel()) { + // For table model: filter modifications that affect the device + sortedMods = + mods.stream() + .filter( + modification -> + modification.affects(deviceID) && modification.affects(measurement)) + .sorted() + .collect(Collectors.toList()); + } else { + // For tree model: no additional filtering needed + sortedMods = mods; + } + // Store sorted mods and start index + modsInfos.add(new ModsInfo(ModificationUtils.sortAndMerge(sortedMods), 0)); + } + + return modsInfos; + } + + /** + * Check if data at the specified time point is deleted + * + * @param time time point + * @param modsInfo mods information containing mods list and current index + * @return true if data is deleted, false otherwise + */ + public static boolean isDelete(long time, ModsInfo modsInfo) { + if (modsInfo == null) { + return false; + } + + final List<ModEntry> mods = modsInfo.getMods(); + if (mods == null || mods.isEmpty()) { + return false; + } + + int currentIndex = modsInfo.getCurrentIndex(); + if (currentIndex < 0) { + return false; + } + + // Search from current index + for (int i = currentIndex; i < mods.size(); i++) { + final ModEntry mod = mods.get(i); + final long modStartTime = mod.getTimeRange().getMin(); + final long modEndTime = mod.getTimeRange().getMax(); + + if (time < modStartTime) { + // Current time is before mod start time, update index and return false + modsInfo.setCurrentIndex(i); + return false; + } else if (time <= modEndTime) { + // Current time is within mod time range, update index and return true + modsInfo.setCurrentIndex(i); + return true; + } + // If time > modEndTime, continue to next mod + } + + // All mods checked, clear mods list and reset index to 0 + modsInfo.setMods(Collections.emptyList()); + modsInfo.setCurrentIndex(0); + return false; + } + + /** + * Update index in measurement mods mapping + * + * @param measurementID measurement ID + * @param newIndex new index + * @param measurementModsMap measurement mods mapping + */ + public static void updateModsIndex( Review Comment: Is it used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
