Copilot commented on code in PR #17372: URL: https://github.com/apache/iotdb/pull/17372#discussion_r2999539201
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.TsFileFormatCopyToWriter; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.File; +import java.io.IOException; +import 
java.nio.file.Files; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class TableCopyToOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableCopyToOperator.class); + + private final OperatorContext operatorContext; + private final Operator childOperator; + private final String targetFilePath; + private final CopyToOptions options; + private final List<ColumnHeader> innerQueryColumnHeaders; + private final List<Integer> columnIndex2TsBlockColumnIndexList; + + private IFormatCopyToWriter writer; + private File targetFile; + private boolean isFinished = false; + private boolean hasData = false; + + public TableCopyToOperator( + OperatorContext operatorContext, + Operator child, + String targetFilePath, + CopyToOptions options, + List<ColumnHeader> innerQueryColumnHeaders, + List<Integer> columnIndex2TsBlockColumnIndexList) { + this.operatorContext = operatorContext; + this.childOperator = child; + this.targetFilePath = targetFilePath; + this.options = options; + this.innerQueryColumnHeaders = innerQueryColumnHeaders; + this.columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + long startTime = System.nanoTime(); + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + IFormatCopyToWriter formatWriter = getWriter(); + do { + if (!childOperator.hasNext()) { + isFinished = true; + break; + } + TsBlock tsBlock = childOperator.next(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } + hasData = true; + formatWriter.write(tsBlock); + } while (System.nanoTime() - startTime < maxRuntime && !isFinished); + + if (isFinished) { + formatWriter.seal(); + return formatWriter.buildResultTsBlock(); + } + return null; + } + + private IFormatCopyToWriter getWriter() throws 
Exception { + if (writer != null) { + return writer; + } + this.targetFile = createTargetFile(targetFilePath); + switch (options.getFormat()) { + case TSFILE: + default: + this.writer = + new TsFileFormatCopyToWriter( + this.targetFile, + (CopyToTsFileOptions) options, + innerQueryColumnHeaders, + columnIndex2TsBlockColumnIndexList); + } + return writer; + } + + private File createTargetFile(String path) throws Exception { + File file = new File(path); + if (file.getParent() == null) { + String dir = TierManager.getInstance().getNextFolderForCopyToTargetFile(); + file = new File(dir, path); + } + File parent = file.getParentFile(); + if (parent != null && !parent.exists() && !parent.mkdirs()) { + throw new IOException("Failed to create directories: " + parent); + } + if (!file.createNewFile()) { + throw new IOException("Target file already exists: " + file.getAbsolutePath()); + } Review Comment: `createTargetFile()` will create a new file at an arbitrary user-supplied path (including absolute paths and paths with `..`) and will also create parent directories. Since the path comes from a SQL statement, this effectively allows remote clients to write files anywhere the server user has permissions. Please restrict COPY TO targets to the dedicated `copy_to` directory (or another explicit allowlist/configured base dir), and reject absolute paths / path traversal. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java: ########## @@ -226,6 +247,10 @@ public String getNextFolderForTsFile(int tierLevel, boolean sequence) : unSeqTiers.get(tierLevel).getNextFolder(); } + public String getNextFolderForCopyToTargetFile() throws DiskSpaceInsufficientException { Review Comment: `getNextFolderForCopyToTargetFile()` will throw a NullPointerException if `copyToFolderManager` was not initialized (e.g., `FolderManager` construction failed due to `DiskSpaceInsufficientException`). 
Consider either not swallowing the exception in `initFolders()`, or checking for null here and throwing a clear `DiskSpaceInsufficientException`/`IllegalStateException` with actionable context. ```suggestion public String getNextFolderForCopyToTargetFile() throws DiskSpaceInsufficientException { if (copyToFolderManager == null) { throw new DiskSpaceInsufficientException( "copyToFolderManager is not initialized. This usually indicates that folder " + "initialization in TierManager.initFolders() failed due to insufficient disk " + "space. Please check disk space and related configuration before retrying the " + "copy-to-target operation."); } ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java: ########## @@ -388,6 +389,24 @@ public PlanAndMappings visitExplainAnalyze(ExplainAnalyzeNode node, UnaliasConte mapping); } + @Override + public PlanAndMappings visitCopyTo(CopyToNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + List<Symbol> newChildPermittedOutputs = mapper.map(node.getChildPermittedOutputs()); + return new PlanAndMappings( + new CopyToNode( + node.getPlanNodeId(), + rewrittenSource.getRoot(), + node.getTargetFilePath(), + node.getCopyToOptions(), + newChildPermittedOutputs, + node.getInnerQueryDatasetHeader(), + node.getInnerQueryOutputNode()), Review Comment: In `visitCopyTo`, the child plan is rewritten and symbols may be remapped, but `innerQueryOutputNode` is passed through unchanged. 
`TableCopyToOperator` later calls `datasetHeader.setTableColumnToTsBlockIndexMap(innerQueryOutputNode, childNode)` and relies on `innerQueryOutputNode.getOutputSymbols()` matching the (possibly rewritten) child’s output symbols; if they don’t, `outputSymbolsIndexMap.get(...)` can return null and fail at runtime. Please remap `innerQueryOutputNode`’s output symbols using the same `SymbolMapper` (similar to `childPermittedOutputs`), or store a symbol-independent mapping instead of the original `OutputNode`. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CopyTo.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +public class CopyTo extends Statement { + + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(CopyTo.class); + private final Statement queryStatement; + private final String targetFileName; + private final CopyToOptions options; + + public CopyTo(Statement queryStatement, String targetFileName, CopyToOptions options) { + this(null, queryStatement, targetFileName, options); + } + + public CopyTo( + @Nullable NodeLocation location, + Statement queryStatement, + String targetFileName, + CopyToOptions options) { + super(location); + this.queryStatement = queryStatement; + this.targetFileName = targetFileName; + this.options = options; + } + + public Statement getQueryStatement() { + return queryStatement; + } + + public String getTargetFileName() { + return targetFileName; + } + + public CopyToOptions getOptions() { + return options; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitCopyTo(this, context); + } + + @Override + public List<Node> getChildren() { + return ImmutableList.<Node>builder().add(queryStatement).build(); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + CopyTo copyTo = (CopyTo) o; + return Objects.equals(queryStatement, copyTo.queryStatement) + && Objects.equals(targetFileName, copyTo.targetFileName) + && Objects.equals(options, copyTo.options); + } + + @Override + public int hashCode() { + return Objects.hash(queryStatement, targetFileName, options); + } + + @Override + public String toString() { + return "CopyTo{" + + "queryStatement=" + + 
queryStatement + + ", targetFileName='" + + targetFileName + + '\'' + + ", options=" + + options + + '}'; + } + + @Override + public long ramBytesUsed() { + long size = INSTANCE_SIZE; + size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); + size += AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(queryStatement); Review Comment: `ramBytesUsed()` only accounts for the node location and `queryStatement`, but it omits the retained sizes of `targetFileName` and `options`. Other AST nodes include string fields in their memory estimates; please add these fields here too to keep memory accounting accurate. ```suggestion size += AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(queryStatement); size += AstMemoryEstimationHelper.getEstimatedSizeOfString(targetFileName); size += AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(options); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java: ########## @@ -643,6 +644,21 @@ protected Void visitExplain(Explain node, Integer indent) { return null; } + @Override + protected Void visitCopyTo(CopyTo node, Integer context) { + builder.append("COPY\n"); + builder.append("(\n"); + process(node.getQueryStatement(), context); + builder.append("\n) "); + builder.append("TO "); + builder.append('\''); + builder.append(node.getTargetFileName()); + builder.append('\''); + builder.append("\n"); + builder.append(node.getOptions().toString()); Review Comment: `SqlFormatter` currently appends `node.getOptions().toString()`, but `CopyToTsFileOptions.toString()` does not match the grammar keywords (e.g., it prints `TIME COLUMN`, `TAG COLUMNS`, `MEMORY` instead of `TIME`, `TAGS`, `MEMORY_THRESHOLD`). This makes formatted SQL invalid/unparseable and inconsistent with the actual user-facing syntax. Please format options explicitly here (or provide a dedicated formatter) using the grammar’s option keywords. 
```suggestion if (node.getOptions() != null) { String optionsSql = node.getOptions().toString(); // Normalize option keywords to match the SQL grammar optionsSql = optionsSql .replace("TIME COLUMN", "TIME") .replace("TAG COLUMNS", "TAGS") .replace("MEMORY", "MEMORY_THRESHOLD"); builder.append(optionsSql); } ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/CopyToTsFileOptions.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.planner.RelationPlan; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.utils.constant.SqlConstant; + +import org.apache.tsfile.enums.TSDataType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CopyToTsFileOptions implements CopyToOptions { + + private static final String DEFAULT_TABLE_NAME = "default"; + + private String targetTableName; + private String targetTimeColumn; + private Set<String> targetTagColumns; + private final long targetMemoryThreshold; + + private boolean generateNewTimeColumn = false; + + public CopyToTsFileOptions( + String targetTableName, + String targetTimeColumn, + Set<String> targetTagColumns, + long targetMemoryThreshold) { + this.targetTableName = targetTableName; + this.targetTimeColumn = targetTimeColumn; + this.targetTagColumns = targetTagColumns; + this.targetMemoryThreshold = targetMemoryThreshold; + } + + public Format getFormat() { + return Format.TSFILE; + } + + public boolean isGenerateNewTimeColumn() { + return generateNewTimeColumn; + } + + @Override + public void infer( + Analysis analysis, RelationPlan queryRelationPlan, 
List<ColumnHeader> columnHeaders) { + List<Identifier> tables = queryRelationPlan.getScope().getTables(); + TsTable onlyOneQueriedTable = null; + if (tables != null && tables.size() == 1) { + onlyOneQueriedTable = + DataNodeTableCache.getInstance() + .getTable(analysis.getDatabaseName(), tables.get(0).toString(), false); + } + if (targetTableName == null) { + targetTableName = + onlyOneQueriedTable == null ? DEFAULT_TABLE_NAME : onlyOneQueriedTable.getTableName(); + } + if (onlyOneQueriedTable != null) { + if (targetTimeColumn == null || targetTagColumns == null) { + inferTimeAndTags(onlyOneQueriedTable, columnHeaders); + } + } + if (targetTimeColumn == null) { + generateNewTimeColumn = true; + targetTimeColumn = SqlConstant.TABLE_TIME_COLUMN_NAME; + } + if (targetTagColumns == null) { + targetTagColumns = Collections.emptySet(); + } + } + + private void inferTimeAndTags(TsTable tsTable, List<ColumnHeader> columnHeaders) { + Map<String, ColumnHeader> columnName2ColumnHeaderMapInDataset = new HashMap<>(); + for (ColumnHeader columnHeader : columnHeaders) { + columnName2ColumnHeaderMapInDataset.put(columnHeader.getColumnName(), columnHeader); + } + if (targetTagColumns == null) { + boolean canMatchAllTags = true; + List<TsTableColumnSchema> tagColumnsInTsTable = tsTable.getTagColumnSchemaList(); + for (TsTableColumnSchema tsTableColumnSchema : tagColumnsInTsTable) { + String columnName = tsTableColumnSchema.getColumnName(); + ColumnHeader columnHeaderInDataset = columnName2ColumnHeaderMapInDataset.get(columnName); + if (columnHeaderInDataset == null + || columnHeaderInDataset.getColumnType() != tsTableColumnSchema.getDataType()) { + canMatchAllTags = false; + break; + } + } + if (canMatchAllTags) { + this.targetTagColumns = new LinkedHashSet<>(tagColumnsInTsTable.size()); + for (TsTableColumnSchema tagColumn : tagColumnsInTsTable) { + targetTagColumns.add(tagColumn.getColumnName()); + } + } + } + + if (targetTimeColumn == null) { + String timeColumnInTsTable = 
tsTable.getTimeColumnName(); + if (timeColumnInTsTable != null) { + ColumnHeader timeColumnHeader = + columnName2ColumnHeaderMapInDataset.get(timeColumnInTsTable); + if (timeColumnHeader != null) { + this.targetTimeColumn = timeColumnHeader.getColumnName(); + } + } + if (targetTimeColumn == null + && columnName2ColumnHeaderMapInDataset.containsKey(SqlConstant.TABLE_TIME_COLUMN_NAME)) { + this.targetTimeColumn = SqlConstant.TABLE_TIME_COLUMN_NAME; + } + } + } + + @Override + public void check(List<ColumnHeader> columnHeaders) { + if (generateNewTimeColumn && targetTagColumns.isEmpty()) { + return; + } + Set<String> columns = new HashSet<>(targetTagColumns.size()); + int foundTagColumns = 0; + for (ColumnHeader columnHeader : columnHeaders) { + if (!generateNewTimeColumn + && columnHeader.getColumnName().equals(targetTimeColumn) + && columnHeader.getColumnType() != TSDataType.TIMESTAMP) { + throw new SemanticException("Data type of target time column is not TIMESTAMP"); + } + if (targetTagColumns.contains(columnHeader.getColumnName())) { + if (columnHeader.getColumnType() != TSDataType.STRING) { + throw new SemanticException( + "Data type of tag column " + columnHeader.getColumnName() + " is not STRING"); + } + foundTagColumns++; + } + columns.add(columnHeader.getColumnName()); + } + if (columns.size() != columnHeaders.size()) { + throw new SemanticException("Duplicate column names in query dataset."); + } + if (foundTagColumns != targetTagColumns.size()) { + throw new SemanticException("Some specified tag columns are not exist in query dataset."); + } + if (foundTagColumns + (generateNewTimeColumn ? 
0 : 1) == columns.size()) { + throw new SemanticException("Number of field columns should be larger than 0."); + } + } + + public List<ColumnHeader> getRespColumnHeaders() { + return ColumnHeaderConstant.COPY_TO_TSFILE_COLUMN_HEADERS; + } + + @Override + public long estimatedMaxRamBytesInWrite() { + return targetMemoryThreshold; + } + + public String getTargetTableName() { + return targetTableName; + } + + public String getTargetTimeColumn() { + return targetTimeColumn; + } + + public Set<String> getTargetTagColumns() { + return targetTagColumns; + } + + public long getTargetMemoryThreshold() { + return targetMemoryThreshold; + } + + @Override + public String toString() { + return "(" + + + // FORMAT + "\nFORMAT " + + getFormat().name() + + + + // TABLE + "\nTABLE " + + targetTableName + + + + // TIME COLUMN + "\nTIME COLUMN " + + targetTimeColumn + + + + // TAG COLUMNS + "\nTAG COLUMNS (" + + String.join(", ", targetTagColumns) + + ")" + + + + // MEMORY + "\nMEMORY " Review Comment: `toString()` renders option keywords that don’t match the COPY TO grammar (`TIME COLUMN`, `TAG COLUMNS`, `MEMORY` vs `TIME`, `TAGS`, `MEMORY_THRESHOLD`). Since SQL formatting/logging uses this output, it can be misleading and (when reused) unparseable. Align the rendered keywords with the grammar, or avoid using `toString()` for producing SQL text. ```suggestion // TIME "\nTIME " + targetTimeColumn + // TAGS "\nTAGS (" + String.join(", ", targetTagColumns) + ")" + // MEMORY_THRESHOLD "\nMEMORY_THRESHOLD " ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
