[ https://issues.apache.org/jira/browse/KAFKA-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057655#comment-17057655 ]
jiamei xie edited comment on KAFKA-9703 at 3/12/20, 7:17 AM: ------------------------------------------------------------- PR has been created in github.com/apache/kafka/pull/8286 was (Author: adally): PR has been created in ithub.com/apache/kafka/pull/8286 > ProducerBatch.split takes up too many resources if the bigBatch is huge > ----------------------------------------------------------------------- > > Key: KAFKA-9703 > URL: https://issues.apache.org/jira/browse/KAFKA-9703 > Project: Kafka > Issue Type: Bug > Reporter: jiamei xie > Priority: Major > > ProducerBatch.split takes up too many resources and might cause outOfMemory > error if the bigBatch is huge. About how I found this issue is in > https://lists.apache.org/list.html?us...@kafka.apache.org:lte=1M:MESSAGE_TOO_LARGE > Following is the code which takes a lot of resources. > {code:java} > for (Record record : recordBatch) { > assert thunkIter.hasNext(); > Thunk thunk = thunkIter.next(); > if (batch == null) > batch = createBatchOffAccumulatorForRecord(record, > splitBatchSize); > // A newly created batch can always host the first message. > if (!batch.tryAppendForSplit(record.timestamp(), record.key(), > record.value(), record.headers(), thunk)) { > batches.add(batch); > batch = createBatchOffAccumulatorForRecord(record, > splitBatchSize); > batch.tryAppendForSplit(record.timestamp(), record.key(), > record.value(), record.headers(), thunk); > } > {code} > Refer to RecordAccumulator#tryAppend, we can call closeForRecordAppends() > after a batch is full. > {code:java} > private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] > value, Header[] headers, > Callback callback, > Deque<ProducerBatch> deque, long nowMs) { > ProducerBatch last = deque.peekLast(); > if (last != null) { > FutureRecordMetadata future = last.tryAppend(timestamp, key, > value, headers, callback, nowMs); > if (future == null) > last.closeForRecordAppends(); > else > return new RecordAppendResult(future, deque.size() > 1 || > last.isFull(), false, false); > } > return null; > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)