choubenson commented on code in PR #7621:
URL: https://github.com/apache/iotdb/pull/7621#discussion_r1030071871


##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/utils/SeriesCompactionExecutor.java:
##########
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.cross.utils;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.compaction.reader.PointPriorityReader;
+import org.apache.iotdb.db.engine.compaction.task.SubCompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+public abstract class SeriesCompactionExecutor {
+  protected enum ModifiedStatus {
+    ALL_DELETED,
+    PARTIAL_DELETED,
+    NONE_DELETED;
+  }
+
+  @FunctionalInterface
+  public interface RemovePage {
+    void call(PageElement pageElement)
+        throws WriteProcessException, IOException, IllegalPathException;
+  }
+
+  private final SubCompactionTaskSummary summary;
+
+  // source files which are sorted by the start time of current device from 
old to new. Notice: If
+  // the type of timeIndex is FileTimeIndex, it may contain resources in which 
the current device
+  // does not exist.
+  protected List<FileElement> fileList = new ArrayList<>();;
+
+  protected final PriorityQueue<ChunkMetadataElement> chunkMetadataQueue;
+
+  protected final PriorityQueue<PageElement> pageQueue;
+
+  protected AbstractCompactionWriter compactionWriter;
+
+  protected int subTaskId;
+
+  protected Map<TsFileResource, TsFileSequenceReader> readerCacheMap;
+
+  private final Map<TsFileResource, List<Modification>> modificationCacheMap;
+
+  private final PointPriorityReader pointPriorityReader = new 
PointPriorityReader(this::removePage);
+
+  protected String deviceId;
+
+  // Pages in this list will be sequentially judged whether there is a real 
overlap to choose
+  // whether to put them in the point priority reader to deserialize or 
directly flush to chunk
+  // writer. During the process of compacting overlapped page, there may be 
new overlapped pages
+  // added into this list.
+  private final List<PageElement> candidateOverlappedPages = new ArrayList<>();
+
+  public SeriesCompactionExecutor(
+      AbstractCompactionWriter compactionWriter,
+      Map<TsFileResource, TsFileSequenceReader> readerCacheMap,
+      Map<TsFileResource, List<Modification>> modificationCacheMap,
+      String deviceId,
+      int subTaskId,
+      SubCompactionTaskSummary summary) {
+    this.compactionWriter = compactionWriter;
+    this.subTaskId = subTaskId;
+    this.deviceId = deviceId;
+    this.readerCacheMap = readerCacheMap;
+    this.modificationCacheMap = modificationCacheMap;
+    this.summary = summary;
+
+    chunkMetadataQueue =
+        new PriorityQueue<>(
+            (o1, o2) -> {
+              int timeCompare = Long.compare(o1.startTime, o2.startTime);
+              return timeCompare != 0 ? timeCompare : 
Long.compare(o2.priority, o1.priority);
+            });
+
+    pageQueue =
+        new PriorityQueue<>(
+            (o1, o2) -> {
+              int timeCompare = Long.compare(o1.startTime, o2.startTime);
+              return timeCompare != 0 ? timeCompare : 
Long.compare(o2.priority, o1.priority);
+            });
+  }
+
+  public abstract void excute()
+      throws PageException, IllegalPathException, IOException, 
WriteProcessException;
+
+  protected abstract void compactFiles()
+      throws PageException, IOException, WriteProcessException, 
IllegalPathException;
+
+  /** Compact chunks in chunk metadata queue. */
+  protected void compactChunks()
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    while (!chunkMetadataQueue.isEmpty()) {
+      ChunkMetadataElement firstChunkMetadataElement = 
chunkMetadataQueue.peek();
+      List<ChunkMetadataElement> overlappedChunkMetadatas =
+          findOverlapChunkMetadatas(firstChunkMetadataElement);
+      boolean isChunkOverlap = overlappedChunkMetadatas.size() > 1;
+      boolean isModified = isChunkModified(firstChunkMetadataElement);
+
+      if (isChunkOverlap || isModified) {
+        // has overlap or modified chunk, then deserialize it
+        summary.CHUNK_OVERLAP_OR_MODIFIED += overlappedChunkMetadatas.size();
+        compactWithOverlapChunks(overlappedChunkMetadatas);
+      } else {
+        // has none overlap or modified chunk, flush it to file writer directly
+        summary.CHUNK_NONE_OVERLAP += 1;
+        compactWithNonOverlapChunk(firstChunkMetadataElement);
+      }
+    }
+  }
+
+  /**
+   * Deserialize chunks and start compacting pages. Compact a series of chunks 
that overlap with
+   * each other. Eg: The parameters are chunk 1 and chunk 2, that is, chunk 1 
only overlaps with
+   * chunk 2, while chunk 2 overlap with chunk 3, chunk 3 overlap with chunk 
4,and so on, there are
+   * 10 chunks in total. This method will merge all 10 chunks.
+   */
+  private void compactWithOverlapChunks(List<ChunkMetadataElement> 
overlappedChunkMetadatas)
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    for (ChunkMetadataElement overlappedChunkMetadata : 
overlappedChunkMetadatas) {
+      readChunk(overlappedChunkMetadata);
+      deserializeChunkIntoQueue(overlappedChunkMetadata);
+    }
+    compactPages();
+  }
+
+  /**
+   * Flush chunk to target file directly. If the end time of chunk exceeds the 
end time of file or
+   * the unsealed chunk is too small, then deserialize it.
+   */
+  private void compactWithNonOverlapChunk(ChunkMetadataElement 
chunkMetadataElement)
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    readChunk(chunkMetadataElement);
+    boolean success;
+    if (chunkMetadataElement.chunkMetadata instanceof AlignedChunkMetadata) {
+      success =
+          compactionWriter.flushAlignedChunk(
+              chunkMetadataElement.chunk,
+              ((AlignedChunkMetadata) 
chunkMetadataElement.chunkMetadata).getTimeChunkMetadata(),
+              chunkMetadataElement.valueChunks,
+              ((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata)
+                  .getValueChunkMetadataList(),
+              subTaskId);
+    } else {
+      success =
+          compactionWriter.flushNonAlignedChunk(
+              chunkMetadataElement.chunk,
+              (ChunkMetadata) chunkMetadataElement.chunkMetadata,
+              subTaskId);
+    }
+    if (success) {
+      // flush chunk successfully, then remove this chunk
+      removeChunk(chunkMetadataQueue.peek());
+    } else {
+      // unsealed chunk is not large enough or chunk.endTime > file.endTime, 
then deserialize chunk
+      summary.CHUNK_NONE_OVERLAP_BUT_DESERIALIZE += 1;
+      deserializeChunkIntoQueue(chunkMetadataElement);
+      compactPages();
+    }
+  }
+
+  abstract void deserializeChunkIntoQueue(ChunkMetadataElement 
chunkMetadataElement)
+      throws IOException;
+
+  abstract void readChunk(ChunkMetadataElement chunkMetadataElement) throws 
IOException;
+
+  /** Deserialize files into chunk metadatas and put them into the chunk 
metadata queue. */
+  abstract void deserializeFileIntoQueue(List<FileElement> fileElements)
+      throws IOException, IllegalPathException;
+
+  /** Compact pages in page queue. */
+  private void compactPages()
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    while (!pageQueue.isEmpty()) {
+      PageElement firstPageElement = pageQueue.peek();
+      ModifiedStatus modifiedStatus = isPageModified(firstPageElement);
+
+      if (modifiedStatus == ModifiedStatus.ALL_DELETED) {
+        // all data on this page has been deleted, remove it
+        removePage(firstPageElement);
+        continue;
+      }
+
+      List<PageElement> overlapPages = findOverlapPages(firstPageElement);
+      boolean isPageOverlap = overlapPages.size() > 1;
+
+      if (isPageOverlap || modifiedStatus == ModifiedStatus.PARTIAL_DELETED) {
+        // has overlap or modified pages, then deserialize it
+        summary.PAGE_OVERLAP_OR_MODIFIED += 1;
+        pointPriorityReader.addNewPage(overlapPages.remove(0));
+        addOverlappedPagesIntoList(overlapPages);
+        compactWithOverlapPages();
+      } else {
+        // has none overlap or modified pages, flush it to chunk writer 
directly
+        summary.PAGE_NONE_OVERLAP += 1;
+        compactWithNonOverlapPage(firstPageElement);
+      }
+    }
+  }
+
+  private void compactWithNonOverlapPage(PageElement pageElement)
+      throws PageException, IOException, WriteProcessException, 
IllegalPathException {
+    boolean success;
+    if (pageElement.iChunkReader instanceof AlignedChunkReader) {
+      success =
+          compactionWriter.flushAlignedPage(
+              pageElement.pageData,
+              pageElement.pageHeader,
+              pageElement.valuePageDatas,
+              pageElement.valuePageHeaders,
+              subTaskId);
+    } else {
+      success =
+          compactionWriter.flushNonAlignedPage(
+              pageElement.pageData, pageElement.pageHeader, subTaskId);
+    }
+    if (success) {
+      // flush the page successfully, then remove this page
+      removePage(pageElement);
+    } else {
+      // unsealed page is not large enough or page.endTime > file.endTime, 
then deserialze it
+      summary.PAGE_NONE_OVERLAP_BUT_DESERIALIZE += 1;
+      pointPriorityReader.addNewPage(pageElement);
+
+      // write data points of the current page into chunk writer
+      while (pointPriorityReader.hasNext()
+          && pointPriorityReader.currentPoint().getTimestamp()
+              <= pageElement.pageHeader.getEndTime()) {
+        compactionWriter.write(pointPriorityReader.currentPoint(), subTaskId);
+        pointPriorityReader.next();
+      }
+    }
+  }
+
+  /**
+   * Compact a series of pages that overlap with each other. Eg: The 
parameters are page 1 and page
+   * 2, that is, page 1 only overlaps with page 2, while page 2 overlap with 
page 3, page 3 overlap
+   * with page 4,and so on, there are 10 pages in total. This method will 
compact all 10 pages.
+   * Pages in the candidate overlapped pages list will be sequentially judged 
whether there is a
+   * real overlap, if so, it will be put into the point priority reader and 
deserialized; if not, it
+   * means that the page is located in a gap inside another pages, and it can 
be directly flushed to
+   * chunk writer. There will be new overlapped pages added into the list 
during the process of
+   * compacting overlapped pages. Notice: for a real overlap page, it will be 
removed from candidate
+   * list after it has been adding into point priority reader and 
deserializing. For a fake overlap
+   * page, it will be removed from candidate list after it has been flushing 
to chunk writer
+   * completely.
+   */
+  private void compactWithOverlapPages()
+      throws IOException, PageException, WriteProcessException, 
IllegalPathException {
+    checkAndCompactOverlappePages();
+
+    // write remaining data points, of which point.time >= the last overlapped 
page.startTime
+    while (pointPriorityReader.hasNext()) {
+      // write data point to chunk writer
+
+      compactionWriter.write(pointPriorityReader.currentPoint(), subTaskId);
+      pointPriorityReader.next();
+      if (candidateOverlappedPages.size() > 0) {
+        // finish compacting the first page or there are new chunks being 
deserialized and find
+        // the new overlapped pages, then start compacting them
+        checkAndCompactOverlappePages();
+      }
+    }
+  }
+
+  /**
+   * Check whether the page is true overlap or fake overlap. If a page is 
located in the gap of
+   * another page, then this page is fake overlap, which can be flushed to 
chunk writer directly.
+   * Otherwise, deserialize this page into point priority reader.
+   */
+  private void checkAndCompactOverlappePages()
+      throws IllegalPathException, IOException, WriteProcessException, 
PageException {
+    // write point.time < the last overlapped page.startTime
+    while (candidateOverlappedPages.size() > 0) {
+      PageElement nextPageElement = candidateOverlappedPages.get(0);
+
+      int oldSize = candidateOverlappedPages.size();
+      // write currentPage.point.time < nextPage.startTime to chunk writer
+      while (pointPriorityReader.hasNext()
+          && pointPriorityReader.currentPoint().getTimestamp() < 
nextPageElement.startTime) {
+        // write data point to chunk writer
+        compactionWriter.write(pointPriorityReader.currentPoint(), subTaskId);
+        pointPriorityReader.next();
+        if (candidateOverlappedPages.size() > oldSize) {
+          // during the process of writing overlapped points, if the first 
page is compacted
+          // completely or a new chunk is deserialized, there may be new pages 
overlapped with the
+          // first page in page queue which are added into the list. If so, 
the next overlapped
+          // page in the list may be changed, so we should re-get next overlap 
page here.
+          oldSize = candidateOverlappedPages.size();
+          nextPageElement = candidateOverlappedPages.get(0);
+        }
+      }
+
+      ModifiedStatus nextPageModifiedStatus = isPageModified(nextPageElement);
+
+      if (nextPageModifiedStatus == ModifiedStatus.ALL_DELETED) {
+        // all data on next page has been deleted, remove it
+        removePage(nextPageElement);
+      } else {
+        boolean isNextPageOverlap =
+            (pointPriorityReader.hasNext()
+                    && pointPriorityReader.currentPoint().getTimestamp()
+                        <= nextPageElement.pageHeader.getEndTime())
+                || isPageOverlap(nextPageElement);
+
+        if (isNextPageOverlap || nextPageModifiedStatus == 
ModifiedStatus.PARTIAL_DELETED) {
+          // has overlap or modified pages, then deserialize it
+          pointPriorityReader.addNewPage(nextPageElement);
+        } else {
+          // has none overlap or modified pages, flush it to chunk writer 
directly
+          summary.PAGE_FAKE_OVERLAP += 1;
+          compactWithNonOverlapPage(nextPageElement);
+        }
+      }
+      candidateOverlappedPages.remove(0);
+    }
+  }
+
+  /**
+   * Add the new overlapped pages into the global list and sort it according 
to the startTime of the
+   * page from small to large, so that each page can be compacted in order. If 
the page has been
+   * deleted completely, we remove it.
+   */
+  private void addOverlappedPagesIntoList(List<PageElement> 
newOverlappedPages) {
+    summary.PAGE_OVERLAP_OR_MODIFIED += newOverlappedPages.size();
+    int oldSize = candidateOverlappedPages.size();
+    candidateOverlappedPages.addAll(newOverlappedPages);
+    if (oldSize != 0 && candidateOverlappedPages.size() > oldSize) {
+      // if there is no pages in the overlappedPages, then we don't need to 
sort it after adding the
+      // new overlapped pages, because newOverlappedPages is already sorted. 
When there is pages in
+      // list before and there is new pages added into list, then we need to 
sort it again.
+      // we should ensure that the list is ordered according to the startTime 
of the page from small
+      // to large, so that each page can be compacted in order
+      candidateOverlappedPages.sort(Comparator.comparingLong(o -> 
o.startTime));
+    }
+  }
+
+  /**
+   * Find overlaped pages which have not been selected. Notice: We must ensure 
that the returned
+   * list is ordered according to the startTime of the page from small to 
large, so that each page
+   * can be compacted in order.
+   */
+  private List<PageElement> findOverlapPages(PageElement page) {
+    List<PageElement> elements = new ArrayList<>();
+    long endTime = page.pageHeader.getEndTime();
+    for (PageElement element : pageQueue) {
+      if (element.startTime <= endTime) {
+        if (!element.isOverlaped) {
+          elements.add(element);
+          element.isOverlaped = true;
+        }
+      }
+    }
+    elements.sort(Comparator.comparingLong(o -> o.startTime));
+    return elements;
+  }
+
+  /**
+   * Find overlapped chunks which have not been selected. Notice: We must 
ensure that the returned
+   * list is ordered according to the startTime of the chunk from small to 
large, so that each chunk
+   * can be compacted in order.
+   */
+  private List<ChunkMetadataElement> findOverlapChunkMetadatas(
+      ChunkMetadataElement chunkMetadataElement) {
+    List<ChunkMetadataElement> elements = new ArrayList<>();
+    long endTime = chunkMetadataElement.chunkMetadata.getEndTime();
+    for (ChunkMetadataElement element : chunkMetadataQueue) {
+      if (element.chunkMetadata.getStartTime() <= endTime) {
+        if (!element.isOverlaped) {
+          elements.add(element);
+          element.isOverlaped = true;
+        }
+      }
+    }
+    elements.sort(Comparator.comparingLong(o -> o.startTime));
+    return elements;
+  }
+
+  /**
+   * Find overlapped files which have not been selected. Notice: We must 
ensure that the returned
+   * list is ordered according to the startTime of the current device in the 
file from small to
+   * large, so that each file can be compacted in order.
+   */
+  protected List<FileElement> findOverlapFiles(FileElement file) {
+    List<FileElement> overlappedFiles = new ArrayList<>();
+    long endTime = file.resource.getEndTime(deviceId);
+    for (FileElement fileElement : fileList) {
+      if (fileElement.resource.getStartTime(deviceId) <= endTime) {
+        if (!fileElement.isOverlap) {
+          overlappedFiles.add(fileElement);
+          fileElement.isOverlap = true;
+        }
+      } else {
+        break;
+      }
+    }
+    return overlappedFiles;
+  }
+
+  /** Check is the page overlap with other pages later then the specific page 
in queue or not. */
+  private boolean isPageOverlap(PageElement pageElement) {
+    long endTime = pageElement.pageHeader.getEndTime();
+    long startTime = pageElement.startTime;
+    for (PageElement element : pageQueue) {
+      if (element.equals(pageElement)) {
+        continue;
+      }
+      // only check pages later than the specific page
+      if (element.startTime >= startTime && element.startTime <= endTime) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check whether the chunk is modified.
+   *
+   * <p>Notice: if is aligned chunk, return true if any of value chunk has 
data been deleted. Return
+   * false if and only if all value chunks has no data been deleted.
+   */
+  protected boolean isChunkModified(ChunkMetadataElement chunkMetadataElement) 
{

Review Comment:
   Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to