jt2594838 commented on code in PR #16540:
URL: https://github.com/apache/iotdb/pull/16540#discussion_r2428644711
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java:
##########
@@ -202,6 +213,43 @@ public boolean hasNext() {
continue;
}
+ Iterator<IChunkMetadata> iChunkMetadataIterator =
+
alignedChunkMetadata.getValueChunkMetadataList().iterator();
+ while (iChunkMetadataIterator.hasNext()) {
+ IChunkMetadata iChunkMetadata =
iChunkMetadataIterator.next();
+ if (iChunkMetadata == null) {
+ throw new PipeException(
+ "Table model tsfile parsing does not support this type
of ChunkMeta");
+ }
+
+ boolean isDelete = false;
+ if (isDelete =
+ ModsOperationUtil.isAllDeletedByMods(
+ pair.getLeft(),
+ iChunkMetadata.getMeasurementUid(),
+ alignedChunkMetadata.getStartTime(),
+ alignedChunkMetadata.getEndTime(),
+ modifications)) {
+ iChunkMetadataIterator.remove();
+ }
+ System.out.println(
+ "deviceID: "
+ + pair.getLeft()
+ + ", measurement: "
+ + iChunkMetadata.getMeasurementUid()
+ + ", startTime: "
+ + alignedChunkMetadata.getStartTime()
+ + ", endTime: "
+ + alignedChunkMetadata.getEndTime()
+ + ", isDelete: "
+ + isDelete);
Review Comment:
Please remove this debug `System.out.println` call.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for handling mods operations during TsFile parsing. Supports
mods processing logic
+ * for both tree model and table model.
+ */
+public class ModsOperationUtil {
+
+ private ModsOperationUtil() {
+ // Utility class, no instantiation allowed
+ }
+
+ /**
+ * Load all modifications from TsFile and build PatternTreeMap
+ *
+ * @param tsFile TsFile file
+ * @return PatternTreeMap containing all modifications
+ */
+ public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+ loadModificationsFromTsFile(File tsFile) {
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+
+ try {
+ ModificationFile.readAllModifications(tsFile, true)
+ .forEach(
+ modification ->
modifications.append(modification.keyOfPatternTree(), modification));
+ } catch (Exception e) {
+ throw new PipeException("Failed to load modifications from TsFile: " +
tsFile.getPath(), e);
+ }
+
+ return modifications;
+ }
+
+ /**
+ * Check if data in the specified time range is completely deleted by mods
Different logic for
+ * tree model and table model
+ *
+ * @param deviceID device ID
+ * @param measurementID measurement ID
+ * @param startTime start time
+ * @param endTime end time
+ * @param modifications modification records
+ * @return true if data is completely deleted, false otherwise
+ */
+ public static boolean isAllDeletedByMods(
+ IDeviceID deviceID,
+ String measurementID,
+ long startTime,
+ long endTime,
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+ if (modifications == null) {
+ return false;
+ }
+
+ final List<ModEntry> mods = modifications.getOverlapped(deviceID,
measurementID);
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ // Different logic for tree model and table model
+ if (deviceID.isTableModel()) {
+ // For table model: check if any modification affects the device and
covers the time range
+ return mods.stream()
+ .anyMatch(
+ modification ->
+ modification.getTimeRange().contains(startTime, endTime)
+ && modification.affects(deviceID)
+ && modification.affects(measurementID));
+ } else {
+ // For tree model: check if any modification covers the time range
+ return mods.stream()
+ .anyMatch(modification ->
modification.getTimeRange().contains(startTime, endTime));
+ }
+ }
+
+ /**
+ * Initialize mods mapping for specified measurement list
+ *
+ * @param deviceID device ID
+ * @param measurements measurement list
+ * @param modifications modification records
+ * @return mapping from measurement ID to mods list and index
+ */
+ public static List<ModsInfo> initializeMeasurementMods(
+ IDeviceID deviceID,
+ List<String> measurements,
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+
+ List<ModsInfo> modsInfos = new ArrayList<>(measurements.size());
+
+ for (final String measurement : measurements) {
+ final List<ModEntry> mods = modifications.getOverlapped(deviceID,
measurement);
+ if (mods == null || mods.isEmpty()) {
+ // No mods, use empty list and index 0
+ modsInfos.add(new ModsInfo(Collections.emptyList(), 0));
+ continue;
+ }
+
+ // Sort by time range for efficient lookup
+ // Different filtering logic for tree model and table model
+ final List<ModEntry> sortedMods;
+ if (deviceID.isTableModel()) {
+ // For table model: filter modifications that affect the device
+ sortedMods =
+ mods.stream()
+ .filter(
+ modification ->
+ modification.affects(deviceID) &&
modification.affects(measurement))
+ .collect(Collectors.toList());
+ } else {
+ // For tree model: no additional filtering needed
+ sortedMods = mods;
+ }
+ // Store sorted mods and start index
+ modsInfos.add(new ModsInfo(ModificationUtils.sortAndMerge(sortedMods),
0));
+ }
+
+ return modsInfos;
+ }
+
+ /**
+ * Check if data at the specified time point is deleted
+ *
+ * @param time time point
+ * @param modsInfo mods information containing mods list and current index
+ * @return true if data is deleted, false otherwise
+ */
+ public static boolean isDelete(long time, ModsInfo modsInfo) {
Review Comment:
How about moving this into ModsInfo?
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.tablemodel.manual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
+import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQueryWithRetry;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTableManualBasic.class})
+public class IoTDBPipeTsFileDecompositionWithModsIT extends
AbstractPipeTableModelDualManualIT {
+
+ @Test
+ public void testTsFileDecompositionWithMods() {
+ TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg1");
+ TableModelUtils.createDataBaseAndTable(receiverEnv, "table1", "sg1");
+
+ TableModelUtils.insertData("sg1", "table1", 1, 6, senderEnv);
+
+ TableModelUtils.createDataBaseAndTable(senderEnv, "table1", "sg2");
+ for (int i = 1; i <= 110; i++) {
+ TableModelUtils.insertData("sg2", "table1", 10, 15, (i - 1) * 100, i *
100, senderEnv);
+ }
+
+ executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 2 AND time <= 4",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg1",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 3 AND time <= 5",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg1",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t10' AND
s1='t10' AND s2='t10' AND s3='t10'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 0 AND time <= 11000 AND s0 ='t11'
AND s1='t11' AND s2='t11' AND s3='t11'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 5000 AND time < 10100 AND s0 ='t12'
AND s1='t12' AND s2='t12' AND s3='t12'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 0 AND time < 10000 AND s0 ='t13' AND
s1='t13' AND s2='t13' AND s3='t13'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ "DELETE FROM table1 WHERE time >= 10000 AND time <= 11000 AND s0
='t14' AND s1='t14' AND s2='t14' AND s3='t14'",
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ "sg2",
+ "table");
+
+ executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+ executeNonQueryWithRetry(
+ senderEnv,
+ String.format(
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true',
'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s',
'username'='root', 'format'='tablet')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+ receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+ HashSet<String> expectedResults = new HashSet<>();
+ expectedResults.add(
+
"t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ TableModelUtils.getQuerySql("table1"),
+ TableModelUtils.generateHeaderResults(),
+ expectedResults,
+ "sg1");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4",
+ "s4,",
+ Collections.emptySet(),
+ "sg1");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t10' AND s1='t10' AND
s2='t10' AND s3='t10'",
+ "count,",
+ Collections.singleton("1000,"),
+ "sg2");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t11' AND s1='t11' AND
s2='t11' AND s3='t11'",
+ "count,",
+ Collections.singleton("0,"),
+ "sg2");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "SELECT COUNT(*) as count FROM table1 WHERE s0 ='t11' AND s1='t11' AND
s2='t11' AND s3='t11'",
+ "count,",
+ Collections.singleton("0,"),
+ "sg2");
Review Comment:
Duplicated? This assertion is identical to the preceding one for 't11'.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/util/ModsOperationUtil.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.util;
+
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for handling mods operations during TsFile parsing. Supports
mods processing logic
+ * for both tree model and table model.
+ */
+public class ModsOperationUtil {
+
+ private ModsOperationUtil() {
+ // Utility class, no instantiation allowed
+ }
+
+ /**
+ * Load all modifications from TsFile and build PatternTreeMap
+ *
+ * @param tsFile TsFile file
+ * @return PatternTreeMap containing all modifications
+ */
+ public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+ loadModificationsFromTsFile(File tsFile) {
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+
+ try {
+ ModificationFile.readAllModifications(tsFile, true)
+ .forEach(
+ modification ->
modifications.append(modification.keyOfPatternTree(), modification));
+ } catch (Exception e) {
+ throw new PipeException("Failed to load modifications from TsFile: " +
tsFile.getPath(), e);
+ }
+
+ return modifications;
+ }
+
+ /**
+ * Check if data in the specified time range is completely deleted by mods
Different logic for
+ * tree model and table model
+ *
+ * @param deviceID device ID
+ * @param measurementID measurement ID
+ * @param startTime start time
+ * @param endTime end time
+ * @param modifications modification records
+ * @return true if data is completely deleted, false otherwise
+ */
+ public static boolean isAllDeletedByMods(
+ IDeviceID deviceID,
+ String measurementID,
+ long startTime,
+ long endTime,
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+ if (modifications == null) {
+ return false;
+ }
+
+ final List<ModEntry> mods = modifications.getOverlapped(deviceID,
measurementID);
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ // Different logic for tree model and table model
+ if (deviceID.isTableModel()) {
+ // For table model: check if any modification affects the device and
covers the time range
+ return mods.stream()
+ .anyMatch(
+ modification ->
+ modification.getTimeRange().contains(startTime, endTime)
+ && modification.affects(deviceID)
+ && modification.affects(measurementID));
+ } else {
+ // For tree model: check if any modification covers the time range
+ return mods.stream()
+ .anyMatch(modification ->
modification.getTimeRange().contains(startTime, endTime));
+ }
+ }
+
+ /**
+ * Initialize mods mapping for specified measurement list
+ *
+ * @param deviceID device ID
+ * @param measurements measurement list
+ * @param modifications modification records
+ * @return mapping from measurement ID to mods list and index
+ */
+ public static List<ModsInfo> initializeMeasurementMods(
+ IDeviceID deviceID,
+ List<String> measurements,
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+
+ List<ModsInfo> modsInfos = new ArrayList<>(measurements.size());
+
+ for (final String measurement : measurements) {
+ final List<ModEntry> mods = modifications.getOverlapped(deviceID,
measurement);
+ if (mods == null || mods.isEmpty()) {
+ // No mods, use empty list and index 0
+ modsInfos.add(new ModsInfo(Collections.emptyList(), 0));
+ continue;
+ }
+
+ // Sort by time range for efficient lookup
+ // Different filtering logic for tree model and table model
+ final List<ModEntry> sortedMods;
+ if (deviceID.isTableModel()) {
+ // For table model: filter modifications that affect the device
+ sortedMods =
+ mods.stream()
+ .filter(
+ modification ->
+ modification.affects(deviceID) &&
modification.affects(measurement))
+ .collect(Collectors.toList());
+ } else {
+ // For tree model: no additional filtering needed
+ sortedMods = mods;
+ }
+ // Store sorted mods and start index
+ modsInfos.add(new ModsInfo(ModificationUtils.sortAndMerge(sortedMods),
0));
+ }
+
+ return modsInfos;
+ }
+
+ /**
+ * Check if data at the specified time point is deleted
+ *
+ * @param time time point
+ * @param modsInfo mods information containing mods list and current index
+ * @return true if data is deleted, false otherwise
+ */
+ public static boolean isDelete(long time, ModsInfo modsInfo) {
+ if (modsInfo == null) {
+ return false;
+ }
+
+ final List<ModEntry> mods = modsInfo.getMods();
+ if (mods == null || mods.isEmpty()) {
+ return false;
+ }
+
+ int currentIndex = modsInfo.getCurrentIndex();
+ if (currentIndex < 0) {
+ return false;
+ }
+
+ // Search from current index
+ for (int i = currentIndex; i < mods.size(); i++) {
+ final ModEntry mod = mods.get(i);
+ final long modStartTime = mod.getTimeRange().getMin();
+ final long modEndTime = mod.getTimeRange().getMax();
+
+ if (time < modStartTime) {
+ // Current time is before mod start time, update index and return false
+ modsInfo.setCurrentIndex(i);
+ return false;
+ } else if (time <= modEndTime) {
+ // Current time is within mod time range, update index and return true
+ modsInfo.setCurrentIndex(i);
+ return true;
+ }
+ // If time > modEndTime, continue to next mod
+ }
Review Comment:
Could this linear scan over the mods list use `Collections.binarySearch` instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]