jt2594838 commented on code in PR #14616:
URL: https://github.com/apache/iotdb/pull/14616#discussion_r1908232114


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java:
##########
@@ -649,6 +650,40 @@ public void releaseResourceWhenAllDriversAreClosed() {
     releaseResource();
   }
 
+  /**
+   * It checks all referenced TVList by the query: 1. If current is not the 
owner, just remove
+   * itself from query context list 2. If current query is the owner and no 
other query use it now,
+   * release the TVList 3. If current query is the owner and other queries 
still use it, set the
+   * next query as owner
+   */
+  private void releaseTVListOwnedByQuery() {
+    for (TVList tvList : tvListSet) {
+      tvList.lockQueryList();
+      List<QueryContext> queryContextList = tvList.getQueryContextList();
+      try {
+        queryContextList.remove(this);
+        if (tvList.getOwnerQuery() == this) {
+          if (queryContextList.isEmpty()) {
+            LOGGER.debug(
+                "TVList {} is released by the query, FragmentInstance Id is 
{}",
+                tvList,
+                this.getId());
+            tvList.clear();
+            
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());

Review Comment:
   Is the memory calculation still correct after the list is cleared?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java:
##########
@@ -419,4 +531,107 @@ public List<IChunkMetadata> 
getVisibleMetadataListFromWriter(
     chunkMetadataList.removeIf(x -> x.getEndTime() < timeLowerBound);
     return chunkMetadataList;
   }
+
+  /**
+   * Prepare the TVList references for the query. We remember TVLists' row 
count here and determine
+   * whether the TVLists needs sorting later during operator execution based 
on it. It need not
+   * protect sorted list. Sorted list is changed in the handover process of 
inserting, which holds
+   * the data region write lock. At this moment, query thread holds the data 
region read lock.
+   *
+   * @param context query context
+   * @param memChunk writable memchunk
+   * @param isWorkMemTable in working or flushing memtable
+   * @param globalTimeFilter global time filter
+   * @return Map<TVList, Integer>
+   */
+  private Map<TVList, Integer> prepareTvListMapForQuery(

Review Comment:
   prepareTvListMapForQuery and prepareAlignedTvListMapForQuery are highly
similar. Is there any chance to abstract the common logic into a shared method?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java:
##########
@@ -584,7 +946,9 @@ public void serializeToWAL(IWALByteBufferView buffer) {
       schema.serializeTo(ByteBuffer.wrap(bytes));
       buffer.put(bytes);
     }
-
+    for (AlignedTVList alignedTvList : sortedList) {
+      alignedTvList.serializeToWAL(buffer);
+    }

Review Comment:
   Where is the deserialization? Is it possible to know how many sorted lists 
there will be during deserialization?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkReader.java:
##########
@@ -65,4 +104,204 @@ public void close() {
   public List<IPageReader> loadPageReaderList() {
     return this.pageReaderList;
   }
+
+  class TsBlockSupplier implements Supplier<TsBlock> {
+    private int[] pageEndOffsets;
+
+    public TsBlockSupplier() {}
+
+    public void setPageEndOffsets(int[] pageEndOffsets) {
+      this.pageEndOffsets = pageEndOffsets;
+    }
+
+    @Override
+    public TsBlock get() {
+      return buildTsBlock();
+    }
+
+    private TsBlock buildTsBlock() {
+      try {
+        List<TSDataType> tsDataTypes = readableChunk.getDataTypes();
+        TsBlockBuilder builder = new TsBlockBuilder(tsDataTypes);
+        writeValidValuesIntoTsBlock(builder);
+        return builder.build();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private boolean isOutOfMemPageBounds() {
+      if (pageEndOffsets == null) {
+        return false;
+      }
+      int[] currTvListOffsets = 
timeValuePairIterator.getAlignedTVListOffsets();
+      for (int i = 0; i < pageEndOffsets.length; i++) {
+        if (currTvListOffsets[i] < pageEndOffsets[i]) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void writePageTimeIntoBuilder(long[] time, int count, 
TsBlockBuilder builder) {
+      for (int index = 0; index < count; index++) {
+        builder.getTimeColumnBuilder().writeLong(time[index]);
+      }
+    }
+
+    private void writePageValuesIntoBuilder(
+        PageColumnAccessInfo[] columnAccessInfo,
+        List<TSDataType> tsDataTypes,
+        TsBlockBuilder builder) {
+      for (int columnIndex = 0; columnIndex < tsDataTypes.size(); 
columnIndex++) {
+        PageColumnAccessInfo pageAccessInfo = columnAccessInfo[columnIndex];
+        ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex);
+        switch (tsDataTypes.get(columnIndex)) {
+          case BOOLEAN:
+            for (int index = 0; index < pageAccessInfo.count(); index++) {
+              int[] accessInfo = pageAccessInfo.get(index);
+              TsPrimitiveType value =
+                  timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+              if (value == null) {
+                valueBuilder.appendNull();
+              } else {
+                valueBuilder.writeBoolean(value.getBoolean());
+              }
+            }
+            break;
+          case INT32:
+          case DATE:
+            for (int index = 0; index < pageAccessInfo.count(); index++) {
+              int[] accessInfo = pageAccessInfo.get(index);
+              TsPrimitiveType value =
+                  timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+              if (value == null) {
+                valueBuilder.appendNull();
+              } else {
+                valueBuilder.writeInt(value.getInt());
+              }
+            }
+            break;
+          case INT64:
+          case TIMESTAMP:
+            for (int index = 0; index < pageAccessInfo.count(); index++) {
+              int[] accessInfo = pageAccessInfo.get(index);
+              TsPrimitiveType value =
+                  timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+              if (value == null) {
+                valueBuilder.appendNull();
+              } else {
+                valueBuilder.writeLong(value.getLong());
+              }
+            }
+            break;
+          case FLOAT:
+            for (int index = 0; index < pageAccessInfo.count(); index++) {
+              int[] accessInfo = pageAccessInfo.get(index);
+              TsPrimitiveType value =
+                  timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+              if (value == null) {
+                valueBuilder.appendNull();
+              } else {
+                valueBuilder.writeFloat(value.getFloat());
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int index = 0; index < pageAccessInfo.count(); index++) {
+              int[] accessInfo = pageAccessInfo.get(index);
+              TsPrimitiveType value =
+                  timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+              if (value == null) {
+                valueBuilder.appendNull();
+              } else {
+                valueBuilder.writeDouble(value.getDouble());
+              }
+            }
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            for (int index = 0; index < pageAccessInfo.count(); index++) {
+              int[] accessInfo = pageAccessInfo.get(index);
+              TsPrimitiveType value =
+                  timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+              if (value == null) {
+                valueBuilder.appendNull();
+              } else {
+                valueBuilder.writeBinary(value.getBinary());
+              }
+            }
+            break;
+          default:
+            break;
+        }
+      }
+    }
+
+    // read one page and write to tsblock
+    private synchronized void writeValidValuesIntoTsBlock(TsBlockBuilder 
builder) {
+      boolean ignoreAllNullRows = 
readableChunk.getContext().isIgnoreAllNullRows();
+      List<TSDataType> tsDataTypes = readableChunk.getDataTypes();
+      List<TimeRange> timeColumnDeletion = 
readableChunk.getTimeColumnDeletion();
+      List<List<TimeRange>> valueColumnsDeletionList = 
readableChunk.getValueColumnsDeletionList();
+
+      int pointsInPage = 0;
+      long[] time = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+      PageColumnAccessInfo[] pageColumnAccessInfo = new 
PageColumnAccessInfo[tsDataTypes.size()];

Review Comment:
   A similar procedure is in `initChunkMetaFromTvLists`. Is it possible to 
reuse some calculations there?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java:
##########
@@ -1516,4 +1509,212 @@ public List<List<BitMap>> getBitMaps() {
   public boolean isAllDeleted() {
     return timeDeletedCnt == rowCount;
   }
+
+  public AlignedTVListIterator iterator(
+      List<TSDataType> dataTypeList,
+      List<Integer> columnIndexList,
+      boolean ignoreAllNullRows,
+      Integer floatPrecision,
+      List<TSEncoding> encodingList) {
+    return new AlignedTVListIterator(
+        dataTypeList, columnIndexList, ignoreAllNullRows, floatPrecision, 
encodingList);
+  }
+
+  /* AlignedTVList Iterator */
+  public class AlignedTVListIterator extends TVListIterator {
+    private BitMap allValueColDeletedMap;
+
+    private TSDataType[] dataTypeArray;
+    private int[] columnIndexArray;
+    private Integer floatPrecision;
+    private TSEncoding[] encodingArray;
+
+    // remember the selected index of last not-null value for each column 
during prepareNext phase.
+    // It is already converted by getValueIndex method, so it can be directly 
used in
+    // getPrimitiveObject method.
+    private int[] selectedIndex;
+
+    public AlignedTVListIterator() {
+      super();
+    }
+
+    public AlignedTVListIterator(
+        List<TSDataType> dataTypeList,
+        List<Integer> columnIndexList,
+        boolean ignoreAllNullRows,
+        Integer floatPrecision,
+        List<TSEncoding> encodingList) {
+      super(null, null);
+      this.dataTypeArray = dataTypeList.toArray(new TSDataType[0]);
+      this.columnIndexArray =
+          (columnIndexList == null)
+              ? IntStream.range(0, dataTypes.size()).toArray()
+              : columnIndexList.stream().mapToInt(Integer::intValue).toArray();
+      this.allValueColDeletedMap = ignoreAllNullRows ? 
getAllValueColDeletedMap() : null;
+      this.floatPrecision = floatPrecision;
+      this.encodingArray = encodingList == null ? null : 
encodingList.toArray(new TSEncoding[0]);
+      this.selectedIndex = new int[dataTypeList.size()];
+    }
+
+    private void prepareNext() {
+      // find the first row that is neither deleted nor empty (all NULL values)
+      boolean findValidRow = false;
+      while (index < rows && !findValidRow) {
+        int rowIndex = getValueIndex(index);
+        // all columns values are deleted
+        if ((allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(rowIndex))
+            || isTimeDeleted(rowIndex, false)) {
+          index++;
+          currentTime = index < rows ? getTime(index) : Long.MIN_VALUE;
+          continue;
+        }
+
+        // does not find any valid row
+        if (index >= rows) {
+          probeNext = true;
+          return;
+        }
+        Arrays.fill(selectedIndex, rowIndex);
+        findValidRow = true;
+      }
+
+      // handle duplicated timestamp
+      while (index + 1 < rows && getTime(index + 1) == currentTime) {
+        index++;
+        // skip all-Null rows if allValueColDeletedMap exits

Review Comment:
   Typo: "exits" should be "exists".



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -281,14 +430,20 @@ protected void cloneAs(TVList cloneList) {
       cloneList.timestamps.add(cloneTime(timestampArray));
     }
     cloneList.rowCount = rowCount;
+    cloneList.seqRowCount = seqRowCount;
     cloneList.sorted = sorted;
     cloneList.maxTime = maxTime;
+    cloneList.minTime = minTime;
   }
 
   public void clear() {
     rowCount = 0;
+    seqRowCount = 0;
     sorted = true;
     maxTime = Long.MIN_VALUE;
+    minTime = Long.MAX_VALUE;
+    queryContextList.clear();
+    ownerQuery = null;

Review Comment:
   How about indices and bitmaps?



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1142,6 +1142,13 @@ unseq_memtable_flush_check_interval_in_ms=30000
 # effectiveMode: restart
 tvlist_sort_algorithm=TIM
 
+# When point number in the working TVList exceeds this, it is sorted and 
handover in writable memtable
+# default 0 means it does not handover working tvlist
+# effectiveMode: restart

Review Comment:
   It would be better to make this configuration hot-reloadable (effectiveMode: hot_reload) rather than requiring a restart.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -281,14 +430,20 @@ protected void cloneAs(TVList cloneList) {
       cloneList.timestamps.add(cloneTime(timestampArray));
     }
     cloneList.rowCount = rowCount;
+    cloneList.seqRowCount = seqRowCount;
     cloneList.sorted = sorted;
     cloneList.maxTime = maxTime;
+    cloneList.minTime = minTime;

Review Comment:
   Should the indices and bitmaps also be cloned?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java:
##########
@@ -32,6 +34,9 @@
 import java.util.concurrent.BlockingQueue;
 
 public interface IWritableMemChunk extends WALEntryValue {
+  int TVLIST_SORT_THRESHOLD = 
IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold();
+  int MAX_NUMBER_OF_POINTS_IN_PAGE =
+      TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();

Review Comment:
   If you put them here (as static members), they cannot be hot-reloaded.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java:
##########
@@ -241,21 +297,97 @@ private Pair<Object[], BitMap[]> 
checkAndReorderColumnValuesInInsertPlan(
     return new Pair<>(reorderedColumnValues, reorderedBitMaps);
   }
 
+  private void filterDeletedTimeStamp(
+      AlignedTVList alignedTVList,
+      List<List<TimeRange>> valueColumnsDeletionList,
+      boolean ignoreAllNullRows,
+      Map<Long, BitMap> timestampWithBitmap) {
+    BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
+
+    int rowCount = alignedTVList.rowCount();
+    List<int[]> valueColumnDeleteCursor = new ArrayList<>();
+    if (valueColumnsDeletionList != null) {
+      valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new 
int[] {0}));
+    }
+
+    for (int row = 0; row < rowCount; row++) {
+      // the row is deleted
+      if (allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(row)) {
+        continue;
+      }
+      long timestamp = alignedTVList.getTime(row);
+
+      BitMap bitMap = new BitMap(schemaList.size());
+      bitMap.markAll();
+      for (int column = 0; column < schemaList.size(); column++) {
+        if (!alignedTVList.isNullValue(alignedTVList.getValueIndex(row), 
column)) {
+          bitMap.unmark(column);
+        }
+
+        // skip deleted row

Review Comment:
   "mark deleted column" may be more precise



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java:
##########
@@ -245,17 +312,96 @@ private void sortTVList() {
 
   @Override
   public synchronized void sortTvListForFlush() {
-    sortTVList();
+    TVList cloneList = null;
+    list.lockQueryList();
+    try {
+      // During flush, if the working TVList is not sorted and referenced by 
some query, we need to
+      // clone it. The query still refer to original unsorted TVList.
+      if (!list.isSorted() && !list.getQueryContextList().isEmpty()) {
+        QueryContext firstQuery = list.getQueryContextList().get(0);
+        // reserve query memory
+        if (firstQuery instanceof FragmentInstanceContext) {
+          MemoryReservationManager memoryReservationManager =
+              ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
+          
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
+        }
+        list.setOwnerQuery(firstQuery);
+        cloneList = list.clone();
+      }
+    } finally {
+      list.unlockQueryList();
+    }
+    if (cloneList != null) {
+      setWorkingTVList(cloneList);
+    }
+
+    if (!list.isSorted()) {
+      list.sort();
+    }
+  }
+
+  private void filterDeletedTimestamp(
+      TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
+    long lastTime = Long.MIN_VALUE;
+    int[] deletionCursor = {0};
+    int rowCount = tvlist.rowCount();
+    for (int i = 0; i < rowCount; i++) {
+      if (tvlist.getBitMap() != null && tvlist.isNullValue(i)) {
+        continue;
+      }
+      long curTime = tvlist.getTime(i);
+      if (deletionList != null
+          && ModificationUtils.isPointDeleted(curTime, deletionList, 
deletionCursor)) {
+        continue;
+      }
+
+      if (i == rowCount - 1 || curTime != lastTime) {
+        timestampList.add(curTime);
+      }
+      lastTime = curTime;
+    }
+  }
+
+  public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
+    List<Long> timestampList = new ArrayList<>();
+    filterDeletedTimestamp(list, deletionList, timestampList);
+    for (TVList tvList : sortedList) {
+      filterDeletedTimestamp(tvList, deletionList, timestampList);
+    }
+
+    // remove duplicated time
+    List<Long> distinctTimestamps = 
timestampList.stream().distinct().collect(Collectors.toList());
+    // sort timestamps
+    long[] filteredTimestamps = 
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
+    Arrays.sort(filteredTimestamps);

Review Comment:
   Not sure about that, but I wonder if it would be faster to deduplicate a sorted
list — i.e., sort first and then remove adjacent duplicates — instead of calling distinct() before sorting.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java:
##########
@@ -534,34 +717,210 @@ private void handleEncoding(
     }
   }
 
+  private void writePageValuesIntoWriter(
+      IChunkWriter chunkWriter,
+      long[] times,
+      PageColumnAccessInfo[] pageColumnAccessInfo,
+      MergeSortAlignedTVListIterator timeValuePairIterator) {
+    AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
+
+    // update value statistics
+    for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
+      ValueChunkWriter valueChunkWriter =
+          alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
+      PageColumnAccessInfo pageAccessInfo = pageColumnAccessInfo[columnIndex];
+      switch (dataTypes.get(columnIndex)) {
+        case BOOLEAN:
+          for (int index = 0; index < pageAccessInfo.count(); index++) {
+            int[] accessInfo = pageAccessInfo.get(index);
+            TsPrimitiveType value =
+                timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+            valueChunkWriter.write(
+                times[index], value != null && value.getBoolean(), value == 
null);
+          }
+          break;
+        case INT32:
+        case DATE:
+          for (int index = 0; index < pageAccessInfo.count(); index++) {
+            int[] accessInfo = pageAccessInfo.get(index);
+            TsPrimitiveType value =
+                timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+            valueChunkWriter.write(times[index], value == null ? 0 : 
value.getInt(), value == null);
+          }
+          break;
+        case INT64:
+        case TIMESTAMP:
+          for (int index = 0; index < pageAccessInfo.count(); index++) {
+            int[] accessInfo = pageAccessInfo.get(index);
+            TsPrimitiveType value =
+                timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+            valueChunkWriter.write(
+                times[index], value == null ? 0L : value.getLong(), value == 
null);
+          }
+          break;
+        case FLOAT:
+          for (int index = 0; index < pageAccessInfo.count(); index++) {
+            int[] accessInfo = pageAccessInfo.get(index);
+            TsPrimitiveType value =
+                timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+            valueChunkWriter.write(
+                times[index], value == null ? 0f : value.getFloat(), value == 
null);
+          }
+          break;
+        case DOUBLE:
+          for (int index = 0; index < pageAccessInfo.count(); index++) {
+            int[] accessInfo = pageAccessInfo.get(index);
+            TsPrimitiveType value =
+                timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+            valueChunkWriter.write(
+                times[index], value == null ? 0d : value.getDouble(), value == 
null);
+          }
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          for (int index = 0; index < pageAccessInfo.count(); index++) {
+            int[] accessInfo = pageAccessInfo.get(index);
+            TsPrimitiveType value =
+                timeValuePairIterator.getPrimitiveObject(accessInfo, 
columnIndex);
+            valueChunkWriter.write(
+                times[index],
+                value == null ? Binary.EMPTY_VALUE : value.getBinary(),
+                value == null);
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", 
dataTypes.get(columnIndex)));
+      }
+    }
+  }
+
+  @Override
+  public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+    if (TVLIST_SORT_THRESHOLD == 0) {
+      encodeWorkingAlignedTVList(ioTaskQueue);
+      return;
+    }
+
+    AlignedChunkWriterImpl alignedChunkWriter = new 
AlignedChunkWriterImpl(schemaList);
+    // create MergeSortAlignedTVListIterator.
+    List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
+    alignedTvLists.add(list);
+    MergeSortAlignedTVListIterator timeValuePairIterator =
+        new MergeSortAlignedTVListIterator(
+            alignedTvLists, dataTypes, null, null, null, ignoreAllNullRows);
+
+    int pointNumInPage = 0;
+    int pointNumInChunk = 0;
+    long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+
+    PageColumnAccessInfo[] pageColumnAccessInfo = new 
PageColumnAccessInfo[dataTypes.size()];
+    for (int i = 0; i < pageColumnAccessInfo.length; i++) {
+      pageColumnAccessInfo[i] = new PageColumnAccessInfo();
+    }
+
+    while (timeValuePairIterator.hasNextTimeValuePair()) {
+      // prepare column access info for current page
+      int[][] accessInfo = timeValuePairIterator.getColumnAccessInfo();
+      times[pointNumInPage] = timeValuePairIterator.getTime();
+      for (int i = 0; i < dataTypes.size(); i++) {
+        pageColumnAccessInfo[i].add(accessInfo[i]);
+      }
+      timeValuePairIterator.step();
+      pointNumInPage++;
+      pointNumInChunk++;
+
+      if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
+          || pointNumInChunk >= maxNumberOfPointsInChunk) {
+        writePageValuesIntoWriter(
+            alignedChunkWriter, times, pageColumnAccessInfo, 
timeValuePairIterator);
+        alignedChunkWriter.write(times, pointNumInPage, 0);
+        for (PageColumnAccessInfo columnAccessInfo : pageColumnAccessInfo) {
+          columnAccessInfo.reset();
+        }
+        pointNumInPage = 0;
+      }
+
+      if (pointNumInChunk >= maxNumberOfPointsInChunk) {
+        alignedChunkWriter.sealCurrentPage();
+        alignedChunkWriter.clearPageWriter();
+        try {
+          ioTaskQueue.put(alignedChunkWriter);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
+        pointNumInChunk = 0;
+      }
+    }

Review Comment:
   Is it necessary to do paging during flush here? Is it possible to just write 
from MergeSortAlignedTVListIterator to AlignedChunkWriterImpl and let the chunk 
writer handle itself?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -252,27 +406,22 @@ protected void releaseLastTimeArray() {
   }
 
   public int delete(long lowerBound, long upperBound) {
-    int newSize = 0;
-    maxTime = Long.MIN_VALUE;
+    int deletedNumber = 0;
+    long maxTime = Long.MIN_VALUE;
+    long minTime = Long.MAX_VALUE;
     for (int i = 0; i < rowCount; i++) {
       long time = getTime(i);
-      if (time < lowerBound || time > upperBound) {
-        set(i, newSize++);
+      if (time >= lowerBound && time <= upperBound) {
+        int originRowIndex = getValueIndex(i);
+        int arrayIndex = originRowIndex / ARRAY_SIZE;
+        int elementIndex = originRowIndex % ARRAY_SIZE;
+        markNullValue(arrayIndex, elementIndex);
+        deletedNumber++;

Review Comment:
   Consider checking whether this timestamp is already marked as deleted before incrementing; otherwise deletedNumber may over-count rows deleted by a previous call.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java:
##########
@@ -413,24 +582,164 @@ public void encode(BlockingQueue<Object> ioTaskQueue) {
     }
   }
 
+  private Pair<Long, Integer> writeData(
+      ChunkWriterImpl chunkWriterImpl,
+      TimeValuePair tvPair,
+      long dataSizeInCurrentChunk,
+      int pointNumInCurrentChunk) {
+    switch (schema.getType()) {
+      case BOOLEAN:
+        chunkWriterImpl.write(tvPair.getTimestamp(), 
tvPair.getValue().getBoolean());
+        dataSizeInCurrentChunk += 8L + 1L;
+        break;
+      case INT32:
+      case DATE:
+        chunkWriterImpl.write(tvPair.getTimestamp(), 
tvPair.getValue().getInt());
+        dataSizeInCurrentChunk += 8L + 4L;
+        break;
+      case INT64:
+      case TIMESTAMP:
+        chunkWriterImpl.write(tvPair.getTimestamp(), 
tvPair.getValue().getLong());
+        dataSizeInCurrentChunk += 8L + 8L;
+        break;
+      case FLOAT:
+        chunkWriterImpl.write(tvPair.getTimestamp(), 
tvPair.getValue().getFloat());
+        dataSizeInCurrentChunk += 8L + 4L;
+        break;
+      case DOUBLE:
+        chunkWriterImpl.write(tvPair.getTimestamp(), 
tvPair.getValue().getDouble());
+        dataSizeInCurrentChunk += 8L + 8L;
+        break;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        Binary value = tvPair.getValue().getBinary();
+        chunkWriterImpl.write(tvPair.getTimestamp(), value);
+        dataSizeInCurrentChunk += 8L + getBinarySize(value);
+        break;
+      default:
+        LOGGER.error("WritableMemChunk does not support data type: {}", 
schema.getType());
+        break;
+    }
+    pointNumInCurrentChunk++;
+    return new Pair<>(dataSizeInCurrentChunk, pointNumInCurrentChunk);
+  }
+
+  @Override
+  public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+    if (TVLIST_SORT_THRESHOLD == 0) {
+      encodeWorkingTVList(ioTaskQueue);
+      return;
+    }
+
+    ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
+    long dataSizeInCurrentChunk = 0;
+    int pointNumInCurrentChunk = 0;
+
+    // create MergeSortTvListIterator. It need not handle float/double 
precision here.
+    List<TVList> tvLists = new ArrayList<>(sortedList);
+    tvLists.add(list);
+    MergeSortTvListIterator timeValuePairIterator = new 
MergeSortTvListIterator(tvLists);
+
+    TimeValuePair prevTvPair = null;
+    while (timeValuePairIterator.hasNextTimeValuePair()) {
+      TimeValuePair currTvPair = timeValuePairIterator.nextTimeValuePair();
+      if (prevTvPair == null) {
+        prevTvPair = currTvPair;
+        continue;
+      }
+      Pair<Long, Integer> updatedStats =
+          writeData(chunkWriterImpl, prevTvPair, dataSizeInCurrentChunk, 
pointNumInCurrentChunk);
+      dataSizeInCurrentChunk = updatedStats.left;
+      pointNumInCurrentChunk = updatedStats.right;
+      prevTvPair = currTvPair;
+
+      if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
+          || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
+        chunkWriterImpl.sealCurrentPage();
+        chunkWriterImpl.clearPageWriter();
+        try {
+          ioTaskQueue.put(chunkWriterImpl);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        chunkWriterImpl = createIChunkWriter();
+        dataSizeInCurrentChunk = 0;
+        pointNumInCurrentChunk = 0;
+      }
+    }
+    // last point for SDT
+    if (prevTvPair != null) {
+      chunkWriterImpl.setLastPoint(true);
+      Pair<Long, Integer> updatedStats =
+          writeData(chunkWriterImpl, prevTvPair, dataSizeInCurrentChunk, 
pointNumInCurrentChunk);
+      pointNumInCurrentChunk = updatedStats.right;
+    }
+
+    if (pointNumInCurrentChunk != 0) {
+      chunkWriterImpl.sealCurrentPage();
+      chunkWriterImpl.clearPageWriter();
+      try {
+        ioTaskQueue.put(chunkWriterImpl);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Release process for memtable flush. Release the TVList if there is no 
query on it, otherwise
+   * set query owner and release the TVList until query finishes.
+   *
+   * @param tvList
+   */
+  private void maybeReleaseTvList(TVList tvList) {
+    tvList.lockQueryList();
+    try {
+      if (tvList.getQueryContextList().isEmpty()) {
+        tvList.clear();
+      } else {
+        QueryContext firstQuery = tvList.getQueryContextList().get(0);
+        // transfer memory from write process to read process. Here it 
reserves read memory and
+        // releaseFlushedMemTable will release write memory.
+        if (firstQuery instanceof FragmentInstanceContext) {
+          MemoryReservationManager memoryReservationManager =
+              ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
+          
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
+        }
+        // update current TVList owner to first query in the list
+        tvList.setOwnerQuery(firstQuery);
+      }
+    } finally {
+      tvList.unlockQueryList();
+    }
+  }
+
   @Override
   public void release() {
-    if (list.getReferenceCount() == 0) {
-      list.clear();
+    maybeReleaseTvList(list);
+    for (TVList tvList : sortedList) {
+      maybeReleaseTvList(tvList);
     }
   }
 
   @Override
   public int serializedSize() {
-    return schema.serializedSize() + list.serializedSize();
+    int serializedSize = schema.serializedSize() + list.serializedSize();
+    for (TVList tvList : sortedList) {
+      serializedSize += tvList.serializedSize();
+    }
+    return serializedSize;
   }
 
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     byte[] bytes = new byte[schema.serializedSize()];
     schema.serializeTo(ByteBuffer.wrap(bytes));
     buffer.put(bytes);
-
+    for (TVList tvList : sortedList) {
+      tvList.serializeToWAL(buffer);
+    }

Review Comment:
   The corresponding deserialization logic appears to be missing. Please add it, along with associated tests.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -321,17 +476,26 @@ protected long[] cloneTime(long[] array) {
     return cloneArray;
   }
 
-  void updateMaxTimeAndSorted(long[] time, int start, int end) {
+  void updateMinMaxTimeAndSorted(long[] time, int start, int end) {
     int length = time.length;
     long inPutMinTime = Long.MAX_VALUE;
     boolean inputSorted = true;
+    int inputSeqRowCount = 0;
     for (int i = start; i < end; i++) {
       inPutMinTime = Math.min(inPutMinTime, time[i]);
       maxTime = Math.max(maxTime, time[i]);
-      if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
-        inputSorted = false;
+      minTime = Math.min(minTime, time[i]);
+      if (inputSorted) {
+        if (i < length - 1 && time[i] > time[i + 1]) {
+          inputSorted = false;
+        } else {
+          inputSeqRowCount++;
+        }
       }
     }
+    if (sorted && (rowCount == 0 || time[start] > getTime(rowCount - 1))) {
+      seqRowCount += inputSeqRowCount;
+    }

Review Comment:
   I think if time[start] == getTime(rowCount - 1), the list is still sorted, so this condition could use >= instead of >.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to