Caideyipi commented on code in PR #16699:
URL: https://github.com/apache/iotdb/pull/16699#discussion_r2544788629
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java:
##########
@@ -254,6 +256,316 @@ public Optional<TableSchema> validateTableHeaderSchema(
return Optional.of(new TableSchema(tableSchema.getTableName(),
resultColumnList));
}
+ /**
+ * Handler for validating and processing a single measurement column.
+ *
+ * <p>Declared as a functional interface, so an implementation can be supplied as a lambda or a
+ * method reference.
+ */
+ @FunctionalInterface
+ public interface MeasurementValidator {
+ /**
+ * Validate a measurement column
+ *
+ * @param index measurement index in the array
+ * @param measurement name of the measurement (column) being validated
+ * @param dataType data type supplied for the measurement at {@code index}
+ * @param columnCategory column category; NOTE(review): the call site in
+ * validateInsertNodeMeasurements passes null when no category is available for the index, so
+ * implementations should tolerate null
+ * @param existingColumn existing column in table, null if not exists
+ */
+ void validate(
+ int index,
+ String measurement,
+ TSDataType dataType,
+ TsTableColumnCategory columnCategory,
+ TsTableColumnSchema existingColumn);
+ }
+
+ /** Handler for processing TAG columns during validation */
+ @FunctionalInterface
+ public interface TagColumnHandler {
+ /**
+ * Adjust the order of TAG columns in this insertion to be consistent with that from the
+ * schema region, similar to adjustIdColumns logic.
+ *
+ * @param tagColumnIndexMap LinkedHashMap of incoming TAG columns, key is column name, value is
+ * measurement index in the array (maintains insertion order)
+ * @param existingTagColumnIndexMap LinkedHashMap of existing TAG columns in TsTable, key is
+ * column name, value is TAG column index in table (maintains schema region order)
+ */
+ void handle(
+ LinkedHashMap<String, Integer> tagColumnIndexMap,
+ LinkedHashMap<String, Integer> existingTagColumnIndexMap);
+ }
+
+ /**
+ * Optimized validation method with custom handlers for measurement
validation and TAG column
+ * processing
+ *
+ * @param database database name
+ * @param measurementInfo measurement information from InsertNode
+ * @param context query context
+ * @param allowCreateTable whether to allow table auto-creation
+ * @param measurementValidator custom validator for each measurement, null
to use default
+ * @param tagColumnHandler custom handler for TAG columns, null to skip TAG
processing
+ * <p>Declared {@code void}: nothing is returned. Results are recorded on
+ * {@code measurementInfo}, and validation failures are signalled by exceptions.
+ */
+ public void validateInsertNodeMeasurements(
+ final String database,
+ final InsertNodeMeasurementInfo measurementInfo,
+ final MPPQueryContext context,
+ final boolean allowCreateTable,
+ final MeasurementValidator measurementValidator,
+ final TagColumnHandler tagColumnHandler) {
+
+ DataNodeSchemaLockManager.getInstance()
+ .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TABLE);
+
+ final TsTableColumnCategory[] columnCategories =
measurementInfo.getColumnCategories();
+ final int measurementCount = measurementInfo.getMeasurementCount();
+
+ if (measurementCount == 0) {
+ throw new SemanticException("No measurements present, please check the
request");
+ }
+
+ TsTable table =
+ DataNodeTableCache.getInstance().getTableInWrite(database,
measurementInfo.getTableName());
+ final List<Integer> missingMeasurementIndices = new ArrayList<>();
+
+ final boolean isAutoCreateSchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
+ // First round validate, check existing schema
+ if (table == null) {
+ // Auto create missing table
+ if (allowCreateTable && isAutoCreateSchemaEnabled) {
+ measurementInfo.toLowerCase();
+ measurementInfo.semanticCheck();
+ autoCreateTableFromMeasurementInfo(context, database, measurementInfo);
+ table =
+ DataNodeTableCache.getInstance()
+ .getTable(database, measurementInfo.getTableName(), false);
+ if (table == null) {
+ throw new IllegalStateException(
+ "auto create table succeed, but cannot get table schema in
current node's DataNodeTableCache, may be caused by concurrently auto creating
table");
+ }
+ } else {
+ TableMetadataImpl.throwTableNotExistsException(database,
measurementInfo.getTableName());
+ }
+ } else {
+ DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
+ // Note: isStrictTagColumn is always false for InsertNode, so TAG column
order validation is
+ // skipped
+ }
+
+ boolean refreshed = false;
+ boolean noField = true;
+ boolean hasAttribute = false;
+
+ // Track TAG column measurement indices for batch processing after
validation loop
+ // LinkedHashMap maintains insertion order, key is column name, value is
measurement index
+ final LinkedHashMap<String, Integer> tagColumnIndexMap = new
LinkedHashMap<>();
+
+ // Validate each measurement
+ for (int i = 0; i < measurementCount; i++) {
+ String measurementName = measurementInfo.getMeasurementName(i);
+ if (measurementName == null) {
+ continue;
+ }
+
+ final TsTableColumnCategory category =
+ columnCategories != null && i < columnCategories.length ?
columnCategories[i] : null;
+
+ hasAttribute = hasAttribute || category ==
TsTableColumnCategory.ATTRIBUTE;
+
+ TsTableColumnSchema existingColumn =
table.getColumnSchema(measurementName);
+ if (existingColumn == null) {
+ measurementInfo.toLowerCase();
+ measurementInfo.semanticCheck();
+ measurementName = measurementInfo.getMeasurementName(i);
+ existingColumn = table.getColumnSchema(measurementName);
+ }
+
+ if (Objects.isNull(existingColumn)) {
+ if (!refreshed) {
+ // Refresh because there may be new columns added and failed to
commit
+ refreshed = true;
+ table =
+ DataNodeTableCache.getInstance().getTable(database,
measurementInfo.getTableName());
+ existingColumn = table.getColumnSchema(measurementName);
+ }
+
+ if (Objects.isNull(existingColumn)) {
+ // Check arguments for column auto creation
+ if (category == null) {
+ throw new SemanticException(
+ String.format(
+ "Unknown column category for %s. Cannot auto create
column.", measurementName),
+ TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode());
+ }
+ missingMeasurementIndices.add(i);
+ }
+
+ if (noField && category == TsTableColumnCategory.FIELD) {
+ noField = false;
+ }
+ } else {
+ // Only check column category
+ if (category != null &&
!existingColumn.getColumnCategory().equals(category)) {
+ throw new SemanticException(
+ String.format("Wrong category at column %s.", measurementName),
+ TSStatusCode.COLUMN_CATEGORY_MISMATCH.getStatusCode());
+ }
+ if (noField && existingColumn.getColumnCategory() ==
TsTableColumnCategory.FIELD) {
+ noField = false;
+ }
+
+ // Custom validation handler - get MeasurementSchema on demand
+ if (measurementValidator != null) {
+ measurementValidator.validate(
+ i, measurementName, measurementInfo.getType(i), category,
existingColumn);
+ }
+ }
+
+ // Record TAG column measurement index during validation loop
+ if (tagColumnHandler != null && category == TsTableColumnCategory.TAG) {
+ tagColumnIndexMap.put(measurementName, i); // Store measurement index
+ }
+ }
+
+ if (noField) {
+ throw new SemanticException("No Field column present, please check the
request");
+ }
+
+ measurementInfo.setAttributeColumnsPresent(hasAttribute);
+ if (missingMeasurementIndices.isEmpty()) {
+ measurementInfo.setToLowerCaseApplied(true);
Review Comment:
Please check this and make sure the tableName has also been converted to lower case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]