XNX02 commented on code in PR #16201:
URL: https://github.com/apache/iotdb/pull/16201#discussion_r2282720034
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java:
##########
@@ -462,17 +469,32 @@ private void moveToNextChunkReader() throws IOException,
IllegalStateException {
chunkHeader.getMeasurementID(),
(measurement, index) -> Objects.nonNull(index) ? index + 1
: 0);
- // Emit when encountered non-sequential value chunk, or the chunk
list size exceeds
+ // Emit when encountered non-sequential value chunk, or the chunk
size exceeds
// certain value to avoid OOM
// Do not record or end current value chunks when there are empty
chunks
if (chunkHeader.getDataSize() == 0) {
break;
}
boolean needReturn = false;
- if (lastIndex >= 0
- && (valueIndex != lastIndex
- || valueChunkList.size() >=
pipeMaxAlignedSeriesNumInOneBatch)) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
+ final long timeChunkSize =
+ lastIndex >= 0
+ ?
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
+ : 0;
+ if (lastIndex >= 0) {
+ if (valueIndex != lastIndex) {
+ needReturn = recordAlignedChunk(valueChunkList, marker);
+ } else {
+ final long chunkSize = timeChunkSize + valueChunkSize;
+ if (chunkSize + chunkHeader.getDataSize()
Review Comment:
Would it be better to add the new chunk to the current batch for simpler
logic, or leave it for the next batch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]