jt2594838 commented on code in PR #12720:
URL: https://github.com/apache/iotdb/pull/12720#discussion_r1639355843


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+        && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+      // For new device of this mem table
+      // ChunkMetadataIncrement
+      chunkMetadataIncrement +=
+          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
-          // ChunkMetadataIncrement
-          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
-          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new HashMap<>())
-              .putIfAbsent(measurements[i], 1);
-        } else {
-          // here currentChunkPointNum >= 1
-          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
-          int addingPointNum =
-              increasingMemTableInfo
-                  .computeIfAbsent(deviceId, k -> new HashMap<>())
-                  .computeIfAbsent(measurements[i], k -> 0);
+        increasingMemTableInfoForAligned
+            .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+            .left
+            .put(measurements[i], dataTypes[i]);
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+
+    } else {
+      // For existed device of this mem table
+      AlignedWritableMemChunkGroup memChunkGroup =
+          (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+      AlignedWritableMemChunk alignedMemChunk =
+          memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+      long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+      List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+          increasingMemTableInfoForAligned.computeIfAbsent(
+              deviceId, k -> new Pair<>(new HashMap<>(), 0));
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+
+        int addingPointNum = addingPointNumInfo.getRight();
+        // Extending the column of aligned mem chunk
+        if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+            && 
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
 {
           memTableIncrement +=
-              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
-                  ? TVList.tvListArrayMemCost(dataTypes[i])
-                  : 0;
-          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+              ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);

Review Comment:
   This does not seem like "increment".



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+        && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+      // For new device of this mem table
+      // ChunkMetadataIncrement
+      chunkMetadataIncrement +=
+          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
-          // ChunkMetadataIncrement
-          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
-          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new HashMap<>())
-              .putIfAbsent(measurements[i], 1);
-        } else {
-          // here currentChunkPointNum >= 1
-          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
-          int addingPointNum =
-              increasingMemTableInfo
-                  .computeIfAbsent(deviceId, k -> new HashMap<>())
-                  .computeIfAbsent(measurements[i], k -> 0);
+        increasingMemTableInfoForAligned
+            .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+            .left
+            .put(measurements[i], dataTypes[i]);
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+
+    } else {
+      // For existed device of this mem table
+      AlignedWritableMemChunkGroup memChunkGroup =
+          (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+      AlignedWritableMemChunk alignedMemChunk =
+          memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+      long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+      List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+          increasingMemTableInfoForAligned.computeIfAbsent(
+              deviceId, k -> new Pair<>(new HashMap<>(), 0));
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+
+        int addingPointNum = addingPointNumInfo.getRight();
+        // Extending the column of aligned mem chunk
+        if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+            && 
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
 {
           memTableIncrement +=
-              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
-                  ? TVList.tvListArrayMemCost(dataTypes[i])
-                  : 0;
-          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+              ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+          
increasingMemTableInfoForAligned.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
         }
         // TEXT data mem size
-        if (dataTypes[i].isBinary() && values[i] != null) {
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
           textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
         }
       }
+      int addingPointNum = 
increasingMemTableInfoForAligned.get(deviceId).getRight();
+      // Here currentChunkPointNum + addingPointNum >= 1
+      if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
+        if (alignedMemChunk != null) {
+          dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
+        }
+        
dataTypesInTVList.addAll(increasingMemTableInfoForAligned.get(deviceId).getLeft().values());
+        memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);

Review Comment:
   Will the calculation repeat the one in line 638?
   For example, if "s3" is a new measurement of the device, its size will be 
