JackieTien97 commented on code in PR #17372: URL: https://github.com/apache/iotdb/pull/17372#discussion_r3007363397
########## integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/copyto/IoTDBCopyToTsFileIT.java: ########## @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent.copyto; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; 
+import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBCopyToTsFileIT { + + private static final String DATABASE_NAME = "test_db"; + + protected static final String[] createSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "create table table1(tag1 string tag, tag2 string tag, s1 int32 field, s2 int32 field)", + "insert into table1(time, tag1, tag2, s1, s2) values (1, 't1_1', 't2', 1, 1)", + "insert into table1(time, tag1, tag2, s1, s2) values (2, 't1_1', 't2', 2, 2)", + "insert into table1(time, tag1, tag2, s1, s2) values (3, 't1_1', 't2', 3, 3)", + "insert into table1(time, tag1, tag2, s1, s2) values (1, 't1_2', 't2', 1, 1)", + "insert into table1(time, tag1, tag2, s1, s2) values (2, 't1_2', 't2', 2, 2)", + "insert into table1(time, tag1, tag2, s1, s2) values (3, 't1_2', 't2', 3, 3)", + "create table table2(tag1 string tag, tag2 string tag, s1 int32 field, s2 int32 field)", + "insert into table2(time, tag1, tag2, s1, s2) values (1, 't1_1', 't2', 1, 1)", + }; + + private String targetFilePath = null; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(createSqls); + } + + @After + public void tearDownAfterTest() { + if (targetFilePath != null) { + try { + Files.deleteIfExists(new File(targetFilePath).toPath()); + } catch (Exception ignored) { + } + targetFilePath = null; + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testCopyTable() + throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = + 
session.executeQueryStatement("copy table1 to '1.tsfile' (memory_threshold 1000000)"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + targetFilePath = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); Review Comment: should get long? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java: ########## @@ -101,6 +102,12 @@ private void addOutputSymbolsToTypeProvider(PlanNode node) { } } + @Override + public Void visitCopyTo(CopyToNode node, Void context) { + visitPlan(node.getChild(), context); + return null; Review Comment: add output symbol type to beTypeProvider.putTableModelType, just like intonode? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CopyToNode.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + +public class CopyToNode extends SingleChildProcessNode { + + private final String targetFilePath; + private final CopyToOptions copyToOptions; + private final List<Symbol> childPermittedOutputs; + private final List<Symbol> innerQueryOutputSymbols; + private final DatasetHeader innerQueryDatasetHeader; + + public CopyToNode( + PlanNodeId id, + PlanNode child, + String targetFilePath, + CopyToOptions copyToOptions, + List<Symbol> childPermittedOutputs, + DatasetHeader innerQueryDatasetHeader, + List<Symbol> innerQueryOutputSymbols) { + super(id); + this.child = child; + this.targetFilePath = targetFilePath; + this.copyToOptions = copyToOptions; + this.childPermittedOutputs = childPermittedOutputs; + this.innerQueryDatasetHeader = innerQueryDatasetHeader; + this.innerQueryOutputSymbols = innerQueryOutputSymbols; + } + + public DatasetHeader getInnerQueryDatasetHeader() { + return innerQueryDatasetHeader; + } + + public List<Symbol> getInnerQueryOutputSymbols() { + return innerQueryOutputSymbols; + } + + public String getTargetFilePath() { + return targetFilePath; + } 
+ + public CopyToOptions getCopyToOptions() { + return copyToOptions; + } + + public List<Symbol> getChildPermittedOutputs() { + return childPermittedOutputs; + } + + @Override + public PlanNode clone() { + return new CopyToNode( + id, + child, + targetFilePath, + copyToOptions, + childPermittedOutputs, + innerQueryDatasetHeader, + innerQueryOutputSymbols); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitCopyTo(this, context); + } + + @Override + public List<Symbol> getOutputSymbols() { Review Comment: This recreates Symbol objects on every invocation. Since getOutputSymbols() is called frequently during planning, this is wasteful. Cache the result or use a static final list. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/TsFileFormatCopyToWriter.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.IFormatCopyToWriter; + +import org.apache.ratis.util.MemoizedCheckedSupplier; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class TsFileFormatCopyToWriter implements IFormatCopyToWriter { + private final File targetFile; + private final String targetTableName; + private final String targetTimeColumn; + private final Set<String> targetTagColumns; + private final boolean generateNewTimeColumn; + + private MemoizedCheckedSupplier<TableTsBlock2TsFileWriter, IOException> tsFileWriter; + private long rowCount = 0; + private long deviceCount = 0; + + public TsFileFormatCopyToWriter( + File file, + CopyToTsFileOptions copyToOptions, + List<ColumnHeader> innerQueryDatasetHeader, + int[] columnIndex2TsBlockColumnIndex) { + this.targetFile = file; + this.targetTableName = copyToOptions.getTargetTableName(); + this.targetTimeColumn = copyToOptions.getTargetTimeColumn(); + this.generateNewTimeColumn = 
copyToOptions.isGenerateNewTimeColumn(); + targetTagColumns = copyToOptions.getTargetTagColumns(); + + ColumnHeader[] columnHeadersMatchChildTsBlock = + getColumnHeadersMatchTsBlock(innerQueryDatasetHeader, columnIndex2TsBlockColumnIndex); + List<ColumnSchema> columnSchemas = + new ArrayList<>(columnHeadersMatchChildTsBlock.length + (generateNewTimeColumn ? 1 : 0)); + Map<String, Integer> columnNameIndexMapInDatasetHeader = new HashMap<>(); + for (int i = 0; i < columnHeadersMatchChildTsBlock.length; i++) { + columnNameIndexMapInDatasetHeader.put(columnHeadersMatchChildTsBlock[i].getColumnName(), i); + } + // add time column + columnSchemas.add( + new ColumnSchema(targetTimeColumn, TSDataType.TIMESTAMP, ColumnCategory.TIME)); + int timeColumnIdxInQueryTsBlock = + generateNewTimeColumn ? -1 : columnNameIndexMapInDatasetHeader.get(targetTimeColumn); + + // add tag columns + int[] tagColumnIndexesInTsBlock = new int[targetTagColumns.size()]; + int arrIdx = 0; + for (String targetTagColumn : targetTagColumns) { + int tagIdx = columnNameIndexMapInDatasetHeader.get(targetTagColumn); + ColumnHeader tagColumnHeader = columnHeadersMatchChildTsBlock[tagIdx]; + columnSchemas.add( + new ColumnSchema( + tagColumnHeader.getColumnName(), + tagColumnHeader.getColumnType(), + ColumnCategory.TAG)); + tagColumnIndexesInTsBlock[arrIdx++] = tagIdx; + } + // add field columns + int fieldColumnCount = + columnHeadersMatchChildTsBlock.length + - tagColumnIndexesInTsBlock.length + - (generateNewTimeColumn ? 
0 : 1); + int[] fieldColumnIndexesInQueryTsBlock = new int[fieldColumnCount]; + IMeasurementSchema[] fieldColumnSchemas = new IMeasurementSchema[fieldColumnCount]; + arrIdx = 0; + for (int i = 0; i < columnHeadersMatchChildTsBlock.length; i++) { + ColumnHeader columnHeader = columnHeadersMatchChildTsBlock[i]; + if (targetTagColumns.contains(columnHeader.getColumnName()) + || columnHeader.getColumnName().equals(targetTimeColumn)) { + continue; + } + columnSchemas.add( + new ColumnSchema( + columnHeader.getColumnName(), columnHeader.getColumnType(), ColumnCategory.FIELD)); + fieldColumnSchemas[arrIdx] = + new MeasurementSchema(columnHeader.getColumnName(), columnHeader.getColumnType()); + fieldColumnIndexesInQueryTsBlock[arrIdx] = i; + arrIdx++; + } + + TableSchema tableSchema = new TableSchema(targetTableName, columnSchemas); + this.tsFileWriter = + MemoizedCheckedSupplier.valueOf( + () -> + new TableTsBlock2TsFileWriter( + targetFile, + tableSchema, + copyToOptions.getTargetMemoryThreshold(), + generateNewTimeColumn, + timeColumnIdxInQueryTsBlock, + tagColumnIndexesInTsBlock, + fieldColumnIndexesInQueryTsBlock, + fieldColumnSchemas)); + } + + private ColumnHeader[] getColumnHeadersMatchTsBlock( + List<ColumnHeader> queryOutputColumnHeaders, int[] columnIndex2TsBlockColumnIndexList) { + ColumnHeader[] columnHeadersMatchTsBlock = new ColumnHeader[queryOutputColumnHeaders.size()]; + for (int i = 0; i < columnIndex2TsBlockColumnIndexList.length; i++) { + int tsBlockIndex = columnIndex2TsBlockColumnIndexList[i]; + columnHeadersMatchTsBlock[tsBlockIndex] = queryOutputColumnHeaders.get(i); + } + return columnHeadersMatchTsBlock; + } + + @Override + public void write(TsBlock tsBlock) throws Exception { + tsFileWriter.get().write(tsBlock); + } + + @Override + public void seal() throws Exception { + if (!tsFileWriter.isInitialized()) { + return; + } + TableTsBlock2TsFileWriter writer = tsFileWriter.get(); + writer.close(); + // should call these methods after writer.close() 
+ deviceCount = writer.getDeviceCount(); + rowCount = writer.getRowCount(); + tsFileWriter = null; + } + + @Override + public TsBlock buildResultTsBlock() { + TsBlockBuilder builder = + TsBlockBuilder.withMaxTsBlockSize( + 1024, + ColumnHeaderConstant.COPY_TO_TSFILE_COLUMN_HEADERS.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); + builder.getTimeColumnBuilder().writeLong(0); + builder.getValueColumnBuilders()[0].writeBinary( + new Binary(rowCount > 0 ? targetFile.getAbsolutePath() : "", TSFileConfig.STRING_CHARSET)); + builder.getValueColumnBuilders()[1].writeLong(rowCount); + builder.getValueColumnBuilders()[2].writeLong(deviceCount); + builder.getValueColumnBuilders()[3].writeLong(targetFile.length()); + builder.getValueColumnBuilders()[4].writeBinary( + new Binary(targetTableName, TSFileConfig.STRING_CHARSET)); + builder.getValueColumnBuilders()[5].writeBinary( + new Binary( + targetTimeColumn + (generateNewTimeColumn ? "(auto_gen)" : ""), Review Comment: make "(auto_gen)" a constant ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.TsFileFormatCopyToWriter; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +public class TableCopyToOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableCopyToOperator.class); + + private final OperatorContext operatorContext; + private final Operator childOperator; + private final String targetFilePath; + private final CopyToOptions options; + private final List<ColumnHeader> innerQueryColumnHeaders; + private final int[] columnIndex2TsBlockColumnIndex; + + private IFormatCopyToWriter writer; + private File targetFile; + private boolean isFinished = false; + private boolean hasData = false; + + public TableCopyToOperator( + OperatorContext operatorContext, + Operator child, + String targetFilePath, + CopyToOptions options, + List<ColumnHeader> innerQueryColumnHeaders, + int[] columnIndex2TsBlockColumnIndex) { + 
this.operatorContext = operatorContext; + this.childOperator = child; + this.targetFilePath = targetFilePath; + this.options = options; + this.innerQueryColumnHeaders = innerQueryColumnHeaders; + this.columnIndex2TsBlockColumnIndex = columnIndex2TsBlockColumnIndex; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + IFormatCopyToWriter formatWriter = getWriter(); + if (!childOperator.hasNext()) { + isFinished = true; + formatWriter.seal(); + return formatWriter.buildResultTsBlock(); + } + TsBlock tsBlock = childOperator.next(); + if (tsBlock == null || tsBlock.isEmpty()) { + return null; + } + hasData = true; + formatWriter.write(tsBlock); + return null; + } + + private IFormatCopyToWriter getWriter() throws Exception { + if (writer != null) { + return writer; + } + this.targetFile = createTargetFile(targetFilePath); + switch (options.getFormat()) { + case TSFILE: + default: + this.writer = + new TsFileFormatCopyToWriter( + this.targetFile, + (CopyToTsFileOptions) options, + innerQueryColumnHeaders, + columnIndex2TsBlockColumnIndex); + } + return writer; + } + + private File createTargetFile(String path) throws Exception { + File file = new File(path); + if (file.getParent() == null) { + String dir = TierManager.getInstance().getNextFolderForCopyToTargetFile(); + file = new File(dir, path); + } + File parent = file.getParentFile(); + if (parent != null && !parent.exists() && !parent.mkdirs()) { + throw new IOException("Failed to create directories: " + parent); + } + if (file.exists()) { + throw new IOException("Target file already exists: " + file.getAbsolutePath()); + } + if (!file.createNewFile()) { + throw new IOException("Failed to create file: " + file.getAbsolutePath()); + } + + return file; + } + + @Override + public boolean hasNext() throws Exception { + return !isFinished; + } + + @Override + public ListenableFuture<?> isBlocked() { + return 
childOperator.isBlocked(); + } + + @Override + public void close() throws Exception { + childOperator.close(); + if (writer != null) { + writer.close(); + writer = null; + } + if (targetFile == null || (hasData && isFinished)) { + return; + } + Files.deleteIfExists(targetFile.toPath()); + } + + @Override + public boolean isFinished() throws Exception { + return isFinished; + } + + @Override + public long calculateMaxPeekMemory() { Review Comment: should include the memory occupied by tsfile writer, that's the memoryThreshold ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.TsFileFormatCopyToWriter; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +public class TableCopyToOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableCopyToOperator.class); + + private final OperatorContext operatorContext; + private final Operator childOperator; + private final String targetFilePath; + private final CopyToOptions options; + private final List<ColumnHeader> innerQueryColumnHeaders; + private final int[] columnIndex2TsBlockColumnIndex; + + private IFormatCopyToWriter writer; + private File targetFile; + private boolean isFinished = false; + private boolean hasData = false; + + public TableCopyToOperator( + OperatorContext operatorContext, + Operator child, + String targetFilePath, + CopyToOptions options, + List<ColumnHeader> innerQueryColumnHeaders, + int[] columnIndex2TsBlockColumnIndex) { + this.operatorContext = operatorContext; + this.childOperator = child; + this.targetFilePath = targetFilePath; + 
this.options = options; + this.innerQueryColumnHeaders = innerQueryColumnHeaders; + this.columnIndex2TsBlockColumnIndex = columnIndex2TsBlockColumnIndex; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + IFormatCopyToWriter formatWriter = getWriter(); + if (!childOperator.hasNext()) { + isFinished = true; + formatWriter.seal(); Review Comment: If seal() throws, isFinished has already been set to true on line 81, before the seal() call on line 82. So isFinished = true and hasData = true → the file won't be cleaned up even though sealing failed. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/CopyToOptions.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.planner.RelationPlan; + +import org.apache.tsfile.utils.Accountable; + +import java.util.List; +import java.util.Set; + +/** + * Interface for COPY TO command options. + * + * <p>This interface defines the configuration for COPY TO operations, including target format, + * target table name, column mappings, and memory management. Implementations provide + * format-specific validation and inference logic. + */ +public interface CopyToOptions extends Accountable { + + /** + * Infers and validates options based on query analysis and relation plan. + * + * <p>This method is called during analysis phase to fill in default values and validate + * user-specified options against the query structure. For example, it can infer the target time + * column from the query's time column if not explicitly specified. + * + * @param analysis the query analysis containing metadata about the query + * @param queryRelationPlan the logical relation plan of the inner query + * @param columnHeaders the column headers from the query result + */ + void infer(Analysis analysis, RelationPlan queryRelationPlan, List<ColumnHeader> columnHeaders); + + /** + * Validates the options against the actual column schema. + * + * <p>This method is called after planning to ensure the specified options (e.g., target columns, + * tags) are valid for the given output schema. 
+ * + * @param columnHeaders the column headers of the query result + * @throws SemanticException if validation fails + */ + void check(List<ColumnHeader> columnHeaders); + + /** + * Returns the response column headers for the result set. + * + * <p>These headers describe the columns that will be returned to the client after the COPY TO + * operation completes (e.g., file path, row count). + * + * @return list of column headers for the response + */ + List<ColumnHeader> getRespColumnHeaders(); + + /** + * Returns the target output format. + * + * @return the format enum value + */ + Format getFormat(); + + /** + * Estimates the maximum memory usage in bytes required for writing. + * + * <p>This is used for memory management and determining whether to flush data to disk. + * + * @return estimated maximum memory usage in bytes + */ + long estimatedMaxRamBytesInWrite(); + + /** Supported output formats for COPY TO command. */ + enum Format { + /** TsFile format output. */ + TSFILE, + } + + class Builder { + private CopyToOptions.Format format = CopyToOptions.Format.TSFILE; + private String targetTableName = null; + private String targetTimeColumn = null; + private Set<String> targetTagColumns = null; + private long memoryThreshold = 0; + + public Builder withFormat(CopyToOptions.Format format) { + this.format = format; + return this; + } + + public Builder withTargetTableName(String targetTableName) { + this.targetTableName = targetTableName; + return this; + } + + public Builder withTargetTimeColumn(String targetTimeColumn) { + this.targetTimeColumn = targetTimeColumn; + return this; + } + + public Builder withTargetTagColumns(Set<String> targetTagColumns) { + this.targetTagColumns = targetTagColumns; + return this; + } + + public Builder withMemoryThreshold(long memoryThreshold) { + if (memoryThreshold <= 0) { + throw new SemanticException("The memory threshold must be greater than 0."); + } + this.memoryThreshold = memoryThreshold; + return this; + } + + public 
CopyToOptions build() { + switch (format) { + case TSFILE: + default: + return new CopyToTsFileOptions( + targetTableName, + targetTimeColumn, + targetTagColumns, + memoryThreshold != 0 ? memoryThreshold : 32 * 1024 * 1024); Review Comment: Keep it as `0`, which means using the default value defined in the tsfile writer API; there is no need to use the `32 * 1024 * 1024` magic number. ########## integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/copyto/IoTDBCopyToTsFileIT.java: ########## @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.relational.it.query.recent.copyto; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBCopyToTsFileIT { + + private static final String DATABASE_NAME = "test_db"; + + protected static final String[] createSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "create table table1(tag1 string tag, tag2 string tag, s1 int32 field, s2 int32 field)", + "insert into table1(time, tag1, tag2, s1, s2) values (1, 't1_1', 't2', 1, 1)", + "insert into table1(time, tag1, tag2, s1, s2) values (2, 't1_1', 't2', 2, 2)", + "insert into table1(time, tag1, tag2, s1, s2) values (3, 't1_1', 't2', 3, 3)", + "insert into table1(time, tag1, tag2, s1, s2) values (1, 't1_2', 't2', 1, 1)", + "insert into table1(time, tag1, tag2, s1, s2) values (2, 't1_2', 't2', 2, 2)", + "insert into 
table1(time, tag1, tag2, s1, s2) values (3, 't1_2', 't2', 3, 3)", + "create table table2(tag1 string tag, tag2 string tag, s1 int32 field, s2 int32 field)", + "insert into table2(time, tag1, tag2, s1, s2) values (1, 't1_1', 't2', 1, 1)", + }; + + private String targetFilePath = null; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(createSqls); + } + + @After + public void tearDownAfterTest() { + if (targetFilePath != null) { + try { + Files.deleteIfExists(new File(targetFilePath).toPath()); + } catch (Exception ignored) { Review Comment: While acceptable in test cleanup, this could silently mask test infrastructure issues. Consider logging a warning. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java: ########## @@ -226,6 +247,10 @@ public String getNextFolderForTsFile(int tierLevel, boolean sequence) : unSeqTiers.get(tierLevel).getNextFolder(); } + public String getNextFolderForCopyToTargetFile() throws DiskSpaceInsufficientException { Review Comment: @shuwenwei why was this marked as resolved? It seems that it's not resolved yet. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.TsFileFormatCopyToWriter; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +public class TableCopyToOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableCopyToOperator.class); + + private final OperatorContext operatorContext; + private final Operator childOperator; + private final String targetFilePath; + private final CopyToOptions options; + private final List<ColumnHeader> innerQueryColumnHeaders; + private final int[] columnIndex2TsBlockColumnIndex; + + private IFormatCopyToWriter writer; + private File 
targetFile; + private boolean isFinished = false; + private boolean hasData = false; + + public TableCopyToOperator( + OperatorContext operatorContext, + Operator child, + String targetFilePath, + CopyToOptions options, + List<ColumnHeader> innerQueryColumnHeaders, + int[] columnIndex2TsBlockColumnIndex) { + this.operatorContext = operatorContext; + this.childOperator = child; + this.targetFilePath = targetFilePath; + this.options = options; + this.innerQueryColumnHeaders = innerQueryColumnHeaders; + this.columnIndex2TsBlockColumnIndex = columnIndex2TsBlockColumnIndex; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + IFormatCopyToWriter formatWriter = getWriter(); + if (!childOperator.hasNext()) { + isFinished = true; + formatWriter.seal(); Review Comment: ```suggestion formatWriter.seal(); isFinished = true; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
