choubenson commented on code in PR #7621:
URL: https://github.com/apache/iotdb/pull/7621#discussion_r1004116956


##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/FastCompactionPerformerSubTask.java:
##########
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.cross.utils.ChunkMetadataElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.FileElement;
+import org.apache.iotdb.db.engine.compaction.cross.utils.PageElement;
+import org.apache.iotdb.db.engine.compaction.reader.PointPriorityReader;
+import org.apache.iotdb.db.engine.compaction.writer.FastCrossCompactionWriter;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+
+public abstract class FastCompactionPerformerSubTask implements Callable<Void> 
{
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+
+  @FunctionalInterface
+  public interface RemovePage {
+    void call(PageElement pageElement)
+        throws WriteProcessException, IOException, IllegalPathException;
+  }
+
+  // sorted source files by the start time of device
+  protected List<FileElement> fileList;
+
+  protected final PriorityQueue<ChunkMetadataElement> chunkMetadataQueue;
+
+  protected final PriorityQueue<PageElement> pageQueue;
+
+  protected FastCrossCompactionWriter compactionWriter;
+
+  protected int subTaskId;
+
+  // measurement -> tsfile resource -> timeseries metadata <startOffset, 
endOffset>
+  // used to get the chunk metadatas from tsfile directly according to 
timeseries metadata offset.
+  protected Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap;
+
+  protected Map<TsFileResource, TsFileSequenceReader> readerCacheMap;
+
+  private final Map<TsFileResource, List<Modification>> modificationCacheMap;
+
+  // source files which are sorted by the start time of current device from 
old to new. Notice: If
+  // the type of timeIndex is FileTimeIndex, it may contain resources in which 
the current device
+  // does not exist.
+  protected List<TsFileResource> sortedSourceFiles;
+
+  private final PointPriorityReader pointPriorityReader = new 
PointPriorityReader(this::removePage);
+
+  private final boolean isAligned;
+
+  protected String deviceId;
+
+  public FastCompactionPerformerSubTask(
+      FastCrossCompactionWriter compactionWriter,
+      Map<String, Map<TsFileResource, Pair<Long, Long>>> 
timeseriesMetadataOffsetMap,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap,
+      Map<TsFileResource, List<Modification>> modificationCacheMap,
+      List<TsFileResource> sortedSourceFiles,
+      String deviceId,
+      boolean isAligned,
+      int subTaskId) {
+    this.compactionWriter = compactionWriter;
+    this.subTaskId = subTaskId;
+    this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap;
+    this.isAligned = isAligned;
+    this.deviceId = deviceId;
+    this.readerCacheMap = readerCacheMap;
+    this.modificationCacheMap = modificationCacheMap;
+    this.sortedSourceFiles = sortedSourceFiles;
+
+    this.fileList = new ArrayList<>();
+    chunkMetadataQueue =
+        new PriorityQueue<>(
+            (o1, o2) -> {
+              int timeCompare = Long.compare(o1.startTime, o2.startTime);
+              return timeCompare != 0 ? timeCompare : 
Integer.compare(o2.priority, o1.priority);
+            });
+
+    pageQueue =
+        new PriorityQueue<>(
+            (o1, o2) -> {
+              int timeCompare = Long.compare(o1.startTime, o2.startTime);
+              return timeCompare != 0 ? timeCompare : 
Integer.compare(o2.priority, o1.priority);
+            });
+  }
+
+  protected void compactFiles()
+      throws PageException, IOException, WriteProcessException, 
IllegalPathException {
+    while (!fileList.isEmpty()) {
+      List<FileElement> overlappedFiles = findOverlapFiles(fileList.get(0));
+
+      // read chunk metadatas from files and put them into chunk metadata queue
+      deserializeFileIntoQueue(overlappedFiles);
+
+      if (!isAligned) {
+        // for nonAligned sensors, only after getting chunkMetadatas can we 
create schema to start
+        // measurement; for aligned sensors, we get all schemas of value 
sensors and
+        // startMeasurement() in the previous process, because we need to get 
all chunk metadatas of
+        // sensors and their schemas under the current device, but since the 
compaction process is
+        // to read a batch of overlapped files each time, which may not 
contain all the sensors.
+        startMeasurement();
+      }
+
+      compactChunks();
+    }
+  }
+
+  protected abstract void startMeasurement() throws IOException;
+
+  /**
+   * Compact chunks in chunk metadata queue.
+   *
+   * @throws IOException
+   * @throws PageException
+   */
+  private void compactChunks()
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    while (!chunkMetadataQueue.isEmpty()) {
+      ChunkMetadataElement firstChunkMetadataElement = 
chunkMetadataQueue.peek();
+      List<ChunkMetadataElement> overlappedChunkMetadatas =
+          findOverlapChunkMetadatas(firstChunkMetadataElement);
+      boolean isChunkOverlap = overlappedChunkMetadatas.size() > 1;
+      boolean isModified = isChunkModified(firstChunkMetadataElement);
+
+      if (isChunkOverlap || isModified) {
+        // has overlap or modified chunk, then deserialize it
+        compactWithOverlapChunks(overlappedChunkMetadatas);
+      } else {
+        // has none overlap or modified chunk, flush it to file writer directly
+        compactWithNonOverlapChunks(firstChunkMetadataElement);
+      }
+    }
+  }
+
+  /**
+   * Deserialize chunks and start compacting pages. Compact a series of chunks 
that overlap with
+   * each other. Eg: The parameters are chunk 1 and chunk 2, that is, chunk 1 
only overlaps with
+   * chunk 2, while chunk 2 overlap with chunk 3, chunk 3 overlap with chunk 
4,and so on, there are
+   * 10 chunks in total. This method will merge all 10 chunks.
+   */
+  private void compactWithOverlapChunks(List<ChunkMetadataElement> 
overlappedChunkMetadatas)
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    for (ChunkMetadataElement overlappedChunkMetadata : 
overlappedChunkMetadatas) {
+      deserializeChunkIntoQueue(overlappedChunkMetadata);
+    }
+    compactPages();
+  }
+
+  /**
+   * Flush chunk to target file directly. If the end time of chunk exceeds the 
end time of file or
+   * the unsealed chunk is too small, then deserialize it.
+   */
+  private void compactWithNonOverlapChunks(ChunkMetadataElement 
chunkMetadataElement)
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    if (compactionWriter.flushChunkToFileWriter(
+        chunkMetadataElement.chunkMetadata,
+        readerCacheMap.get(chunkMetadataElement.fileElement.resource),
+        subTaskId)) {
+      // flush chunk successfully
+      removeChunk(chunkMetadataQueue.peek());
+    } else {
+      // unsealed chunk is not large enough or chunk.endTime > file.endTime, 
then deserialize chunk
+      deserializeChunkIntoQueue(chunkMetadataElement);
+      compactPages();
+    }
+  }
+
+  abstract void deserializeChunkIntoQueue(ChunkMetadataElement 
chunkMetadataElement)
+      throws IOException;
+
+  /** Deserialize files into chunk metadatas and put them into the chunk 
metadata queue. */
+  abstract void deserializeFileIntoQueue(List<FileElement> fileElements)
+      throws IOException, IllegalPathException;
+
+  /** Compact pages in page queue. */
+  private void compactPages()
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    while (!pageQueue.isEmpty()) {
+      PageElement firstPageElement = pageQueue.peek();
+      int modifiedStatus = isPageModified(firstPageElement);
+
+      if (modifiedStatus == 1) {
+        // all data on this page has been deleted, remove it
+        removePage(firstPageElement);
+        continue;
+      }
+
+      List<PageElement> overlappedPages = findOverlapPages(firstPageElement);
+      boolean isPageOverlap = overlappedPages.size() > 1;
+
+      if (isPageOverlap || modifiedStatus == 0) {
+        // has overlap or modified pages, then deserialize it
+        compactWithOverlapPages(overlappedPages);
+      } else {
+        // has none overlap or modified pages, flush it to chunk writer 
directly
+        compactWithNonOverlapPage(firstPageElement);
+      }
+    }
+  }
+
+  private void compactWithNonOverlapPage(PageElement pageElement)
+      throws PageException, IOException, WriteProcessException, 
IllegalPathException {
+    boolean success;
+    if (pageElement.iChunkReader instanceof AlignedChunkReader) {
+      success =
+          compactionWriter.flushAlignedPageToChunkWriter(
+              pageElement.pageData,
+              pageElement.pageHeader,
+              pageElement.valuePageDatas,
+              pageElement.valuePageHeaders,
+              subTaskId);
+    } else {
+      success =
+          compactionWriter.flushPageToChunkWriter(
+              pageElement.pageData, pageElement.pageHeader, subTaskId);
+    }
+    if (success) {
+      // flush the page successfully, then remove this page
+      removePage(pageElement);
+    } else {
+      // unsealed page is not large enough or page.endTime > file.endTime, 
then deserialze it
+      pointPriorityReader.addNewPage(pageElement);
+
+      // write data points of the current page into chunk writer
+      while (pointPriorityReader.hasNext()
+          && pointPriorityReader.currentPoint().left <= 
pageElement.pageHeader.getEndTime()) {
+        compactionWriter.write(
+            pointPriorityReader.currentPoint().left,
+            pointPriorityReader.currentPoint().right,
+            subTaskId);
+        pointPriorityReader.next();
+      }
+    }
+  }
+
+  /**
+   * Compact a series of pages that overlap with each other. Eg: The 
parameters are page 1 and page
+   * 2, that is, page 1 only overlaps with page 2, while page 2 overlap with 
page 3, page 3 overlap
+   * with page 4,and so on, there are 10 pages in total. This method will 
merge all 10 pages.
+   */
+  private void compactWithOverlapPages(List<PageElement> overlappedPages)
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    pointPriorityReader.addNewPage(overlappedPages.remove(0));
+    pointPriorityReader.updateNewOverlappedPages(overlappedPages);
+    while (pointPriorityReader.hasNext()) {
+      // write point.time < the last overlapped page.startTime
+      while (overlappedPages.size() > 0) {
+        PageElement nextPageElement = overlappedPages.get(0);
+
+        int oldSize = overlappedPages.size();
+        // write currentPage.point.time < nextPage.startTime to chunk writer
+        while (pointPriorityReader.currentPoint().left < 
nextPageElement.startTime) {
+          // write data point to chunk writer
+          compactionWriter.write(
+              pointPriorityReader.currentPoint().left,
+              pointPriorityReader.currentPoint().right,
+              subTaskId);
+          pointPriorityReader.next();
+          if (overlappedPages.size() > oldSize) {

Review Comment:
   Resolved. It has been renamed to `candidateOverlappedPages` and put into 
`FastCompactionPerformerSubTask` class as a global variable. Pages in this list 
will be sequentially judged whether there is a real overlap to choose whether 
to put them in the point priority reader to deserialize or directly flush to 
chunk writer. During the process of compacting overlapped page, there may be 
new overlapped pages added into this list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to