THUMarkLau commented on code in PR #7621: URL: https://github.com/apache/iotdb/pull/7621#discussion_r998925243
########## server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.performer.impl; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.AlignedFastCompactionPerformerSubTask; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.NonAlignedFastCompactionPerformerSubTask; +import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; +import org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter; +import org.apache.iotdb.db.engine.modification.Modification; +import 
org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.tsfile.exception.write.PageException; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class FastCompactionPerformer implements ICrossCompactionPerformer { + private final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); + private List<TsFileResource> seqFiles; + + private List<TsFileResource> unseqFiles; + + private List<TsFileResource> sortedSourceFiles = new ArrayList<>(); + + private static final int subTaskNum = + IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); + + public Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new ConcurrentHashMap<>(); + + private CompactionTaskSummary summary; + + private List<TsFileResource> targetFiles; + + public Map<TsFileResource, List<Modification>> modificationCache = new ConcurrentHashMap<>(); + + public FastCompactionPerformer( + List<TsFileResource> seqFiles, + List<TsFileResource> unseqFiles, + List<TsFileResource> targetFiles) { + this.seqFiles = seqFiles; + this.unseqFiles = unseqFiles; + this.targetFiles = targetFiles; + } + + public FastCompactionPerformer() {} + + @Override + public void perform() + throws IOException, MetadataException, StorageEngineException, InterruptedException { + try 
(MultiTsFileDeviceIterator deviceIterator = + new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap); + FastCrossCompactionWriter compactionWriter = + new FastCrossCompactionWriter(targetFiles, seqFiles)) { + while (deviceIterator.hasNextDevice()) { + checkThreadInterrupted(); + Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice(); + String device = deviceInfo.left; + boolean isAligned = deviceInfo.right; + + // sort the resources by the start time of current device from old to new, and remove + // resource that does not contain the current device. Notice: when the level of time index + // is file, there will be a false positive judgment problem, that is, the device does not + // actually exist but the judgment return device being existed. + sortedSourceFiles.addAll(seqFiles); + sortedSourceFiles.addAll(unseqFiles); + sortedSourceFiles.removeIf(x -> !x.mayContainsDevice(device)); + sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device))); + + compactionWriter.startChunkGroup(device, isAligned); + + if (isAligned) { + compactAlignedSeries(device, deviceIterator, compactionWriter); + } else { + compactNonAlignedSeries(device, deviceIterator, compactionWriter); + } + + compactionWriter.endChunkGroup(); + // update resource of the current device and check whether to flush chunk metadata or not + compactionWriter.checkAndMayFlushChunkMetadata(); + sortedSourceFiles.clear(); + } + compactionWriter.endFile(); + CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + // readers of source files have been closed in MultiTsFileDeviceIterator + // clean cache + sortedSourceFiles = null; + readerCacheMap = null; + modificationCache = null; + } + } + + private void compactAlignedSeries( + String deviceId, + MultiTsFileDeviceIterator deviceIterator, + FastCrossCompactionWriter fastCrossCompactionWriter) + throws PageException, IOException, 
WriteProcessException, IllegalPathException { + // measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset> + Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap = + new HashMap<>(); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); + + // Get all value measurements and their schemas of the current device. Also get start offset and + // end offset of each timeseries metadata, in order to facilitate the reading of chunkMetadata + // directly by this offset later. Instead of deserializing chunk metadata later, we need to + // deserialize chunk metadata here to get the schemas of all value measurements, because we + // should get schemas of all value measurement to startMeasruement() and compaction process is + // to read a batch of overlapped files each time, and we cannot make sure if the first batch of + // overlapped tsfiles contain all the value measurements. + for (Map.Entry<String, Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>>> entry : + deviceIterator.getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice().entrySet()) { + if (!entry.getKey().equals("")) { Review Comment: Don't use a bare string literal here; put it into IoTDBConstant and reference the constant. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.performer.impl; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.AlignedFastCompactionPerformerSubTask; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.NonAlignedFastCompactionPerformerSubTask; +import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; +import org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.tsfile.exception.write.PageException; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class FastCompactionPerformer implements ICrossCompactionPerformer { + private final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); + private List<TsFileResource> seqFiles; + + private List<TsFileResource> unseqFiles; + + private List<TsFileResource> sortedSourceFiles = new ArrayList<>(); + + private static final int subTaskNum = + IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); + + public Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new ConcurrentHashMap<>(); + + private CompactionTaskSummary summary; + + private List<TsFileResource> targetFiles; + + public Map<TsFileResource, List<Modification>> modificationCache = new ConcurrentHashMap<>(); + + public FastCompactionPerformer( + List<TsFileResource> seqFiles, + List<TsFileResource> unseqFiles, + List<TsFileResource> targetFiles) { + this.seqFiles = seqFiles; + this.unseqFiles = unseqFiles; + this.targetFiles = targetFiles; + } + + public FastCompactionPerformer() {} + + @Override + public void perform() + throws IOException, MetadataException, StorageEngineException, InterruptedException { + try (MultiTsFileDeviceIterator deviceIterator = + new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap); + FastCrossCompactionWriter compactionWriter = + new FastCrossCompactionWriter(targetFiles, seqFiles)) { + while (deviceIterator.hasNextDevice()) { + checkThreadInterrupted(); + Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice(); + String device = deviceInfo.left; + boolean isAligned = deviceInfo.right; + + // sort the resources by the start time of current device from old to new, and remove + // resource that does not contain the 
current device. Notice: when the level of time index + // is file, there will be a false positive judgment problem, that is, the device does not + // actually exist but the judgment return device being existed. + sortedSourceFiles.addAll(seqFiles); + sortedSourceFiles.addAll(unseqFiles); + sortedSourceFiles.removeIf(x -> !x.mayContainsDevice(device)); + sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device))); + + compactionWriter.startChunkGroup(device, isAligned); + + if (isAligned) { + compactAlignedSeries(device, deviceIterator, compactionWriter); + } else { + compactNonAlignedSeries(device, deviceIterator, compactionWriter); + } + + compactionWriter.endChunkGroup(); + // update resource of the current device and check whether to flush chunk metadata or not + compactionWriter.checkAndMayFlushChunkMetadata(); + sortedSourceFiles.clear(); + } + compactionWriter.endFile(); + CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + // readers of source files have been closed in MultiTsFileDeviceIterator + // clean cache + sortedSourceFiles = null; + readerCacheMap = null; + modificationCache = null; + } + } + + private void compactAlignedSeries( + String deviceId, + MultiTsFileDeviceIterator deviceIterator, + FastCrossCompactionWriter fastCrossCompactionWriter) + throws PageException, IOException, WriteProcessException, IllegalPathException { + // measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset> + Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap = + new HashMap<>(); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); + + // Get all value measurements and their schemas of the current device. Also get start offset and + // end offset of each timeseries metadata, in order to facilitate the reading of chunkMetadata + // directly by this offset later. 
Instead of deserializing chunk metadata later, we need to + // deserialize chunk metadata here to get the schemas of all value measurements, because we + // should get schemas of all value measurement to startMeasruement() and compaction process is + // to read a batch of overlapped files each time, and we cannot make sure if the first batch of + // overlapped tsfiles contain all the value measurements. + for (Map.Entry<String, Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>>> entry : + deviceIterator.getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice().entrySet()) { + if (!entry.getKey().equals("")) { + measurementSchemas.add(entry.getValue().left); + } + timeseriesMetadataOffsetMap.put(entry.getKey(), entry.getValue().right); + } + + new AlignedFastCompactionPerformerSubTask( + fastCrossCompactionWriter, + timeseriesMetadataOffsetMap, + measurementSchemas, + readerCacheMap, + modificationCache, + sortedSourceFiles, + deviceId, + 0) + .call(); + } + + private void compactNonAlignedSeries( + String deviceID, + MultiTsFileDeviceIterator deviceIterator, + FastCrossCompactionWriter fastCrossCompactionWriter) + throws IOException, InterruptedException { + // measurement -> tsfile resource -> timeseries metadata <startOffset, endOffset> + // Get all measurements of the current device. Also get start offset and end offset of each + // timeseries metadata, in order to facilitate the reading of chunkMetadata directly by this + // offset later. Here we don't need to deserialize chunk metadata, we can deserialize them and + // get their schema later. 
+ Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap = + deviceIterator.getTimeseriesMetadataOffsetOfCurrentDevice(); + + List<String> allMeasurements = new ArrayList<>(timeseriesMetadataOffsetMap.keySet()); + + int subTaskNums = Math.min(allMeasurements.size(), subTaskNum); + + // assign all measurements to different sub tasks + List<String>[] measurementsForEachSubTask = new ArrayList[subTaskNums]; Review Comment: Since the list is only used to traverse sequentially, LinkedList may be better than ArrayList. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.engine.compaction.reader; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.FastCompactionPerformerSubTask; +import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement; +import org.apache.iotdb.db.engine.compaction.cross.utils.PointElement; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; + +/** + * This reader is used to deduplicate and organize overlapping pages, and read out points in order. + * It is used for compaction. + */ +public class PointPriorityReader { + private long lastTime; + + private final PriorityQueue<PointElement> pointQueue; + + private final FastCompactionPerformerSubTask.RemovePage removePage; + + private Pair<Long, Object> currentPoint; + + private boolean isNewPoint = true; + + private List<PageElement> newOverlappedPages; Review Comment: This variable is used and updated only outside this class; it should be moved out of this class. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.writer; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.IOException; +import java.util.List; + +public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWriter { + protected TsFileIOWriter fileWriter; + + protected boolean isEmptyFile; + + protected TsFileResource targetResource; + + public AbstractInnerCompactionWriter(TsFileResource targetFileResource) throws IOException { + long sizeForFileWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); + this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter); + this.targetResource = targetFileResource; + isEmptyFile = true; + } + + @Override + public void startChunkGroup(String deviceId, boolean isAlign) throws IOException { + fileWriter.startChunkGroup(deviceId); + this.isAlign = isAlign; + this.deviceId = deviceId; + } + + @Override + public void endChunkGroup() throws IOException { + fileWriter.endChunkGroup(); + } + + @Override 
+ public void endMeasurement(int subTaskId) throws IOException { + flushChunkToFileWriter(fileWriter, chunkWriters[subTaskId]); + } + + @Override + public abstract void write(long timestamp, Object value, int subTaskId) throws IOException; + + @Override + public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + throws IOException; + + @Override + public void endFile() throws IOException { + fileWriter.endFile(); + if (isEmptyFile) { + fileWriter.getFile().delete(); + } + } + + @Override + public void close() throws Exception { + if (fileWriter != null && fileWriter.canWrite()) { + fileWriter.close(); + } + fileWriter = null; + } + + @Override + public void checkAndMayFlushChunkMetadata() throws IOException { + // Before flushing chunk metadatas, we use chunk metadatas in tsfile io writer to update start + // time and end time in resource. + List<TimeseriesMetadata> timeseriesMetadatasOfCurrentDevice = + fileWriter.getDeviceTimeseriesMetadataMap().get(deviceId); Review Comment: Using this function is not a good idea; it may read all the chunk metadata from the .meta file, which is costly. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/NonAlignedFastCompactionPerformerSubTask.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.cross.rewrite.task; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.engine.compaction.cross.utils.ChunkMetadataElement; +import org.apache.iotdb.db.engine.compaction.cross.utils.FileElement; +import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement; +import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.tsfile.exception.write.PageException; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class NonAlignedFastCompactionPerformerSubTask extends 
FastCompactionPerformerSubTask { + // measurements of the current device to be compacted, which is assigned to the current sub thread + private final List<String> measurements; + + private String currentMeasurement; + + boolean hasStartMeasurement = false; + + public NonAlignedFastCompactionPerformerSubTask( + FastCrossCompactionWriter compactionWriter, + Map<String, Map<TsFileResource, Pair<Long, Long>>> timeseriesMetadataOffsetMap, + List<String> measurements, + Map<TsFileResource, TsFileSequenceReader> readerCacheMap, + Map<TsFileResource, List<Modification>> modificationCacheMap, + List<TsFileResource> sortedSourceFiles, + String deviceId, + int subTaskId) { + super( + compactionWriter, + timeseriesMetadataOffsetMap, + readerCacheMap, + modificationCacheMap, + sortedSourceFiles, + deviceId, + false, + subTaskId); + this.measurements = measurements; + } + + @Override + public Void call() + throws IOException, PageException, WriteProcessException, IllegalPathException { + for (String measurement : measurements) { Review Comment: SubTask is responsible for multi-series compaction, but some member variables in this class are used to execute single-series compaction. I think the responsibilities of this class are a bit confusing, so we can extract the parts that perform a single-series compaction into a separate class and put the associated variables into that class. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.cross.utils; + +import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.IOException; + +public class PointElement { + public long timestamp; + public int priority; + public Pair<Long, Object> timeValuePair; Review Comment: Use TimeValuePair instead of Pair ########## tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java: ########## @@ -199,6 +209,63 @@ private PageReader constructPageReaderForNextPage(PageHeader pageHeader) throws return reader; } + /** + * Read page data without uncompressing it. + * + * @return compressed page data + */ + public ByteBuffer readPageDataWithoutUncompressing(PageHeader pageHeader) throws IOException { + int compressedPageBodyLength = pageHeader.getCompressedSize(); + byte[] compressedPageBody = new byte[compressedPageBodyLength]; + + // doesn't has a complete page body + if (compressedPageBodyLength > chunkDataBuffer.remaining()) { + throw new IOException( + "do not has a complete page body. Expected:" + + compressedPageBodyLength + + ". Actual:" + + chunkDataBuffer.remaining()); + } + + chunkDataBuffer.get(compressedPageBody); + return ByteBuffer.wrap(compressedPageBody); + } + + /** + * Read data from compressed page data. Uncompress the page and decode it to batch data. 
+ * + * @param compressedPageData Compressed page data + */ + public TsBlock readPageData(PageHeader pageHeader, ByteBuffer compressedPageData) + throws IOException { + // uncompress page data Review Comment: Lines 241-258 are duplicated with AlignedChunkReader::uncompressPageData; consider extracting the shared logic. ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.engine.compaction.reader; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.FastCompactionPerformerSubTask; +import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement; +import org.apache.iotdb.db.engine.compaction.cross.utils.PointElement; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; + +/** + * This reader is used to deduplicate and organize overlapping pages, and read out points in order. + * It is used for compaction. + */ +public class PointPriorityReader { + private long lastTime; + + private final PriorityQueue<PointElement> pointQueue; + + private final FastCompactionPerformerSubTask.RemovePage removePage; + + private Pair<Long, Object> currentPoint; + + private boolean isNewPoint = true; Review Comment: ```suggestion private boolean shouldReadNextPoint = true; ``` ########## server/src/main/java/org/apache/iotdb/db/tools/validate/TsFileValidationTool.java: ########## @@ -72,7 +72,7 @@ */ public class TsFileValidationTool { // print detail type of overlap or not - private static boolean printDetails = false; + private static boolean printDetails = true; Review Comment: set to false ########## server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.writer; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWriter { + + // target fileIOWriters + protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>(); + + // source tsfiles + private List<TsFileResource> seqTsFileResources; + + // Each sub task has its corresponding seq file index. + // The index of the array corresponds to subTaskId. 
+ protected int[] seqFileIndexArray = new int[subTaskNum]; + + // device end time in each source seq file + protected final long[] currentDeviceEndTime; + + // whether each target file is empty or not + protected final boolean[] isEmptyFile; + + // whether each target file has device data or not + protected final boolean[] isDeviceExistedInTargetFiles; + + // current chunk group header size + private int chunkGroupHeaderSize; + + protected List<TsFileResource> targetResources; + + public AbstractCrossCompactionWriter( + List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) + throws IOException { + currentDeviceEndTime = new long[seqFileResources.size()]; + isEmptyFile = new boolean[seqFileResources.size()]; + isDeviceExistedInTargetFiles = new boolean[targetResources.size()]; + long memorySizeForEachWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion() + / targetResources.size()); + for (int i = 0; i < targetResources.size(); i++) { + this.targetFileWriters.add( + new TsFileIOWriter(targetResources.get(i).getTsFile(), true, memorySizeForEachWriter)); + isEmptyFile[i] = true; + } + this.seqTsFileResources = seqFileResources; + this.targetResources = targetResources; + } + + @Override + public void startChunkGroup(String deviceId, boolean isAlign) throws IOException { + this.deviceId = deviceId; + this.isAlign = isAlign; + this.seqFileIndexArray = new int[subTaskNum]; + checkIsDeviceExistAndGetDeviceEndTime(); + for (int i = 0; i < targetFileWriters.size(); i++) { + chunkGroupHeaderSize = targetFileWriters.get(i).startChunkGroup(deviceId); + } + } + + @Override + public void endChunkGroup() throws IOException { + for (int i = 0; i < seqTsFileResources.size(); i++) { + TsFileIOWriter targetFileWriter = targetFileWriters.get(i); + if (isDeviceExistedInTargetFiles[i]) { + 
targetFileWriter.endChunkGroup(); + } else { + targetFileWriter.truncate(targetFileWriter.getPos() - chunkGroupHeaderSize); + } + isDeviceExistedInTargetFiles[i] = false; + } + seqFileIndexArray = null; + } + + @Override + public void endMeasurement(int subTaskId) throws IOException { + flushChunkToFileWriter( + targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriters[subTaskId]); + seqFileIndexArray[subTaskId] = 0; + } + + @Override + public void write(long timestamp, Object value, int subTaskId) throws IOException { + checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId); + int fileIndex = seqFileIndexArray[subTaskId]; + writeDataPoint(timestamp, value, chunkWriters[subTaskId]); + chunkPointNumArray[subTaskId]++; + checkChunkSizeAndMayOpenANewChunk( + targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, true); + isDeviceExistedInTargetFiles[fileIndex] = true; + isEmptyFile[fileIndex] = false; + } + + /** Write data in batch, only used for aligned device. 
+ */ + @Override + public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + throws IOException; + + @Override + public void endFile() throws IOException { + for (int i = 0; i < isEmptyFile.length; i++) { + targetFileWriters.get(i).endFile(); + // delete empty target file + if (isEmptyFile[i]) { + targetFileWriters.get(i).getFile().delete(); + } + } + } + + @Override + public void close() throws IOException { + for (TsFileIOWriter targetWriter : targetFileWriters) { + if (targetWriter != null && targetWriter.canWrite()) { + targetWriter.close(); + } + } + targetFileWriters = null; + seqTsFileResources = null; + } + + @Override + public void checkAndMayFlushChunkMetadata() throws IOException { + for (int i = 0; i < targetFileWriters.size(); i++) { + TsFileIOWriter fileIOWriter = targetFileWriters.get(i); + TsFileResource resource = targetResources.get(i); + // Before flushing chunk metadatas, we use chunk metadatas in tsfile io writer to update start + // time and end time in resource. + List<TimeseriesMetadata> timeseriesMetadatasOfCurrentDevice = + fileIOWriter.getDeviceTimeseriesMetadataMap().get(deviceId); Review Comment: Calling `getDeviceTimeseriesMetadataMap()` is costly; since it is invoked here inside the per-file loop, consider computing it once per writer and reusing the result. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
