choubenson commented on code in PR #7621: URL: https://github.com/apache/iotdb/pull/7621#discussion_r1005156599
########## server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.writer; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWriter { + + // target fileIOWriters + protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>(); + + // source tsfiles + private List<TsFileResource> seqTsFileResources; + + // Each sub task has its corresponding seq file index. + // The index of the array corresponds to subTaskId. + protected int[] seqFileIndexArray = new int[subTaskNum]; + + // device end time in each source seq file + protected final long[] currentDeviceEndTime; + + // whether each target file is empty or not + protected final boolean[] isEmptyFile; + + // whether each target file has device data or not + protected final boolean[] isDeviceExistedInTargetFiles; + + // current chunk group header size + private int chunkGroupHeaderSize; + + protected List<TsFileResource> targetResources; + + public AbstractCrossCompactionWriter( + List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) + throws IOException { + currentDeviceEndTime = new long[seqFileResources.size()]; + isEmptyFile = new boolean[seqFileResources.size()]; + isDeviceExistedInTargetFiles = new boolean[targetResources.size()]; + long memorySizeForEachWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion() + / targetResources.size()); + for (int i = 0; i < targetResources.size(); i++) { + this.targetFileWriters.add( + new TsFileIOWriter(targetResources.get(i).getTsFile(), true, memorySizeForEachWriter)); + isEmptyFile[i] = true; + } + this.seqTsFileResources = seqFileResources; + this.targetResources = targetResources; + } + + @Override + public void startChunkGroup(String deviceId, boolean isAlign) throws IOException { + this.deviceId = deviceId; + this.isAlign = isAlign; + this.seqFileIndexArray = new int[subTaskNum]; + checkIsDeviceExistAndGetDeviceEndTime(); + for (int i = 0; i < targetFileWriters.size(); i++) { + chunkGroupHeaderSize = targetFileWriters.get(i).startChunkGroup(deviceId); + } + } + + @Override + public void endChunkGroup() throws IOException { + for (int i = 0; i < seqTsFileResources.size(); i++) { + TsFileIOWriter targetFileWriter = targetFileWriters.get(i); + if (isDeviceExistedInTargetFiles[i]) { + targetFileWriter.endChunkGroup(); + } else { + targetFileWriter.truncate(targetFileWriter.getPos() - chunkGroupHeaderSize); + } + isDeviceExistedInTargetFiles[i] = false; + } + seqFileIndexArray = null; + } + + @Override + public void endMeasurement(int subTaskId) throws IOException { + flushChunkToFileWriter( + targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriters[subTaskId]); + seqFileIndexArray[subTaskId] = 0; + } + + @Override + public void write(long timestamp, Object value, int subTaskId) throws IOException { + checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId); + int fileIndex = seqFileIndexArray[subTaskId]; + writeDataPoint(timestamp, value, chunkWriters[subTaskId]); + chunkPointNumArray[subTaskId]++; + checkChunkSizeAndMayOpenANewChunk( + targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, true); + isDeviceExistedInTargetFiles[fileIndex] = true; + isEmptyFile[fileIndex] = false; + } + + /** Write data in batch, only used for aligned device. */ + @Override + public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + throws IOException; + + @Override + public void endFile() throws IOException { + for (int i = 0; i < isEmptyFile.length; i++) { + targetFileWriters.get(i).endFile(); + // delete empty target file + if (isEmptyFile[i]) { + targetFileWriters.get(i).getFile().delete(); + } + } + } + + @Override + public void close() throws IOException { + for (TsFileIOWriter targetWriter : targetFileWriters) { + if (targetWriter != null && targetWriter.canWrite()) { + targetWriter.close(); + } + } + targetFileWriters = null; + seqTsFileResources = null; + } + + @Override + public void checkAndMayFlushChunkMetadata() throws IOException { + for (int i = 0; i < targetFileWriters.size(); i++) { + TsFileIOWriter fileIOWriter = targetFileWriters.get(i); + TsFileResource resource = targetResources.get(i); + // Before flushing chunk metadatas, we use chunk metadatas in tsfile io writer to update start + // time and end time in resource. + List<TimeseriesMetadata> timeseriesMetadatasOfCurrentDevice = + fileIOWriter.getDeviceTimeseriesMetadataMap().get(deviceId); Review Comment: Resolved. I have added a new method named `getChunkMetadatasOfDeviceInMemory()` in `TsFileIOWriter`. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.writer; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.IOException; +import java.util.List; + +public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWriter { + protected TsFileIOWriter fileWriter; + + protected boolean isEmptyFile; + + protected TsFileResource targetResource; + + public AbstractInnerCompactionWriter(TsFileResource targetFileResource) throws IOException { + long sizeForFileWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); + this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter); + this.targetResource = targetFileResource; + isEmptyFile = true; + } + + @Override + public void startChunkGroup(String deviceId, boolean isAlign) throws IOException { + fileWriter.startChunkGroup(deviceId); + this.isAlign = isAlign; + this.deviceId = deviceId; + } + + @Override + public void endChunkGroup() throws IOException { + fileWriter.endChunkGroup(); + } + + @Override + public void endMeasurement(int subTaskId) throws IOException { + flushChunkToFileWriter(fileWriter, chunkWriters[subTaskId]); + } + + @Override + public abstract void write(long timestamp, Object value, int subTaskId) throws IOException; + + @Override + public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + throws IOException; + + @Override + public void endFile() throws IOException { + fileWriter.endFile(); + if (isEmptyFile) { + fileWriter.getFile().delete(); + } + } + + @Override + public void close() throws Exception { + if (fileWriter != null && fileWriter.canWrite()) { + fileWriter.close(); + } + fileWriter = null; + } + + @Override + public void checkAndMayFlushChunkMetadata() throws IOException { + // Before flushing chunk metadatas, we use chunk metadatas in tsfile io writer to update start + // time and end time in resource. + List<TimeseriesMetadata> timeseriesMetadatasOfCurrentDevice = + fileWriter.getDeviceTimeseriesMetadataMap().get(deviceId); Review Comment: Resolved. I have added a new method named `getChunkMetadatasOfDeviceInMemory()` in `TsFileIOWriter`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
