jt2594838 commented on code in PR #12720:
URL: https://github.com/apache/iotdb/pull/12720#discussion_r1639355843
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java:
##########
@@ -561,55 +557,163 @@ private long[] checkMemCostAndAddToTspInfoForRow(
return new long[] {memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
}
- @SuppressWarnings("squid:S3776") // High Cognitive Complexity
private long[] checkMemCostAndAddToTspInfoForRows(InsertRowsNode
insertRowsNode)
throws WriteProcessException {
- // Memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
- long memTableIncrement = 0L;
- long textDataIncrement = 0L;
- long chunkMetadataIncrement = 0L;
+
+ long[] memIncrements = new long[3];
// device -> measurement -> adding TVList size
- Map<IDeviceID, Map<String, Integer>> increasingMemTableInfo = new
HashMap<>();
+ Map<IDeviceID, Map<String, Integer>> increasingMemTableInfoForNonAligned =
new HashMap<>();
+ // device -> (measurements -> datatype, adding aligned TVList size)
+ Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>>
increasingMemTableInfoForAligned =
+ new HashMap<>();
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
- IDeviceID deviceId = insertRowNode.getDeviceID();
- TSDataType[] dataTypes = insertRowNode.getDataTypes();
- Object[] values = insertRowNode.getValues();
- String[] measurements = insertRowNode.getMeasurements();
+ if (insertRowNode.isAligned()) {
+ handleAlignedData(insertRowNode, memIncrements,
increasingMemTableInfoForAligned);
+ } else {
+ handleUnalignedData(insertRowNode, memIncrements,
increasingMemTableInfoForNonAligned);
+ }
+ }
+ updateMemoryInfo(memIncrements[0], memIncrements[2], memIncrements[1]);
+ return memIncrements;
+ }
+
+ @SuppressWarnings("squid:S3776") // High Cognitive Complexity
+ private void handleAlignedData(
+ InsertRowNode insertRowNode,
+ long[] memIncrements,
+ Map<IDeviceID, Pair<Map<String, TSDataType>, Integer>>
increasingMemTableInfoForAligned) {
+ long memTableIncrement = memIncrements[0];
+ long textDataIncrement = memIncrements[1];
+ long chunkMetadataIncrement = memIncrements[2];
+
+ IDeviceID deviceId = insertRowNode.getDeviceID();
+ TSDataType[] dataTypes = insertRowNode.getDataTypes();
+ Object[] values = insertRowNode.getValues();
+ String[] measurements = insertRowNode.getMeasurements();
+
+ if (workMemTable.checkIfChunkDoesNotExist(deviceId,
AlignedPath.VECTOR_PLACEHOLDER)
+ && !increasingMemTableInfoForAligned.containsKey(deviceId)) {
+ // For new device of this mem table
+ // ChunkMetadataIncrement
+ chunkMetadataIncrement +=
+ ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR)
+ * dataTypes.length;
+ memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
- && (!increasingMemTableInfo.containsKey(deviceId)
- ||
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
- // ChunkMetadataIncrement
- chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
- memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
- increasingMemTableInfo
- .computeIfAbsent(deviceId, k -> new HashMap<>())
- .putIfAbsent(measurements[i], 1);
- } else {
- // here currentChunkPointNum >= 1
- long currentChunkPointNum =
workMemTable.getCurrentTVListSize(deviceId, measurements[i]);
- int addingPointNum =
- increasingMemTableInfo
- .computeIfAbsent(deviceId, k -> new HashMap<>())
- .computeIfAbsent(measurements[i], k -> 0);
+ increasingMemTableInfoForAligned
+ .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1))
+ .left
+ .put(measurements[i], dataTypes[i]);
+ // TEXT data mem size
+ if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+ }
+ }
+
+ } else {
+ // For existed device of this mem table
+ AlignedWritableMemChunkGroup memChunkGroup =
+ (AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId);
+ AlignedWritableMemChunk alignedMemChunk =
+ memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+ long currentChunkPointNum = alignedMemChunk == null ? 0 :
alignedMemChunk.alignedListSize();
+ List<TSDataType> dataTypesInTVList = new ArrayList<>();
+ Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
+ increasingMemTableInfoForAligned.computeIfAbsent(
+ deviceId, k -> new Pair<>(new HashMap<>(), 0));
+ for (int i = 0; i < dataTypes.length; i++) {
+ // Skip failed Measurements
+ if (dataTypes[i] == null || measurements[i] == null) {
+ continue;
+ }
+
+ int addingPointNum = addingPointNumInfo.getRight();
+ // Extending the column of aligned mem chunk
+ if ((alignedMemChunk != null &&
!alignedMemChunk.containsMeasurement(measurements[i]))
+ &&
!increasingMemTableInfoForAligned.get(deviceId).left.containsKey(measurements[i]))
{
memTableIncrement +=
- ((currentChunkPointNum + addingPointNum) %
PrimitiveArrayManager.ARRAY_SIZE) == 0
- ? TVList.tvListArrayMemCost(dataTypes[i])
- : 0;
-
increasingMemTableInfo.get(deviceId).computeIfPresent(measurements[i], (k, v)
-> v + 1);
+ ((currentChunkPointNum + addingPointNum) /
PrimitiveArrayManager.ARRAY_SIZE + 1)
+ * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
Review Comment:
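This does not seem like an "increment": the expression adds the cost of
((currentChunkPointNum + addingPointNum) / ARRAY_SIZE + 1) whole arrays for the
column, i.e. a total size, rather than only the memory added by this row.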
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]