THUMarkLau commented on code in PR #7621:
URL: https://github.com/apache/iotdb/pull/7621#discussion_r998925243


##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.performer.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.AlignedFastCompactionPerformerSubTask;
+import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.NonAlignedFastCompactionPerformerSubTask;
+import 
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
+import 
org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class FastCompactionPerformer implements ICrossCompactionPerformer {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private List<TsFileResource> seqFiles;
+
+  private List<TsFileResource> unseqFiles;
+
+  private List<TsFileResource> sortedSourceFiles = new ArrayList<>();
+
+  private static final int subTaskNum =
+      IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
+
+  public Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new 
ConcurrentHashMap<>();
+
+  private CompactionTaskSummary summary;
+
+  private List<TsFileResource> targetFiles;
+
+  public Map<TsFileResource, List<Modification>> modificationCache = new 
ConcurrentHashMap<>();
+
+  public FastCompactionPerformer(
+      List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles,
+      List<TsFileResource> targetFiles) {
+    this.seqFiles = seqFiles;
+    this.unseqFiles = unseqFiles;
+    this.targetFiles = targetFiles;
+  }
+
+  public FastCompactionPerformer() {}
+
+  @Override
+  public void perform()
+      throws IOException, MetadataException, StorageEngineException, 
InterruptedException {
+    try (MultiTsFileDeviceIterator deviceIterator =
+            new MultiTsFileDeviceIterator(seqFiles, unseqFiles, 
readerCacheMap);
+        FastCrossCompactionWriter compactionWriter =
+            new FastCrossCompactionWriter(targetFiles, seqFiles)) {
+      while (deviceIterator.hasNextDevice()) {
+        checkThreadInterrupted();
+        Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
+        String device = deviceInfo.left;
+        boolean isAligned = deviceInfo.right;
+
+        // sort the resources by the start time of current device from old to 
new, and remove
+        // resource that does not contain the current device. Notice: when the 
level of time index
+        // is file, there will be a false positive judgment problem, that is, 
the device does not
+        // actually exist but the judgment return device being existed.
+        sortedSourceFiles.addAll(seqFiles);
+        sortedSourceFiles.addAll(unseqFiles);
+        sortedSourceFiles.removeIf(x -> !x.mayContainsDevice(device));
+        sortedSourceFiles.sort(Comparator.comparingLong(x -> 
x.getStartTime(device)));
+
+        compactionWriter.startChunkGroup(device, isAligned);
+
+        if (isAligned) {
+          compactAlignedSeries(device, deviceIterator, compactionWriter);
+        } else {
+          compactNonAlignedSeries(device, deviceIterator, compactionWriter);
+        }
+
+        compactionWriter.endChunkGroup();
+        // update resource of the current device and check whether to flush 
chunk metadata or not
+        compactionWriter.checkAndMayFlushChunkMetadata();
+        sortedSourceFiles.clear();
+      }
+      compactionWriter.endFile();
+      CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      // readers of source files have been closed in MultiTsFileDeviceIterator
+      // clean cache
+      sortedSourceFiles = null;
+      readerCacheMap = null;
+      modificationCache = null;
+    }
+  }
+
+  private void compactAlignedSeries(
+      String deviceId,
+      MultiTsFileDeviceIterator deviceIterator,
+      FastCrossCompactionWriter fastCrossCompactionWriter)
+      throws PageException, IOException, WriteProcessException, 
IllegalPathException {
+    // measurement -> tsfile resource -> timeseries metadata <startOffset, 
endOffset>
+    Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap =
+        new HashMap<>();
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+
+    // Get all value measurements and their schemas of the current device. 
Also get start offset and
+    // end offset of each timeseries metadata, in order to facilitate the 
reading of chunkMetadata
+    // directly by this offset later. Instead of deserializing chunk metadata 
later, we need to
+    // deserialize chunk metadata here to get the schemas of all value 
measurements, because we
+    // should get schemas of all value measurement to startMeasruement() and 
compaction process is
+    // to read a batch of overlapped files each time, and we cannot make sure 
if the first batch of
+    // overlapped tsfiles contain all the value measurements.
+    for (Map.Entry<String, Pair<MeasurementSchema, Map<TsFileResource, 
Pair<Long, Long>>>> entry :
+        
deviceIterator.getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice().entrySet())
 {
+      if (!entry.getKey().equals("")) {

Review Comment:
   Do not use a string literal here; put it into IoTDBConstant and use a constant.



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/FastCompactionPerformer.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.performer.impl;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.AlignedFastCompactionPerformerSubTask;
+import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.NonAlignedFastCompactionPerformerSubTask;
+import 
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
+import 
org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class FastCompactionPerformer implements ICrossCompactionPerformer {
+  private final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private List<TsFileResource> seqFiles;
+
+  private List<TsFileResource> unseqFiles;
+
+  private List<TsFileResource> sortedSourceFiles = new ArrayList<>();
+
+  private static final int subTaskNum =
+      IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
+
+  public Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new 
ConcurrentHashMap<>();
+
+  private CompactionTaskSummary summary;
+
+  private List<TsFileResource> targetFiles;
+
+  public Map<TsFileResource, List<Modification>> modificationCache = new 
ConcurrentHashMap<>();
+
+  public FastCompactionPerformer(
+      List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles,
+      List<TsFileResource> targetFiles) {
+    this.seqFiles = seqFiles;
+    this.unseqFiles = unseqFiles;
+    this.targetFiles = targetFiles;
+  }
+
+  public FastCompactionPerformer() {}
+
+  @Override
+  public void perform()
+      throws IOException, MetadataException, StorageEngineException, 
InterruptedException {
+    try (MultiTsFileDeviceIterator deviceIterator =
+            new MultiTsFileDeviceIterator(seqFiles, unseqFiles, 
readerCacheMap);
+        FastCrossCompactionWriter compactionWriter =
+            new FastCrossCompactionWriter(targetFiles, seqFiles)) {
+      while (deviceIterator.hasNextDevice()) {
+        checkThreadInterrupted();
+        Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
+        String device = deviceInfo.left;
+        boolean isAligned = deviceInfo.right;
+
+        // sort the resources by the start time of current device from old to 
new, and remove
+        // resource that does not contain the current device. Notice: when the 
level of time index
+        // is file, there will be a false positive judgment problem, that is, 
the device does not
+        // actually exist but the judgment return device being existed.
+        sortedSourceFiles.addAll(seqFiles);
+        sortedSourceFiles.addAll(unseqFiles);
+        sortedSourceFiles.removeIf(x -> !x.mayContainsDevice(device));
+        sortedSourceFiles.sort(Comparator.comparingLong(x -> 
x.getStartTime(device)));
+
+        compactionWriter.startChunkGroup(device, isAligned);
+
+        if (isAligned) {
+          compactAlignedSeries(device, deviceIterator, compactionWriter);
+        } else {
+          compactNonAlignedSeries(device, deviceIterator, compactionWriter);
+        }
+
+        compactionWriter.endChunkGroup();
+        // update resource of the current device and check whether to flush 
chunk metadata or not
+        compactionWriter.checkAndMayFlushChunkMetadata();
+        sortedSourceFiles.clear();
+      }
+      compactionWriter.endFile();
+      CompactionUtils.updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      // readers of source files have been closed in MultiTsFileDeviceIterator
+      // clean cache
+      sortedSourceFiles = null;
+      readerCacheMap = null;
+      modificationCache = null;
+    }
+  }
+
+  private void compactAlignedSeries(
+      String deviceId,
+      MultiTsFileDeviceIterator deviceIterator,
+      FastCrossCompactionWriter fastCrossCompactionWriter)
+      throws PageException, IOException, WriteProcessException, 
IllegalPathException {
+    // measurement -> tsfile resource -> timeseries metadata <startOffset, 
endOffset>
+    Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap =
+        new HashMap<>();
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+
+    // Get all value measurements and their schemas of the current device. 
Also get start offset and
+    // end offset of each timeseries metadata, in order to facilitate the 
reading of chunkMetadata
+    // directly by this offset later. Instead of deserializing chunk metadata 
later, we need to
+    // deserialize chunk metadata here to get the schemas of all value 
measurements, because we
+    // should get schemas of all value measurement to startMeasruement() and 
compaction process is
+    // to read a batch of overlapped files each time, and we cannot make sure 
if the first batch of
+    // overlapped tsfiles contain all the value measurements.
+    for (Map.Entry<String, Pair<MeasurementSchema, Map<TsFileResource, 
Pair<Long, Long>>>> entry :
+        
deviceIterator.getTimeseriesSchemaAndMetadataOffsetOfCurrentDevice().entrySet())
 {
+      if (!entry.getKey().equals("")) {
+        measurementSchemas.add(entry.getValue().left);
+      }
+      timeseriesMetadataOffsetMap.put(entry.getKey(), entry.getValue().right);
+    }
+
+    new AlignedFastCompactionPerformerSubTask(
+            fastCrossCompactionWriter,
+            timeseriesMetadataOffsetMap,
+            measurementSchemas,
+            readerCacheMap,
+            modificationCache,
+            sortedSourceFiles,
+            deviceId,
+            0)
+        .call();
+  }
+
+  private void compactNonAlignedSeries(
+      String deviceID,
+      MultiTsFileDeviceIterator deviceIterator,
+      FastCrossCompactionWriter fastCrossCompactionWriter)
+      throws IOException, InterruptedException {
+    // measurement -> tsfile resource -> timeseries metadata <startOffset, 
endOffset>
+    // Get all measurements of the current device. Also get start offset and 
end offset of each
+    // timeseries metadata, in order to facilitate the reading of 
chunkMetadata directly by this
+    // offset later. Here we don't need to deserialize chunk metadata, we can 
deserialize them and
+    // get their schema later.
+    Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap =
+        deviceIterator.getTimeseriesMetadataOffsetOfCurrentDevice();
+
+    List<String> allMeasurements = new 
ArrayList<>(timeseriesMetadataOffsetMap.keySet());
+
+    int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+
+    // assign all measurements to different sub tasks
+    List<String>[] measurementsForEachSubTask = new ArrayList[subTaskNums];

Review Comment:
   Since the list is only used to traverse sequentially, LinkedList may be 
better than ArrayList.



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.reader;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.FastCompactionPerformerSubTask;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PointElement;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * This reader is used to deduplicate and organize overlapping pages, and read 
out points in order.
+ * It is used for compaction.
+ */
+public class PointPriorityReader {
+  private long lastTime;
+
+  private final PriorityQueue<PointElement> pointQueue;
+
+  private final FastCompactionPerformerSubTask.RemovePage removePage;
+
+  private Pair<Long, Object> currentPoint;
+
+  private boolean isNewPoint = true;
+
+  private List<PageElement> newOverlappedPages;

Review Comment:
   The usage and update of this variable are outside this class; it should be 
moved outside.



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractInnerCompactionWriter.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.writer;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWriter {
+  protected TsFileIOWriter fileWriter;
+
+  protected boolean isEmptyFile;
+
+  protected TsFileResource targetResource;
+
+  public AbstractInnerCompactionWriter(TsFileResource targetFileResource) 
throws IOException {
+    long sizeForFileWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / 
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
+    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, 
sizeForFileWriter);
+    this.targetResource = targetFileResource;
+    isEmptyFile = true;
+  }
+
+  @Override
+  public void startChunkGroup(String deviceId, boolean isAlign) throws 
IOException {
+    fileWriter.startChunkGroup(deviceId);
+    this.isAlign = isAlign;
+    this.deviceId = deviceId;
+  }
+
+  @Override
+  public void endChunkGroup() throws IOException {
+    fileWriter.endChunkGroup();
+  }
+
+  @Override
+  public void endMeasurement(int subTaskId) throws IOException {
+    flushChunkToFileWriter(fileWriter, chunkWriters[subTaskId]);
+  }
+
+  @Override
+  public abstract void write(long timestamp, Object value, int subTaskId) 
throws IOException;
+
+  @Override
+  public abstract void write(TimeColumn timestamps, Column[] columns, int 
subTaskId, int batchSize)
+      throws IOException;
+
+  @Override
+  public void endFile() throws IOException {
+    fileWriter.endFile();
+    if (isEmptyFile) {
+      fileWriter.getFile().delete();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (fileWriter != null && fileWriter.canWrite()) {
+      fileWriter.close();
+    }
+    fileWriter = null;
+  }
+
+  @Override
+  public void checkAndMayFlushChunkMetadata() throws IOException {
+    // Before flushing chunk metadatas, we use chunk metadatas in tsfile io 
writer to update start
+    // time and end time in resource.
+    List<TimeseriesMetadata> timeseriesMetadatasOfCurrentDevice =
+        fileWriter.getDeviceTimeseriesMetadataMap().get(deviceId);

Review Comment:
   Using this function is not a good idea; it may read all the chunk metadata 
from the .meta file, which is costly.



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/NonAlignedFastCompactionPerformerSubTask.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.cross.utils.ChunkMetadataElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.FileElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement;
+import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NonAlignedFastCompactionPerformerSubTask extends 
FastCompactionPerformerSubTask {
+  // measurements of the current device to be compacted, which is assigned to 
the current sub thread
+  private final List<String> measurements;
+
+  private String currentMeasurement;
+
+  boolean hasStartMeasurement = false;
+
+  public NonAlignedFastCompactionPerformerSubTask(
+      FastCrossCompactionWriter compactionWriter,
+      Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap,
+      List<String> measurements,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap,
+      Map<TsFileResource, List<Modification>> modificationCacheMap,
+      List<TsFileResource> sortedSourceFiles,
+      String deviceId,
+      int subTaskId) {
+    super(
+        compactionWriter,
+        timeseriesMetadataOffsetMap,
+        readerCacheMap,
+        modificationCacheMap,
+        sortedSourceFiles,
+        deviceId,
+        false,
+        subTaskId);
+    this.measurements = measurements;
+  }
+
+  @Override
+  public Void call()
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    for (String measurement : measurements) {

Review Comment:
   SubTask is responsible for multi-series compaction, but some member variables 
in this class are used to execute single-series compaction. I think the 
responsibilities of this class are a bit confusing, so we can extract the parts 
that perform a single-series compaction into a separate class and put the 
associated variables into that class.



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/PointElement.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.utils;
+
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+
+public class PointElement {
+  public long timestamp;
+  public int priority;
+  public Pair<Long, Object> timeValuePair;

Review Comment:
   Use TimeValuePair instead of Pair



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java:
##########
@@ -199,6 +209,63 @@ private PageReader 
constructPageReaderForNextPage(PageHeader pageHeader) throws
     return reader;
   }
 
+  /**
+   * Read page data without uncompressing it.
+   *
+   * @return compressed page data
+   */
+  public ByteBuffer readPageDataWithoutUncompressing(PageHeader pageHeader) 
throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    byte[] compressedPageBody = new byte[compressedPageBodyLength];
+
+    // doesn't has a complete page body
+    if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
+      throw new IOException(
+          "do not has a complete page body. Expected:"
+              + compressedPageBodyLength
+              + ". Actual:"
+              + chunkDataBuffer.remaining());
+    }
+
+    chunkDataBuffer.get(compressedPageBody);
+    return ByteBuffer.wrap(compressedPageBody);
+  }
+
+  /**
+   * Read data from compressed page data. Uncompress the page and decode it to 
batch data.
+   *
+   * @param compressedPageData Compressed page data
+   */
+  public TsBlock readPageData(PageHeader pageHeader, ByteBuffer 
compressedPageData)
+      throws IOException {
+    // uncompress page data

Review Comment:
   Line 241-258 is duplicated with AlignedChunkReader::uncompressPageData



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/reader/PointPriorityReader.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.reader;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import 
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.FastCompactionPerformerSubTask;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PointElement;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * This reader is used to deduplicate and organize overlapping pages, and read 
out points in order.
+ * It is used for compaction.
+ */
+public class PointPriorityReader {
+  private long lastTime;
+
+  private final PriorityQueue<PointElement> pointQueue;
+
+  private final FastCompactionPerformerSubTask.RemovePage removePage;
+
+  private Pair<Long, Object> currentPoint;
+
+  private boolean isNewPoint = true;

Review Comment:
   ```suggestion
     private boolean shouldReadNextPoint = true;
   ```



##########
server/src/main/java/org/apache/iotdb/db/tools/validate/TsFileValidationTool.java:
##########
@@ -72,7 +72,7 @@
  */
 public class TsFileValidationTool {
   // print detail type of overlap or not
-  private static boolean printDetails = false;
+  private static boolean printDetails = true;

Review Comment:
   `printDetails` looks like it was flipped to `true` for local debugging; please set it back to `false` before merging.



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCrossCompactionWriter.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.writer;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.rescon.SystemInfo;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWriter {
+
+  // target fileIOWriters
+  protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>();
+
+  // source tsfiles
+  private List<TsFileResource> seqTsFileResources;
+
+  // Each sub task has its corresponding seq file index.
+  // The index of the array corresponds to subTaskId.
+  protected int[] seqFileIndexArray = new int[subTaskNum];
+
+  // device end time in each source seq file
+  protected final long[] currentDeviceEndTime;
+
+  // whether each target file is empty or not
+  protected final boolean[] isEmptyFile;
+
+  // whether each target file has device data or not
+  protected final boolean[] isDeviceExistedInTargetFiles;
+
+  // current chunk group header size
+  private int chunkGroupHeaderSize;
+
+  protected List<TsFileResource> targetResources;
+
+  public AbstractCrossCompactionWriter(
+      List<TsFileResource> targetResources, List<TsFileResource> 
seqFileResources)
+      throws IOException {
+    currentDeviceEndTime = new long[seqFileResources.size()];
+    isEmptyFile = new boolean[seqFileResources.size()];
+    isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+    long memorySizeForEachWriter =
+        (long)
+            (SystemInfo.getInstance().getMemorySizeForCompaction()
+                / 
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()
+                / targetResources.size());
+    for (int i = 0; i < targetResources.size(); i++) {
+      this.targetFileWriters.add(
+          new TsFileIOWriter(targetResources.get(i).getTsFile(), true, 
memorySizeForEachWriter));
+      isEmptyFile[i] = true;
+    }
+    this.seqTsFileResources = seqFileResources;
+    this.targetResources = targetResources;
+  }
+
+  @Override
+  public void startChunkGroup(String deviceId, boolean isAlign) throws 
IOException {
+    this.deviceId = deviceId;
+    this.isAlign = isAlign;
+    this.seqFileIndexArray = new int[subTaskNum];
+    checkIsDeviceExistAndGetDeviceEndTime();
+    for (int i = 0; i < targetFileWriters.size(); i++) {
+      chunkGroupHeaderSize = 
targetFileWriters.get(i).startChunkGroup(deviceId);
+    }
+  }
+
+  @Override
+  public void endChunkGroup() throws IOException {
+    for (int i = 0; i < seqTsFileResources.size(); i++) {
+      TsFileIOWriter targetFileWriter = targetFileWriters.get(i);
+      if (isDeviceExistedInTargetFiles[i]) {
+        targetFileWriter.endChunkGroup();
+      } else {
+        targetFileWriter.truncate(targetFileWriter.getPos() - 
chunkGroupHeaderSize);
+      }
+      isDeviceExistedInTargetFiles[i] = false;
+    }
+    seqFileIndexArray = null;
+  }
+
+  @Override
+  public void endMeasurement(int subTaskId) throws IOException {
+    flushChunkToFileWriter(
+        targetFileWriters.get(seqFileIndexArray[subTaskId]), 
chunkWriters[subTaskId]);
+    seqFileIndexArray[subTaskId] = 0;
+  }
+
+  @Override
+  public void write(long timestamp, Object value, int subTaskId) throws 
IOException {
+    checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
+    int fileIndex = seqFileIndexArray[subTaskId];
+    writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
+    chunkPointNumArray[subTaskId]++;
+    checkChunkSizeAndMayOpenANewChunk(
+        targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, 
true);
+    isDeviceExistedInTargetFiles[fileIndex] = true;
+    isEmptyFile[fileIndex] = false;
+  }
+
+  /**
+   * Writes a batch of data points in one call; only used for aligned devices.
+   *
+   * @param timestamps time column shared by all value columns of the batch
+   * @param columns one value column per measurement of the aligned device
+   * @param subTaskId id of the compaction sub task issuing the write
+   * @param batchSize number of rows to write — presumably the leading rows of the
+   *     columns; confirm against the concrete implementations
+   * @throws IOException if writing to the target file fails
+   */
+  @Override
+  public abstract void write(TimeColumn timestamps, Column[] columns, int 
subTaskId, int batchSize)
+      throws IOException;
+
+  @Override
+  public void endFile() throws IOException {
+    for (int i = 0; i < isEmptyFile.length; i++) {
+      targetFileWriters.get(i).endFile();
+      // delete empty target file
+      if (isEmptyFile[i]) {
+        targetFileWriters.get(i).getFile().delete();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (TsFileIOWriter targetWriter : targetFileWriters) {
+      if (targetWriter != null && targetWriter.canWrite()) {
+        targetWriter.close();
+      }
+    }
+    targetFileWriters = null;
+    seqTsFileResources = null;
+  }
+
+  @Override
+  public void checkAndMayFlushChunkMetadata() throws IOException {
+    for (int i = 0; i < targetFileWriters.size(); i++) {
+      TsFileIOWriter fileIOWriter = targetFileWriters.get(i);
+      TsFileResource resource = targetResources.get(i);
+      // Before flushing chunk metadatas, we use chunk metadatas in tsfile io 
writer to update start
+      // time and end time in resource.
+      List<TimeseriesMetadata> timeseriesMetadatasOfCurrentDevice =
+          fileIOWriter.getDeviceTimeseriesMetadataMap().get(deviceId);

Review Comment:
   Calling `getDeviceTimeseriesMetadataMap()` is costly; consider caching its result instead of invoking it here for every device.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to