Caideyipi commented on code in PR #16201:
URL: https://github.com/apache/iotdb/pull/16201#discussion_r2290486291
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java:
##########
@@ -462,17 +469,32 @@ private void moveToNextChunkReader() throws IOException,
IllegalStateException {
chunkHeader.getMeasurementID(),
(measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
- // Emit when encountered non-sequential value chunk, or the chunk
list size exceeds
+ // Emit when encountered non-sequential value chunk, or the chunk
size exceeds
// certain value to avoid OOM
// Do not record or end current value chunks when there are empty
chunks
if (chunkHeader.getDataSize() == 0) {
break;
}
boolean needReturn = false;
- if (lastIndex >= 0
- && (valueIndex != lastIndex
- || valueChunkList.size() >=
pipeMaxAlignedSeriesNumInOneBatch)) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
+ final long timeChunkSize =
+ lastIndex >= 0
+ ?
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
+ : 0;
+ if (lastIndex >= 0) {
+ if (valueIndex != lastIndex) {
+ needReturn = recordAlignedChunk(valueChunkList, marker);
+ } else {
+ final long chunkSize = timeChunkSize + valueChunkSize;
+ if (chunkSize + chunkHeader.getDataSize()
Review Comment:
The safer logic is to leave it for the next batch, and the complexity can be
reduced with explanatory comments.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java:
##########
@@ -359,25 +354,27 @@ private void initChunkReader(final
AbstractAlignedChunkMetadata alignedChunkMeta
measurementList.subList(deviceIdSize, measurementList.size()).clear();
dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();
- final int startOffset = offset;
for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size();
++offset) {
final IChunkMetadata metadata =
alignedChunkMetadata.getValueChunkMetadataList().get(offset);
if (metadata != null) {
- // Record the column information corresponding to Meta to fill in
Tablet
- columnTypes.add(ColumnCategory.FIELD);
- measurementList.add(metadata.getMeasurementUid());
- dataTypeList.add(metadata.getDataType());
-
final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
- if (allocatedMemoryBlockForChunk.getMemoryUsageInBytes() < size) {
-
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
size);
+ if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ if (valueChunkList.size() == 1) {
+ // If the first chunk exceeds the memory limit, we need to
allocate more memory
+ PipeDataNodeResourceManager.memory()
+ .forceResize(
+ allocatedMemoryBlockForChunk,
+ size -
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk));
+ }
+ break;
+ } else {
+ // Record the column information corresponding to Meta to fill in
Tablet
+ columnTypes.add(ColumnCategory.FIELD);
+ measurementList.add(metadata.getMeasurementUid());
+ dataTypeList.add(metadata.getDataType());
+ valueChunkList.add(chunk);
}
-
- valueChunkList.add(chunk);
- }
- if (offset - startOffset >= pipeMaxAlignedSeriesNumInOneBatch) {
Review Comment:
No — this looks like it may be a bug; please double-check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]