Copilot commented on code in PR #16699:
URL: https://github.com/apache/iotdb/pull/16699#discussion_r2545433923
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java:
##########
@@ -240,18 +249,28 @@ public void selfCheckDataTypes(int index)
public abstract Object getFirstValueOfIndex(int index);
public void semanticCheck() {
- Set<String> deduplicatedMeasurements = new HashSet<>();
+ // Skip if semantic check has already been performed
+ if (hasSemanticChecked) {
+ return;
+ }
+
+ Set<String> deduplicatedMeasurements = new HashSet<>(measurements.length);
+ int index = 0;
for (String measurement : measurements) {
- if (measurement == null || measurement.isEmpty()) {
+ if ((failedMeasurementIndex2Info == null ||
!failedMeasurementIndex2Info.containsKey(index))
+ && (measurement == null || measurement.isEmpty())) {
throw new SemanticException(
"Measurement contains null or empty string: " +
Arrays.toString(measurements));
}
- if (deduplicatedMeasurements.contains(measurement)) {
+ index++;
+ deduplicatedMeasurements.add(measurement);
+ if (deduplicatedMeasurements.size() != index) {
throw new SemanticException("Insertion contains duplicated
measurement: " + measurement);
- } else {
- deduplicatedMeasurements.add(measurement);
}
}
Review Comment:
The duplicate detection logic is incorrect. Line 265 increments `index`
before adding to the set (line 266), but the size check on line 267 compares
against the incremented `index`. This means for the first iteration, it checks
if size (1) != index (1), which is correct, but the logic is confusing.
Additionally, if a measurement is null/empty and passes the check on line 260
(because it's in failedMeasurementIndex2Info), it will still be added to
deduplicatedMeasurements on line 266, potentially causing false duplicate
detection. The increment should happen after the duplicate check, and null
measurements should not be added to the set.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -1421,14 +1426,72 @@ private boolean insertTabletToTsFileProcessor(
return true;
}
+ private TableSchema getTableSchemaFromCache(
+ final String database, final String tableName, final Pair<Long, Long>
currentVersion) {
+ final TableSchemaCacheKey key = new TableSchemaCacheKey(database,
tableName);
+ final Triple<Long, Long, TableSchema> cached =
TABLE_SCHEMA_CACHE.getIfPresent(key);
+ if (cached == null) {
+ return null;
+ }
+ if (cached.getLeft().equals(currentVersion.getLeft())
+ && cached.getMiddle().equals(currentVersion.getRight())) {
+ return cached.getRight();
+ }
+ // remove stale entry to avoid unbounded growth
+ TABLE_SCHEMA_CACHE.invalidate(key);
+ return null;
Review Comment:
[nitpick] The cache invalidation on line 1441 is always executed even when
the version matches (after returning on line 1438). However, this code path is
only reached when versions don't match. Consider adding a comment to clarify
this is intentional, or restructure to make it clearer that invalidation only
happens on version mismatch: `if (...) { return cached.getRight(); } else {
TABLE_SCHEMA_CACHE.invalidate(key); return null; }`
```suggestion
} else {
// remove stale entry to avoid unbounded growth (only on version
mismatch)
TABLE_SCHEMA_CACHE.invalidate(key);
return null;
}
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java:
##########
@@ -102,75 +110,395 @@ protected TableSchema toTableSchema(InsertBaseStatement
insertBaseStatement) {
return new TableSchema(tableName, columnSchemas);
}
+ protected InsertNodeMeasurementInfo toInsertNodeMeasurementInfo(
+ InsertBaseStatement insertBaseStatement) {
+ String tableName =
insertBaseStatement.getDevicePath().getFullPath().toLowerCase();
+
+ // Use lazy initialization with measurements and dataTypes
+ return new InsertNodeMeasurementInfo(
+ tableName,
+ insertBaseStatement.getColumnCategories(),
+ insertBaseStatement.getMeasurements(),
+ insertBaseStatement.getDataTypes(),
+ index ->
+ TypeInferenceUtils.getPredictedDataType(
+ insertBaseStatement.getFirstValueOfIndex(index), true),
+ insertBaseStatement::toLowerCase,
+ insertBaseStatement::semanticCheck,
+ insertBaseStatement::setAttributeColumnsPresent,
+ insertBaseStatement::setToLowerCaseApplied,
+ insertBaseStatement::setSemanticChecked);
+ }
+
public void validateTableSchema(Metadata metadata, MPPQueryContext context) {
String databaseName = getDatabase();
- final TableSchema incomingSchema = getTableSchema();
- final TableSchema realSchema =
- metadata
- .validateTableHeaderSchema(databaseName, incomingSchema, context,
true, false)
- .orElse(null);
- if (realSchema == null) {
+ metadata.validateInsertNodeMeasurements(
+ databaseName,
+ getInsertNodeMeasurementInfo(),
+ context,
+ true,
+ this::validate,
+ this::adjustTagColumns);
+ }
+
+ public void validateTableSchema(
+ final Metadata metadata,
+ final MPPQueryContext context,
+ final InsertRowStatement insertRowStatement,
+ final String databaseName) {
+ metadata.validateInsertNodeMeasurements(
+ databaseName,
+ toInsertNodeMeasurementInfo(insertRowStatement),
+ context,
+ true,
+ (index, measurement, dataType, columnCategory, existingColumn) ->
+ validate(
+ index, insertRowStatement, measurement, dataType,
columnCategory, existingColumn),
+ (tagColumnIndexMap, existingTagColumnIndexMap) ->
+ adjustTagColumns(insertRowStatement, tagColumnIndexMap,
existingTagColumnIndexMap));
+ }
+
+ private void validate(
+ final int index,
+ final String measurement,
+ final TSDataType dataType,
+ final TsTableColumnCategory columnCategory,
+ final TsTableColumnSchema existingColumn) {
+ final InsertBaseStatement innerTreeStatement = getInnerTreeStatement();
+ validate(index, innerTreeStatement, measurement, dataType, columnCategory,
existingColumn);
+ }
+
+ private void validate(
+ final int index,
+ final InsertBaseStatement innerTreeStatement,
+ final String measurement,
+ final TSDataType dataType,
+ final TsTableColumnCategory columnCategory,
+ final TsTableColumnSchema existingColumn) {
+ if (existingColumn == null) {
+ processNonExistColumn(measurement, columnCategory, innerTreeStatement,
index);
+ return; // Exit early if column doesn't exist
+ }
+
+ // check data type
+ if (dataType == null || columnCategory != TsTableColumnCategory.FIELD) {
+ // sql insertion does not provide type
+ // the type is inferred and can be inconsistent with the existing one
+ innerTreeStatement.setDataType(existingColumn.getDataType(), index);
+ } else if (!existingColumn.getDataType().isCompatible(dataType)
+ && !innerTreeStatement.isForceTypeConversion()) {
+ processTypeConflictColumn(
+ measurement, dataType, columnCategory, existingColumn, index,
innerTreeStatement);
+ }
+
+ // check column category
+ if (columnCategory == null) {
+ // sql insertion does not provide category
+ innerTreeStatement.setColumnCategory(existingColumn.getColumnCategory(),
index);
+ } else if (!columnCategory.equals(existingColumn.getColumnCategory())) {
throw new SemanticException(
- "Schema validation failed, table cannot be created: " +
incomingSchema);
+ String.format(
+ "Inconsistent column category of column %s: %s/%s",
+ measurement, columnCategory, existingColumn.getColumnCategory()),
+ TSStatusCode.COLUMN_CATEGORY_MISMATCH.getStatusCode());
}
- InsertBaseStatement innerTreeStatement = getInnerTreeStatement();
- validateTableSchema(realSchema, incomingSchema, innerTreeStatement);
- }
- protected void validateTableSchema(
- TableSchema realSchema,
- TableSchema incomingTableSchema,
- InsertBaseStatement innerTreeStatement) {
- final List<ColumnSchema> incomingSchemaColumns =
incomingTableSchema.getColumns();
- Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
- realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
- // incoming schema should be consistent with real schema
- for (int i = 0, incomingSchemaColumnsSize = incomingSchemaColumns.size();
- i < incomingSchemaColumnsSize;
- i++) {
- ColumnSchema incomingSchemaColumn = incomingSchemaColumns.get(i);
- final ColumnSchema realSchemaColumn =
realSchemaMap.get(incomingSchemaColumn.getName());
- validateTableSchema(incomingSchemaColumn, realSchemaColumn, i,
innerTreeStatement);
+ // construct measurement schema
+ TSDataType tsDataType = existingColumn.getDataType();
+ MeasurementSchema newMeasurementSchema =
+ new MeasurementSchema(
+ existingColumn.getColumnName(),
+ tsDataType,
+ getDefaultEncoding(tsDataType),
+
TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType));
+ innerTreeStatement.setMeasurementSchema(newMeasurementSchema, index);
+ try {
+ innerTreeStatement.selfCheckDataTypes(index);
+ } catch (DataTypeMismatchException | PathNotExistException e) {
+ throw new SemanticException(e);
}
- // incoming schema should contain all id columns in real schema and have
consistent order
- final List<ColumnSchema> realIdColumns = realSchema.getTagColumns();
- adjustIdColumns(realIdColumns, innerTreeStatement);
+ }
+
+ void adjustTagColumns(
+ final LinkedHashMap<String, Integer> tagColumnIndexMap,
+ final LinkedHashMap<String, Integer> existingTagColumnIndexMap) {
+ adjustTagColumns(getInnerTreeStatement(), tagColumnIndexMap,
existingTagColumnIndexMap);
}
/**
- * Adjust the order of ID columns in this insertion to be consistent with
that from the schema
- * region.
+ * Adjust the order of TAG columns in this insertion to be consistent with
that from the schema
+ * region. Optimized for performance: one expansion if needed, one
reordering pass.
*
- * @param realIdColumnSchemas id column order from the schema region
+ * @param tagColumnIndexMap LinkedHashMap of incoming TAG columns, key is
column name, value is
+ * measurement index in the array (will be updated during adjustment)
+ * @param existingTagColumnIndexMap LinkedHashMap of existing TAG columns in
TsTable, key is
+ * column name, value is TAG column index in table (maintains schema
region order)
*/
- public void adjustIdColumns(
- List<ColumnSchema> realIdColumnSchemas, final InsertBaseStatement
baseStatement) {
- List<ColumnSchema> incomingColumnSchemas =
toTableSchema(baseStatement).getColumns();
- for (int realIdColPos = 0; realIdColPos < realIdColumnSchemas.size();
realIdColPos++) {
- ColumnSchema realColumn = realIdColumnSchemas.get(realIdColPos);
- int incomingIdColPos = incomingColumnSchemas.indexOf(realColumn);
- if (incomingIdColPos == -1) {
- // if the realIdColPos-th id column in the table is missing, insert an
empty column in the
- // tablet
- baseStatement.insertColumn(realIdColPos, realColumn);
- incomingColumnSchemas.add(realIdColPos, realColumn);
- } else {
- // move the id column in the tablet to the proper position
- baseStatement.swapColumn(incomingIdColPos, realIdColPos);
- Collections.swap(incomingColumnSchemas, incomingIdColPos,
realIdColPos);
+ void adjustTagColumns(
+ final InsertBaseStatement baseStatement,
+ final LinkedHashMap<String, Integer> tagColumnIndexMap,
+ final LinkedHashMap<String, Integer> existingTagColumnIndexMap) {
+ if (baseStatement == null || existingTagColumnIndexMap.isEmpty()) {
+ return;
+ }
+
+ // Phase 1: Analyze and determine action in one pass
+ final int oldLength = baseStatement.getMeasurements().length;
+ final int totalTagCount = existingTagColumnIndexMap.size();
+
+ // Build mapping, count existing TAG columns, and check if reordering is
needed - all in one
+ // pass
+ int existingTagCount = 0;
+ boolean needReorder = false;
+ final Map<String, Integer> tagOldIndexMap = new LinkedHashMap<>();
+
+ int targetPos = 0;
+ for (String tagColumnName : existingTagColumnIndexMap.keySet()) {
+ final Integer oldIndex = tagColumnIndexMap.get(tagColumnName);
+ if (oldIndex != null) {
+ tagOldIndexMap.put(tagColumnName, oldIndex);
+ existingTagCount++;
+ // Check if this TAG column is at the correct position
+ if (oldIndex != targetPos) {
+ needReorder = true;
+ }
}
+ targetPos++;
}
+
+ final boolean needExpansion = (existingTagCount < totalTagCount);
+
+ if (needExpansion) {
+ // Case 1: Expansion needed - create new arrays and reorder in one pass
+ expandAndReorderTagColumns(
+ baseStatement,
+ tagColumnIndexMap,
+ existingTagColumnIndexMap,
+ tagOldIndexMap,
+ oldLength,
+ totalTagCount);
+ } else if (needReorder) {
+ // Case 2: No expansion but reordering needed - use swaps
+ reorderTagColumnsWithSwap(baseStatement, tagColumnIndexMap,
existingTagColumnIndexMap);
+ }
+ // else: No expansion and already in correct order - no operation needed
+
+ // Clear cached table schema to force regeneration
tableSchema = null;
}
+ /**
+ * Expand arrays and reorder TAG columns in one pass. Array layout: [TAG
area: 0~totalTagCount] +
+ * [non-TAG area: totalTagCount~newLength]
+ */
+ private void expandAndReorderTagColumns(
+ final InsertBaseStatement baseStatement,
+ final LinkedHashMap<String, Integer> tagColumnIndexMap,
+ final LinkedHashMap<String, Integer> existingTagColumnIndexMap,
+ final Map<String, Integer> tagOldIndexMap,
+ final int oldLength,
+ final int totalTagCount) {
+
+ // Build old-to-new index mapping array
+ final int[] oldToNewMapping = new int[oldLength];
+
+ // Get old arrays
+ final String[] oldMeasurements = baseStatement.getMeasurements();
+ final MeasurementSchema[] oldMeasurementSchemas =
baseStatement.getMeasurementSchemas();
+ final TSDataType[] oldDataTypes = baseStatement.getDataTypes();
+ final TsTableColumnCategory[] oldColumnCategories =
baseStatement.getColumnCategories();
+
+ final int missingCount = totalTagCount - tagOldIndexMap.size();
+ final int newLength = oldLength + missingCount;
+
+ // Build mapping for existing TAG columns
+ int tagIdx = 0;
+ for (String tagColumnName : existingTagColumnIndexMap.keySet()) {
+ final Integer oldIdx = tagOldIndexMap.get(tagColumnName);
+ if (oldIdx != null) {
+ oldToNewMapping[oldIdx] = tagIdx; // oldIdx maps to tagIdx in new array
+ }
+ tagIdx++;
+ }
+
+ // Create new arrays with final size
+ final String[] newMeasurements = new String[newLength];
+ final MeasurementSchema[] newMeasurementSchemas =
+ oldMeasurementSchemas != null ? new MeasurementSchema[newLength] :
null;
+ final TSDataType[] newDataTypes = oldDataTypes != null ? new
TSDataType[newLength] : null;
+ final TsTableColumnCategory[] newColumnCategories =
+ oldColumnCategories != null ? new TsTableColumnCategory[newLength] :
null;
+
+ // Fill TAG area [0, totalTagCount) in schema order
+ tagIdx = 0;
+ for (String tagColumnName : existingTagColumnIndexMap.keySet()) {
+ final Integer oldIdx = tagOldIndexMap.get(tagColumnName);
+
+ if (oldIdx == null) {
+ // Missing TAG column, fill with default value
+ fillMissingTagColumn(
+ newMeasurements,
+ newMeasurementSchemas,
+ newDataTypes,
+ newColumnCategories,
+ tagIdx,
+ tagColumnName);
+ } else {
+ // Existing TAG column, copy from old array
+ copyColumn(
+ oldMeasurements,
+ oldMeasurementSchemas,
+ oldDataTypes,
+ oldColumnCategories,
+ newMeasurements,
+ newMeasurementSchemas,
+ newDataTypes,
+ newColumnCategories,
+ oldIdx,
+ tagIdx);
+ }
+ tagIdx++;
+ }
+
+ // Fill non-TAG area [totalTagCount, newLength) and complete the mapping
+ final Set<Integer> tagOldIndices = new HashSet<>(tagOldIndexMap.values());
+ int nonTagIdx = totalTagCount;
+ for (int oldIdx = 0; oldIdx < oldLength; oldIdx++) {
+ if (!tagOldIndices.contains(oldIdx)) {
Review Comment:
[nitpick] Creating a HashSet from `tagOldIndexMap.values()` on line 364
requires iterating through all values and creating a new collection. Since
`tagOldIndexMap` is already a Map, a more efficient approach would be to check
`!tagOldIndexMap.containsValue(oldIdx)` directly, or iterate through entries if
order matters. However, `containsValue()` on LinkedHashMap is O(n), so the
current approach of creating a Set (O(n) creation + O(1) lookups) is actually
optimal for multiple lookups.
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java:
##########
@@ -1052,6 +1054,23 @@ public Optional<TableSchema> validateTableHeaderSchema(
return Optional.of(tableSchema);
}
Review Comment:
This method overrides [TestMetadata.validateInsertNodeMeasurements](1); it
is advisable to add an Override annotation.
```suggestion
@Override
```
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java:
##########
@@ -114,99 +165,97 @@ public int getTagColumnOrdinal(final String columnName) {
}
public List<TsTableColumnSchema> getTagColumnSchemaList() {
+ List<TsTableColumnSchema> tagColumnSchemaList = tagColumnSchemas.get();
+ if (tagColumnSchemaList != null
+ && lastReadVersion.get() == instanceVersion.get()
+ && isNotWrite.get()) {
+ return tagColumnSchemaList;
+ }
Review Comment:
Similar race condition as in `getColumnSchema()`. The cached list could
become stale between reading it (line 168) and checking the version (line 170).
A write could increment `instanceVersion` after line 168 but before line 170,
causing stale data to be returned. The version and write flag should be checked
before accessing the cached value.
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java:
##########
@@ -95,7 +114,21 @@ public String getTableName() {
return tableName;
}
+ /**
+ * Get column schema with optimistic lock for fast reads. This method uses a
lock-free fast path
+ * when there's no concurrent write operation, significantly improving read
performance.
+ *
+ * @param columnName the column name to query
+ * @return the column schema, or null if not found
+ */
public TsTableColumnSchema getColumnSchema(final String columnName) {
+ long versionBefore = instanceVersion.get();
+ TsTableColumnSchema result = columnSchemaMap.get(columnName);
+ if (isNotWrite.get() && instanceVersion.get() == versionBefore) {
+ return result;
+ }
Review Comment:
There's a potential race condition in the optimistic locking implementation.
Between line 126 (reading from columnSchemaMap) and line 127 (checking
isNotWrite and version), a write operation could complete, leaving `result`
stale. The version check on line 127 should be done before reading from the
map, or the result should be re-read if version changed. Consider: `long v =
instanceVersion.get(); if (isNotWrite.get() && v == instanceVersion.get()) {
return columnSchemaMap.get(columnName); }`
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java:
##########
@@ -254,6 +256,316 @@ public Optional<TableSchema> validateTableHeaderSchema(
return Optional.of(new TableSchema(tableSchema.getTableName(),
resultColumnList));
}
+ /** Handler for validating and processing a single measurement column */
+ @FunctionalInterface
+ public interface MeasurementValidator {
+ /**
+ * Validate a measurement column
+ *
+ * @param index measurement index in the array
+ * @param columnCategory column category
+ * @param existingColumn existing column in table, null if not exists
+ */
+ void validate(
+ final int index,
+ final String measurement,
+ final TSDataType dataType,
+ final TsTableColumnCategory columnCategory,
+ final TsTableColumnSchema existingColumn);
+ }
+
+ /** Handler for processing TAG columns during validation */
+ @FunctionalInterface
+ public interface TagColumnHandler {
+ /**
+ * Adjust the order of TAG columns in this insertion to be consistent with
that from the schema
+ * region, similar to adjustIdColumns logic.
+ *
+ * @param tagColumnIndexMap LinkedHashMap of incoming TAG columns, key is
column name, value is
+ * measurement index in the array (maintains insertion order)
+ * @param existingTagColumnIndexMap LinkedHashMap of existing TAG columns
in TsTable, key is
+ * column name, value is TAG column index in table (maintains schema
region order)
+ */
+ void handle(
+ LinkedHashMap<String, Integer> tagColumnIndexMap,
+ LinkedHashMap<String, Integer> existingTagColumnIndexMap);
+ }
+
+ /**
+ * Optimized validation method with custom handlers for measurement
validation and TAG column
+ * processing
+ *
+ * @param database database name
+ * @param measurementInfo measurement information from InsertNode
+ * @param context query context
+ * @param allowCreateTable whether to allow table auto-creation
+ * @param measurementValidator custom validator for each measurement, null
to use default
+ * @param tagColumnHandler custom handler for TAG columns, null to skip TAG
processing
+ * @return validated TsTable, or empty if table doesn't exist and cannot be
created
+ */
+ public void validateInsertNodeMeasurements(
+ final String database,
+ final InsertNodeMeasurementInfo measurementInfo,
+ final MPPQueryContext context,
+ final boolean allowCreateTable,
+ final MeasurementValidator measurementValidator,
+ final TagColumnHandler tagColumnHandler) {
+
+ DataNodeSchemaLockManager.getInstance()
+ .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TABLE);
+
+ final TsTableColumnCategory[] columnCategories =
measurementInfo.getColumnCategories();
+ final int measurementCount = measurementInfo.getMeasurementCount();
+
+ if (measurementCount == 0) {
+ throw new SemanticException("No measurements present, please check the
request");
+ }
+
+ TsTable table =
+ DataNodeTableCache.getInstance().getTableInWrite(database,
measurementInfo.getTableName());
+ final List<Integer> missingMeasurementIndices = new ArrayList<>();
+
+ final boolean isAutoCreateSchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
+ // First round validate, check existing schema
+ if (table == null) {
+ // Auto create missing table
+ if (allowCreateTable && isAutoCreateSchemaEnabled) {
+ measurementInfo.toLowerCase();
+ measurementInfo.semanticCheck();
+ autoCreateTableFromMeasurementInfo(context, database, measurementInfo);
+ table =
+ DataNodeTableCache.getInstance()
+ .getTable(database, measurementInfo.getTableName(), false);
+ if (table == null) {
+ throw new IllegalStateException(
+ "auto create table succeed, but cannot get table schema in
current node's DataNodeTableCache, may be caused by concurrently auto creating
table");
+ }
+ } else {
+ TableMetadataImpl.throwTableNotExistsException(database,
measurementInfo.getTableName());
+ }
+ } else {
+ DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
+ // Note: isStrictTagColumn is always false for InsertNode, so TAG column
order validation is
+ // skipped
+ }
+
+ boolean refreshed = false;
+ boolean noField = true;
+ boolean hasAttribute = false;
+
+ // Track TAG column measurement indices for batch processing after
validation loop
+ // LinkedHashMap maintains insertion order, key is column name, value is
measurement index
+ final LinkedHashMap<String, Integer> tagColumnIndexMap = new
LinkedHashMap<>();
+
+ // Validate each measurement
+ for (int i = 0; i < measurementCount; i++) {
+ String measurementName = measurementInfo.getMeasurementName(i);
+ if (measurementName == null) {
+ continue;
+ }
+
+ final TsTableColumnCategory category =
+ columnCategories != null && i < columnCategories.length ?
columnCategories[i] : null;
+
+ hasAttribute = hasAttribute || category ==
TsTableColumnCategory.ATTRIBUTE;
+
+ TsTableColumnSchema existingColumn =
table.getColumnSchema(measurementName);
+ if (existingColumn == null) {
+ measurementInfo.toLowerCase();
+ measurementInfo.semanticCheck();
+ measurementName = measurementInfo.getMeasurementName(i);
+ existingColumn = table.getColumnSchema(measurementName);
+ }
+
+ if (Objects.isNull(existingColumn)) {
+ if (!refreshed) {
+ // Refresh because there may be new columns added and failed to
commit
+ refreshed = true;
+ table =
+ DataNodeTableCache.getInstance().getTable(database,
measurementInfo.getTableName());
+ existingColumn = table.getColumnSchema(measurementName);
+ }
+
+ if (Objects.isNull(existingColumn)) {
+ // Check arguments for column auto creation
+ if (category == null) {
+ throw new SemanticException(
+ String.format(
+ "Unknown column category for %s. Cannot auto create
column.", measurementName),
+ TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
+ }
+ missingMeasurementIndices.add(i);
+ }
+
+ if (noField && category == TsTableColumnCategory.FIELD) {
+ noField = false;
+ }
+ } else {
+ // Only check column category
+ if (category != null &&
!existingColumn.getColumnCategory().equals(category)) {
+ throw new SemanticException(
+ String.format("Wrong category at column %s.", measurementName),
+ TSStatusCode.COLUMN_CATEGORY_MISMATCH.getStatusCode());
+ }
+ if (noField && existingColumn.getColumnCategory() ==
TsTableColumnCategory.FIELD) {
+ noField = false;
+ }
+
+ // Custom validation handler - get MeasurementSchema on demand
+ if (measurementValidator != null) {
+ measurementValidator.validate(
+ i, measurementName, measurementInfo.getType(i), category,
existingColumn);
+ }
+ }
+
+ // Record TAG column measurement index during validation loop
+ if (tagColumnHandler != null && category == TsTableColumnCategory.TAG) {
+ tagColumnIndexMap.put(measurementName, i); // Store measurement index
+ }
+ }
+
+ if (noField) {
+ throw new SemanticException("No Field column present, please check the
request");
+ }
+
+ measurementInfo.setAttributeColumnsPresent(hasAttribute);
+ if (missingMeasurementIndices.isEmpty()) {
+ measurementInfo.setToLowerCaseApplied(true);
+ } else {
+ measurementInfo.toLowerCase();
+ }
+ measurementInfo.semanticCheck();
+
+ // Auto create missing columns
+ if (!missingMeasurementIndices.isEmpty() && isAutoCreateSchemaEnabled) {
+ autoCreateColumnsFromMeasurements(
+ database, measurementInfo, missingMeasurementIndices, context);
+ table = DataNodeTableCache.getInstance().getTable(database,
measurementInfo.getTableName());
+ } else if (!missingMeasurementIndices.isEmpty()
+ && !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert())
{
+ final List<String> missingNames = new ArrayList<>();
+ for (int idx : missingMeasurementIndices) {
+ missingNames.add(measurementInfo.getMeasurementName(idx));
+ }
+ throw new SemanticException(
+ String.format("Missing columns %s.", missingNames),
+ TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
+ }
+
+ for (int i : missingMeasurementIndices) {
+ measurementValidator.validate(
+ i,
+ measurementInfo.getMeasurementName(i),
+ measurementInfo.getType(i),
+ columnCategories[i],
+ table.getColumnSchema(measurementInfo.getMeasurementName(i)));
+ }
Review Comment:
Variable [measurementValidator](1) may be null at this access as suggested
by [this](2) null guard.
```suggestion
if (measurementValidator != null) {
measurementValidator.validate(
i,
measurementInfo.getMeasurementName(i),
measurementInfo.getType(i),
columnCategories[i],
table.getColumnSchema(measurementInfo.getMeasurementName(i)));
}
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java:
##########
@@ -1397,6 +1398,23 @@ public Optional<TableSchema> validateTableHeaderSchema(
database, tableSchema, context, allowCreateTable,
isStrictTagColumn);
}
Review Comment:
This method overrides [Metadata.validateInsertNodeMeasurements](1); it is
advisable to add an Override annotation.
```suggestion
@Override
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -321,6 +314,18 @@ public class DataRegion implements IDataRegionForQuery {
*/
private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>();
+ private static final Cache<TableSchemaCacheKey, Triple<Long, Long,
TableSchema>>
+ TABLE_SCHEMA_CACHE =
+ Caffeine.newBuilder()
+ .maximumWeight(
+
IoTDBDescriptor.getInstance().getConfig().getDataNodeTableSchemaCacheSize())
+ .weigher(
+ (TableSchemaCacheKey k, Triple<Long, Long, TableSchema> v) ->
+ (int)
+
(PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(v.getRight())
+ + 2 * Long.BYTES))
Review Comment:
Potential overflow in [int multiplication](1) before it is converted to long
by use in a numeric context.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java:
##########
@@ -611,6 +611,42 @@ public void swapColumn(int src, int target) {
deviceIDs = null;
}
+ @Override
+ public void rebuildArraysAfterExpansion(final int[] oldToNewMapping, final
int totalTagCount) {
+ final int oldLength = oldToNewMapping.length;
+ final int newLength = measurements.length;
+
+ // Save old arrays
+ final Object[] oldColumns = columns;
+ final BitMap[] oldNullBitMaps = nullBitMaps;
+
+ // Create new arrays: [TAG area: 0~totalTagCount] + [non-TAG area:
totalTagCount~newLength]
+ columns = new Object[newLength];
+ nullBitMaps = oldNullBitMaps != null ? new BitMap[newLength] : null;
+
+ // Initialize all TAG columns with default empty arrays (STRING type)
+ for (int tagIdx = 0; tagIdx < totalTagCount; tagIdx++) {
+ columns[tagIdx] =
+ CommonUtils.createValueColumnOfDataType(
+ TSDataType.STRING, TsTableColumnCategory.TAG, rowCount);
+ if (nullBitMaps != null) {
+ nullBitMaps[tagIdx] = new BitMap(rowCount);
+ nullBitMaps[tagIdx].markAll();
+ }
+ }
+
+ // Copy columns using the mapping: newColumns[oldToNewMapping[oldIdx]] =
oldColumns[oldIdx]
+ for (int oldIdx = 0; oldIdx < oldLength; oldIdx++) {
+ final int newIdx = oldToNewMapping[oldIdx];
+ columns[newIdx] = oldColumns[oldIdx];
+ if (nullBitMaps != null && oldNullBitMaps != null) {
Review Comment:
This check is useless. [oldNullBitMaps](1) cannot be null at this check,
since it is guarded by [... != ...](2).
```suggestion
if (nullBitMaps != null) {
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java:
##########
@@ -254,6 +256,316 @@ public Optional<TableSchema> validateTableHeaderSchema(
return Optional.of(new TableSchema(tableSchema.getTableName(),
resultColumnList));
}
+ /** Handler for validating and processing a single measurement column */
+ @FunctionalInterface
+ public interface MeasurementValidator {
+ /**
+ * Validate a measurement column
+ *
+ * @param index measurement index in the array
+ * @param columnCategory column category
+ * @param existingColumn existing column in table, null if not exists
+ */
+ void validate(
+ final int index,
+ final String measurement,
+ final TSDataType dataType,
+ final TsTableColumnCategory columnCategory,
+ final TsTableColumnSchema existingColumn);
+ }
+
+ /** Handler for processing TAG columns during validation */
+ @FunctionalInterface
+ public interface TagColumnHandler {
+ /**
+ * Adjust the order of TAG columns in this insertion to be consistent with
that from the schema
+ * region, similar to adjustIdColumns logic.
+ *
+ * @param tagColumnIndexMap LinkedHashMap of incoming TAG columns, key is
column name, value is
+ * measurement index in the array (maintains insertion order)
+ * @param existingTagColumnIndexMap LinkedHashMap of existing TAG columns
in TsTable, key is
+ * column name, value is TAG column index in table (maintains schema
region order)
+ */
+ void handle(
+ LinkedHashMap<String, Integer> tagColumnIndexMap,
+ LinkedHashMap<String, Integer> existingTagColumnIndexMap);
+ }
+
+ /**
+ * Optimized validation method with custom handlers for measurement
validation and TAG column
+ * processing
+ *
+ * @param database database name
+ * @param measurementInfo measurement information from InsertNode
+ * @param context query context
+ * @param allowCreateTable whether to allow table auto-creation
+ * @param measurementValidator custom validator for each measurement, null
to use default
+ * @param tagColumnHandler custom handler for TAG columns, null to skip TAG
processing
+ * @return validated TsTable, or empty if table doesn't exist and cannot be
created
+ */
+ public void validateInsertNodeMeasurements(
+ final String database,
+ final InsertNodeMeasurementInfo measurementInfo,
+ final MPPQueryContext context,
+ final boolean allowCreateTable,
+ final MeasurementValidator measurementValidator,
+ final TagColumnHandler tagColumnHandler) {
+
+ DataNodeSchemaLockManager.getInstance()
+ .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TABLE);
+
+ final TsTableColumnCategory[] columnCategories =
measurementInfo.getColumnCategories();
+ final int measurementCount = measurementInfo.getMeasurementCount();
+
+ if (measurementCount == 0) {
+ throw new SemanticException("No measurements present, please check the
request");
+ }
+
+ TsTable table =
+ DataNodeTableCache.getInstance().getTableInWrite(database,
measurementInfo.getTableName());
+ final List<Integer> missingMeasurementIndices = new ArrayList<>();
+
+ final boolean isAutoCreateSchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
+ // First round validate, check existing schema
+ if (table == null) {
+ // Auto create missing table
+ if (allowCreateTable && isAutoCreateSchemaEnabled) {
+ measurementInfo.toLowerCase();
+ measurementInfo.semanticCheck();
+ autoCreateTableFromMeasurementInfo(context, database, measurementInfo);
+ table =
+ DataNodeTableCache.getInstance()
+ .getTable(database, measurementInfo.getTableName(), false);
+ if (table == null) {
+ throw new IllegalStateException(
+ "auto create table succeed, but cannot get table schema in
current node's DataNodeTableCache, may be caused by concurrently auto creating
table");
+ }
+ } else {
+ TableMetadataImpl.throwTableNotExistsException(database,
measurementInfo.getTableName());
+ }
+ } else {
+ DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
+ // Note: isStrictTagColumn is always false for InsertNode, so TAG column
order validation is
+ // skipped
+ }
+
+ boolean refreshed = false;
+ boolean noField = true;
+ boolean hasAttribute = false;
+
+ // Track TAG column measurement indices for batch processing after
validation loop
+ // LinkedHashMap maintains insertion order, key is column name, value is
measurement index
+ final LinkedHashMap<String, Integer> tagColumnIndexMap = new
LinkedHashMap<>();
+
+ // Validate each measurement
+ for (int i = 0; i < measurementCount; i++) {
+ String measurementName = measurementInfo.getMeasurementName(i);
+ if (measurementName == null) {
+ continue;
+ }
+
+ final TsTableColumnCategory category =
+ columnCategories != null && i < columnCategories.length ?
columnCategories[i] : null;
+
+ hasAttribute = hasAttribute || category ==
TsTableColumnCategory.ATTRIBUTE;
+
+ TsTableColumnSchema existingColumn =
table.getColumnSchema(measurementName);
+ if (existingColumn == null) {
+ measurementInfo.toLowerCase();
+ measurementInfo.semanticCheck();
+ measurementName = measurementInfo.getMeasurementName(i);
+ existingColumn = table.getColumnSchema(measurementName);
+ }
+
+ if (Objects.isNull(existingColumn)) {
+ if (!refreshed) {
+ // Refresh because there may be new columns added and failed to
commit
+ refreshed = true;
+ table =
+ DataNodeTableCache.getInstance().getTable(database,
measurementInfo.getTableName());
+ existingColumn = table.getColumnSchema(measurementName);
+ }
+
+ if (Objects.isNull(existingColumn)) {
+ // Check arguments for column auto creation
+ if (category == null) {
+ throw new SemanticException(
+ String.format(
+ "Unknown column category for %s. Cannot auto create
column.", measurementName),
+ TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
+ }
+ missingMeasurementIndices.add(i);
+ }
+
+ if (noField && category == TsTableColumnCategory.FIELD) {
+ noField = false;
+ }
+ } else {
+ // Only check column category
+ if (category != null &&
!existingColumn.getColumnCategory().equals(category)) {
+ throw new SemanticException(
+ String.format("Wrong category at column %s.", measurementName),
+ TSStatusCode.COLUMN_CATEGORY_MISMATCH.getStatusCode());
+ }
+ if (noField && existingColumn.getColumnCategory() ==
TsTableColumnCategory.FIELD) {
+ noField = false;
+ }
+
+ // Custom validation handler - get MeasurementSchema on demand
+ if (measurementValidator != null) {
+ measurementValidator.validate(
+ i, measurementName, measurementInfo.getType(i), category,
existingColumn);
+ }
+ }
+
+ // Record TAG column measurement index during validation loop
+ if (tagColumnHandler != null && category == TsTableColumnCategory.TAG) {
+ tagColumnIndexMap.put(measurementName, i); // Store measurement index
+ }
+ }
+
+ if (noField) {
+ throw new SemanticException("No Field column present, please check the
request");
+ }
+
+ measurementInfo.setAttributeColumnsPresent(hasAttribute);
+ if (missingMeasurementIndices.isEmpty()) {
+ measurementInfo.setToLowerCaseApplied(true);
+ } else {
+ measurementInfo.toLowerCase();
+ }
+ measurementInfo.semanticCheck();
+
+ // Auto create missing columns
+ if (!missingMeasurementIndices.isEmpty() && isAutoCreateSchemaEnabled) {
+ autoCreateColumnsFromMeasurements(
+ database, measurementInfo, missingMeasurementIndices, context);
+ table = DataNodeTableCache.getInstance().getTable(database,
measurementInfo.getTableName());
+ } else if (!missingMeasurementIndices.isEmpty()
+ && !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert())
{
+ final List<String> missingNames = new ArrayList<>();
+ for (int idx : missingMeasurementIndices) {
+ missingNames.add(measurementInfo.getMeasurementName(idx));
+ }
+ throw new SemanticException(
+ String.format("Missing columns %s.", missingNames),
+ TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
+ }
+
+ for (int i : missingMeasurementIndices) {
+ measurementValidator.validate(
+ i,
+ measurementInfo.getMeasurementName(i),
+ measurementInfo.getType(i),
+ columnCategories[i],
Review Comment:
Variable [columnCategories](1) may be null at this access as suggested by
[this](2) null guard.
```suggestion
columnCategories != null ? columnCategories[i] : null,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]