calculated in line 638 and here again.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+        && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+      // For new device of this mem table
+      // ChunkMetadataIncrement
+      chunkMetadataIncrement +=
+          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
-          // ChunkMetadataIncrement
-          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
-          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new HashMap<>())
-              .putIfAbsent(measurements[i], 1);
-        } else {
-          // here currentChunkPointNum >= 1
-          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
-          int addingPointNum =
-              increasingMemTableInfo
-                  .computeIfAbsent(deviceId, k -> new HashMap<>())
-                  .computeIfAbsent(measurements[i], k -> 0);
+        increasingMemTableInfoForAligned
+            .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+            .left
+            .put(measurements[i], dataTypes[i]);
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+
+    } else {
+      // For existed device of this mem table
+      AlignedWritableMemChunkGroup memChunkGroup =
+          (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+      AlignedWritableMemChunk alignedMemChunk =
+          memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+      long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+      List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+          increasingMemTableInfoForAligned.computeIfAbsent(
+              deviceId, k -> new Pair<>(new HashMap<>(), 0));
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+
+        int addingPointNum = addingPointNumInfo.getRight();
+        // Extending the column of aligned mem chunk
+        if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+            && 
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
 {
           memTableIncrement +=
-              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
-                  ? TVList.tvListArrayMemCost(dataTypes[i])
-                  : 0;
-          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+              ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);

Review Comment:
   If (currentChunkPointNum + addingPointNum) == 
PrimitiveArrayManager.ARRAY_SIZE, the calculation may be imprecise.
   In the worst case, where each batch contains exactly 
PrimitiveArrayManager.ARRAY_SIZE rows, the estimation can be doubled.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(

Review Comment:
   The name is too broad. Something like `calculateAlignedRowMemory` woule be 
better.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+        && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+      // For new device of this mem table
+      // ChunkMetadataIncrement
+      chunkMetadataIncrement +=
+          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
-          // ChunkMetadataIncrement
-          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
-          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new HashMap<>())
-              .putIfAbsent(measurements[i], 1);
-        } else {
-          // here currentChunkPointNum >= 1
-          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
-          int addingPointNum =
-              increasingMemTableInfo
-                  .computeIfAbsent(deviceId, k -> new HashMap<>())
-                  .computeIfAbsent(measurements[i], k -> 0);
+        increasingMemTableInfoForAligned
+            .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+            .left
+            .put(measurements[i], dataTypes[i]);
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+
+    } else {
+      // For existed device of this mem table
+      AlignedWritableMemChunkGroup memChunkGroup =
+          (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+      AlignedWritableMemChunk alignedMemChunk =
+          memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+      long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+      List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+          increasingMemTableInfoForAligned.computeIfAbsent(
+              deviceId, k -> new Pair<>(new HashMap<>(), 0));
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+
+        int addingPointNum = addingPointNumInfo.getRight();
+        // Extending the column of aligned mem chunk
+        if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+            && 
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
 {
           memTableIncrement +=
-              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
-                  ? TVList.tvListArrayMemCost(dataTypes[i])
-                  : 0;
-          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+              ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+          
increasingMemTableInfoForAligned.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
         }
         // TEXT data mem size
-        if (dataTypes[i].isBinary() && values[i] != null) {
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
           textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
         }
       }
+      int addingPointNum = 
increasingMemTableInfoForAligned.get(deviceId).getRight();
+      // Here currentChunkPointNum + addingPointNum >= 1
+      if (((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {

Review Comment:
   In lines 623 and 627, you initialize the `currentChunkPointNum` and 
`addingPointNum` to 0. They have not changed afterward. So it is possible that 
`currentChunkPointNum + addingPointNum == 0`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);

Review Comment:
   Seeing this, I would recommend wrapping them with a simple class. It may be 
easy for other people to confuse or make a typo.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
     return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
-  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
   private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode 
insertRowsNode)
       throws WriteProcessException {
-    // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
-    long memTableIncrement = 0L;
-    long textDataIncrement = 0L;
-    long chunkMetadataIncrement = 0L;
+
+    long[] memIncrements = new long[3];
     // device -> measurement -> adding TVList size
-    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new 
HashMap<>();
+    Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned = 
new HashMap<>();
+    // device -> (measurements -> datatype, adding aligned TVList size)
+    Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned =
+        new HashMap<>();
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
-      IDeviceID deviceId = insertRowNode.getDeviceID();
-      TSDataType[] dataTypes = insertRowNode.getDataTypes();
-      Object[] values = insertRowNode.getValues();
-      String[] measurements = insertRowNode.getMeasurements();
+      if (insertRowNode.isAligned()) {
+        handleAlignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForAligned);
+      } else {
+        handleUnalignedData(insertRowNode, memIncrements, 
increasingMemTableInfoForNonAligned);
+      }
+    }
+    updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+    return memIncrements;
+  }
+
+  @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+  private void handleAlignedData(
+      InsertRowNode insertRowNode,
+      long[] memIncrements,
+      Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>> 
increasingMemTableInfoForAligned) {
+    long memTableIncrement = memIncrements[0];
+    long textDataIncrement = memIncrements[1];
+    long chunkMetadataIncrement = memIncrements[2];
+
+    IDeviceID deviceId = insertRowNode.getDeviceID();
+    TSDataType[] dataTypes = insertRowNode.getDataTypes();
+    Object[] values = insertRowNode.getValues();
+    String[] measurements = insertRowNode.getMeasurements();
+
+    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+        && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+      // For new device of this mem table
+      // ChunkMetadataIncrement
+      chunkMetadataIncrement +=
+          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
+              * dataTypes.length;
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
-            && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
-          // ChunkMetadataIncrement
-          chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
-          memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
-          increasingMemTableInfo
-              .computeIfAbsent(deviceId, k -> new HashMap<>())
-              .putIfAbsent(measurements[i], 1);
-        } else {
-          // here currentChunkPointNum >= 1
-          long currentChunkPointNum = 
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
-          int addingPointNum =
-              increasingMemTableInfo
-                  .computeIfAbsent(deviceId, k -> new HashMap<>())
-                  .computeIfAbsent(measurements[i], k -> 0);
+        increasingMemTableInfoForAligned
+            .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+            .left
+            .put(measurements[i], dataTypes[i]);
+        // TEXT data mem size
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+        }
+      }
+
+    } else {
+      // For existed device of this mem table
+      AlignedWritableMemChunkGroup memChunkGroup =
+          (AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId);
+      AlignedWritableMemChunk alignedMemChunk =
+          memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+      long currentChunkPointNum = alignedMemChunk == null ? 0 : 
alignedMemChunk.alignedListSize();
+      List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+          increasingMemTableInfoForAligned.computeIfAbsent(
+              deviceId, k -> new Pair<>(new HashMap<>(), 0));
+      for (int i = 0; i < dataTypes.length; i++) {
+        // Skip failed Measurements
+        if (dataTypes[i] == null || measurements[i] == null) {
+          continue;
+        }
+
+        int addingPointNum = addingPointNumInfo.getRight();
+        // Extending the column of aligned mem chunk
+        if ((alignedMemChunk != null && 
!alignedMemChunk.containsMeasurement(measurements[i]))
+            && 
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
 {
           memTableIncrement +=
-              ((currentChunkPointNum + addingPointNum) % 
PrimitiveArrayManager.ARRAY_SIZE) == 0
-                  ? TVList.tvListArrayMemCost(dataTypes[i])
-                  : 0;
-          
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v) 
-> v + 1);
+              ((currentChunkPointNum + addingPointNum) / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+          
increasingMemTableInfoForAligned.get(deviceId).left.put(measurements[i], 
dataTypes[i]);
         }
         // TEXT data mem size
-        if (dataTypes[i].isBinary() && values[i] != null) {
+        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {

Review Comment:
   How about BINARY and BLOB?



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java:
##########
@@ -613,6 +613,65 @@ record = new TSRecord(102, "root.vehicle.d2");
     Assert.assertEquals(memTable1.memSize(), memTable2.memSize());
   }
 
+  @Test
+  public void testRamCostInsertSameDataBy2Ways()

Review Comment:
   Better to add some tests against the corner cases I mentioned above (if they 
are real).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to