Copilot commented on code in PR #17297:
URL: https://github.com/apache/iotdb/pull/17297#discussion_r2930378907


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java:
##########
@@ -275,16 +291,75 @@ public Map<String, MeasurementSchema> 
getAllSchemasOfCurrentDevice() throws IOEx
       reader.getDeviceTimeseriesMetadata(
           timeseriesMetadataList,
           
deviceIteratorMap.get(resource).getFirstMeasurementNodeOfCurrentDevice(),
+          seriesNeedToUpdateDataType,
+          true,
+          null);
+      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+        MeasurementSchema measurementSchema = 
schemaMap.get(timeseriesMetadata.getMeasurementId());
+        if (measurementSchema == null) {
+          if (!timeseriesMetadata.getChunkMetadataList().isEmpty()) {
+            schemaMap.put(
+                timeseriesMetadata.getMeasurementId(),
+                
reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList()));
+          }
+          continue;
+        }
+        if (measurementSchema.getType() != timeseriesMetadata.getTsDataType()) 
{
+          
seriesNeedToUpdateDataType.add(timeseriesMetadata.getMeasurementId());
+        }
+      }
+    }
+    List<IMeasurementSchema> latestMeasurementSchemas =
+        CompactionUtils.getLatestMeasurementSchemasForTreeModel(
+            currentDevice.left, new ArrayList<>(seriesNeedToUpdateDataType));
+    for (IMeasurementSchema latestMeasurementSchema : 
latestMeasurementSchemas) {
+      if (latestMeasurementSchema != null) {
+        schemaMap.put(
+            latestMeasurementSchema.getMeasurementName(),
+            (MeasurementSchema) latestMeasurementSchema);
+      }
+    }
+    return schemaMap;
+  }
+
+  private Map<String, MeasurementSchema> 
getAllSchemasOfCurrentDeviceForTable() throws IOException {
+    Map<String, MeasurementSchema> schemaMap = new ConcurrentHashMap<>();
+    TsTable tsTable =
+        DataNodeTableCache.getInstance().getTable(databaseName, 
currentDevice.left.getTableName());
+    // get schemas from the newest file to the oldest file
+    for (TsFileResource resource : tsFileResourcesSortedByDesc) {
+      TsFileDeviceIterator deviceIterator = deviceIteratorMap.get(resource);
+      if (!deviceIteratorMap.containsKey(resource)
+          || !deviceIterator.current().equals(currentDevice)) {
+        // if this tsfile has no more device or next device is not equals to 
the current device,
+        // which means this tsfile does not contain the current device, then 
skip it.
+        continue;
+      }
+      TsFileSequenceReader reader = readerMap.get(resource);
+      List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+      reader.getDeviceTimeseriesMetadata(
+          timeseriesMetadataList,
+          deviceIterator.getFirstMeasurementNodeOfCurrentDevice(),
           schemaMap.keySet(),
           true,
           null);
       for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
-        if (!schemaMap.containsKey(timeseriesMetadata.getMeasurementId())
-            && !timeseriesMetadata.getChunkMetadataList().isEmpty()) {
-          schemaMap.put(
-              timeseriesMetadata.getMeasurementId(),
-              
reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList()));
+        MeasurementSchema measurementSchema = 
schemaMap.get(timeseriesMetadata.getMeasurementId());
+        if (measurementSchema != null) {
+          continue;
+        }
+        if (tsTable != null) {
+          TsTableColumnSchema columnSchema =
+              tsTable.getColumnSchema(timeseriesMetadata.getMeasurementId());
+          if (columnSchema != null) {
+            measurementSchema = (MeasurementSchema) 
columnSchema.getMeasurementSchema();
+          }
+        }
+        if (measurementSchema == null && 
!timeseriesMetadata.getChunkMetadataList().isEmpty()) {
+          measurementSchema =
+              
reader.getMeasurementSchema(timeseriesMetadata.getChunkMetadataList());
         }

Review Comment:
   `getAllSchemasOfCurrentDeviceForTable` can insert a null value into 
`schemaMap` (when `tsTable` has no column schema and 
`timeseriesMetadata.getChunkMetadataList()` is empty). Since 
`schemaMap.keySet()` is passed into `getDeviceTimeseriesMetadata(...)` as the 
exclusion set, adding the key with a null value can prevent older files from 
being scanned to resolve the schema later, leaving the schema permanently null. 
Avoid putting the measurement key into `schemaMap` until a non-null 
`MeasurementSchema` has been derived, or maintain a separate exclusion set that 
contains only measurements whose schemas are already resolved.
   



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/alterDataType/AbstractCompactionAlterDataTypeTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.alterDataType;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFakeSchemaFetcherImpl;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.TSRecord;
+import org.apache.tsfile.write.record.datapoint.DoubleDataPoint;
+import org.apache.tsfile.write.record.datapoint.FloatDataPoint;
+import org.apache.tsfile.write.record.datapoint.IntDataPoint;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class AbstractCompactionAlterDataTypeTest extends 
AbstractCompactionTest {
+
+  protected final String oldThreadName = Thread.currentThread().getName();
+  protected final IDeviceID device =
+      IDeviceID.Factory.DEFAULT_FACTORY.create(COMPACTION_TEST_SG + ".d1");
+
+  protected CompactionFakeSchemaFetcherImpl schemaFetcher;
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
+    super.setUp();
+    Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+    this.schemaFetcher = new CompactionFakeSchemaFetcherImpl();
+    
schemaFetcher.getSchemaTree().setDatabases(Collections.singleton(COMPACTION_TEST_SG));
+    CompactionUtils.setSchemaFetcher(schemaFetcher);
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {

Review Comment:
   `setUp()` installs a static/global schema fetcher via 
`CompactionUtils.setSchemaFetcher(schemaFetcher)`, but `tearDown()` never 
resets it. Because this is global mutable state, the fetcher can leak into 
other tests running in the same JVM and change their behavior. Reset it in 
`tearDown()` (e.g., set it back to null) to keep tests isolated.
   



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java:
##########
@@ -133,8 +144,66 @@ public ReadChunkAlignedSeriesCompactionExecutor(
     this.ignoreAllNullRows = ignoreAllNullRows;
   }
 
-  private void collectValueColumnSchemaList() throws IOException {
+  private void collectValueColumnSchemaListForTable(String database) throws 
IOException {
+    TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+    TsTable tsTable = DataNodeTableCache.getInstance().getTable(database, 
device.getTableName());
     Map<String, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
+    for (int i = this.readerAndChunkMetadataList.size() - 1; i >= 0; i--) {
+      Pair<TsFileSequenceReader, List<AbstractAlignedChunkMetadata>> pair =
+          this.readerAndChunkMetadataList.get(i);
+      CompactionTsFileReader reader = (CompactionTsFileReader) pair.getLeft();
+      List<AbstractAlignedChunkMetadata> alignedChunkMetadataList = 
pair.getRight();
+      for (AbstractAlignedChunkMetadata alignedChunkMetadata : 
alignedChunkMetadataList) {
+        if (alignedChunkMetadata == null) {
+          continue;
+        }
+        if (timeSchema == null) {
+          timeSchema =
+              new MeasurementSchema(
+                  "",
+                  alignedChunkMetadata.getTimeChunkMetadata().getDataType(),
+                  TSEncoding.valueOf(tsFileConfig.getTimeEncoder()),
+                  tsFileConfig.getCompressor());
+        }
+        for (IChunkMetadata chunkMetadata : 
alignedChunkMetadata.getValueChunkMetadataList()) {
+          if (chunkMetadata == null
+              || 
measurementSchemaMap.containsKey(chunkMetadata.getMeasurementUid())) {
+            continue;
+          }
+          TsTableColumnSchema schemaInTsTable =
+              tsTable.getColumnSchema(chunkMetadata.getMeasurementUid());

Review Comment:
   `collectValueColumnSchemaListForTable` dereferences `tsTable` without a 
null-check (`tsTable.getColumnSchema(...)`). 
`DataNodeTableCache.getInstance().getTable(...)` can return null (e.g., table 
not in cache / dropped), which would cause an NPE during compaction. Please 
guard `tsTable` (treat missing table schema as `schemaInTsTable == null`) and 
fall back to reading the chunk header schema in that case.
   



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java:
##########
@@ -304,42 +313,63 @@ private void compactNotAlignedSeries(
 
   private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
       filterDataTypeNotMatchedChunkMetadata(
+          IDeviceID deviceID,
+          String measurement,
           LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> 
readerAndChunkMetadataList) {
     if (readerAndChunkMetadataList.isEmpty()) {
       return readerAndChunkMetadataList;
     }
-    LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new 
LinkedList<>();
     // find correct data type
     TSDataType correctDataType = null;
-    for (int i = readerAndChunkMetadataList.size() - 1; i >= 0 && 
correctDataType == null; i--) {
-      List<ChunkMetadata> chunkMetadataList = 
readerAndChunkMetadataList.get(i).getRight();
-      if (chunkMetadataList == null || chunkMetadataList.isEmpty()) {
-        continue;
-      }
+    boolean hasDifferentDataTypes = false;
+    Iterator<Pair<TsFileSequenceReader, List<ChunkMetadata>>> descIterator =
+        readerAndChunkMetadataList.descendingIterator();
+    while (descIterator.hasNext()) {
+      Pair<TsFileSequenceReader, List<ChunkMetadata>> pair = 
descIterator.next();
+      List<ChunkMetadata> chunkMetadataList = pair.right;
+      TSDataType dataTypeInCurrentFile = null;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-        if (chunkMetadata == null) {
-          continue;
+        if (chunkMetadata != null) {
+          dataTypeInCurrentFile = chunkMetadata.getDataType();
+          break;
         }
-        correctDataType = chunkMetadata.getDataType();
+      }
+      if (correctDataType == null) {
+        correctDataType = dataTypeInCurrentFile;
+      } else if (correctDataType != dataTypeInCurrentFile) {
+        hasDifferentDataTypes = true;
         break;
       }
     }
-    if (correctDataType == null) {
+    if (!hasDifferentDataTypes) {
       return readerAndChunkMetadataList;
     }
+
+    IMeasurementSchema schema =
+        CompactionUtils.getLatestMeasurementSchemasForTreeModel(
+                deviceID, Collections.singletonList(measurement))
+            .get(0);
+    if (schema != null) {
+      correctDataType = schema.getType();
+    }
+
+    LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> result = new 
LinkedList<>();
     // check data type consistent and skip compact files with wrong data type
     for (Pair<TsFileSequenceReader, List<ChunkMetadata>> 
tsFileSequenceReaderListPair :
         readerAndChunkMetadataList) {
       boolean dataTypeConsistent = true;
       for (ChunkMetadata chunkMetadata : 
tsFileSequenceReaderListPair.getRight()) {
-        if (chunkMetadata != null
-            && !MetadataUtils.canAlter(chunkMetadata.getDataType(), 
correctDataType)) {
-          dataTypeConsistent = false;
+        if (chunkMetadata == null) {
+          continue;
+        }
+        if (chunkMetadata.getDataType() == correctDataType) {
           break;
         }
-        if (chunkMetadata != null && chunkMetadata.getDataType() != 
correctDataType) {
-          chunkMetadata.setNewType(correctDataType);
+        if (!MetadataUtils.canAlter(chunkMetadata.getDataType(), 
correctDataType)) {
+          dataTypeConsistent = false;
+          break;
         }
+        chunkMetadata.setNewType(correctDataType);

Review Comment:
   In `filterDataTypeNotMatchedChunkMetadata`, `correctDataType` can remain 
null if the newest file(s) for this series have empty chunk-metadata lists 
(e.g., fully deleted by modifications), and the schema fetch returns null. In 
that case `MetadataUtils.canAlter(chunkMetadata.getDataType(), 
correctDataType)` will throw an NPE. Add a guard such as: if `correctDataType` 
is still null after scanning / schema lookup, return the original list (or 
filter out empty pairs) without attempting type alteration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to