[ https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567745#comment-17567745 ]
Christopher L. Shannon edited comment on KAFKA-14079 at 7/17/22 10:07 PM:
--------------------------------------------------------------------------

[~ChrisEgerton] - I agree, I updated the title and description and pushed a new PR update.

> Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14079
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14079
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.2.0
>            Reporter: Christopher L. Shannon
>            Priority: Major
>             Fix For: 3.2.1
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting
> {{errors.tolerance}} to {{all}}. When this is set, a null record
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first
> time an error happens the code does not remove the failed record from
> SubmittedRecords before calling commitRecord().
> This causes two problems: offsets are no longer committed, and memory
> leaks. The algorithm that removes acked records from the internal map
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
> at the head of the Deque in which the records are tracked, and if the head
> record is unacked it stops processing removals. As a result, every new
> record that goes through the task is added but never committed or
> removed, until an OOM error occurs.
> The fix is to ack the failed records before calling
> commitRecord(). The failed records can be acked and committed because
> they won't be retried.
> Metrics also need to be updated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
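
The head-of-Deque behavior described in the issue can be illustrated with a toy model. This is only a sketch under stated assumptions: `HeadOfDequeDemo`, `Tracked`, `drainCommittable()` and `simulate()` are hypothetical names invented for illustration, a single Deque stands in for the internal map, and none of this is the actual SubmittedRecords code or the actual patch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: NOT org.apache.kafka.connect.runtime.SubmittedRecords.
public class HeadOfDequeDemo {

    // Hypothetical stand-in for one tracked source record.
    static final class Tracked {
        final long offset;
        boolean acked;
        Tracked(long offset) { this.offset = offset; }
    }

    private final Deque<Tracked> records = new ArrayDeque<>();

    Tracked submit(long offset) {
        Tracked t = new Tracked(offset);
        records.addLast(t);
        return t;
    }

    // Removes acked records from the head and stops at the first unacked one.
    // Returns the highest offset that became committable, or -1 if none did.
    long drainCommittable() {
        long committable = -1;
        while (!records.isEmpty() && records.peekFirst().acked) {
            committable = records.pollFirst().offset;
        }
        return committable;
    }

    int size() { return records.size(); }

    // Sends offsets 0..4; the send for offset 1 "fails" and the error is tolerated.
    // ackOnFailure = false models the reported bug, true models the proposed fix.
    // Returns {last committed offset, number of records still tracked}.
    static long[] simulate(boolean ackOnFailure) {
        HeadOfDequeDemo tracker = new HeadOfDequeDemo();
        long lastCommitted = -1;
        for (long offset = 0; offset < 5; offset++) {
            Tracked t = tracker.submit(offset);
            boolean sendFailed = (offset == 1);
            if (!sendFailed || ackOnFailure) {
                t.acked = true;
            }
            long committable = tracker.drainCommittable();
            if (committable >= 0) {
                lastCommitted = committable;
            }
        }
        return new long[] { lastCommitted, tracker.size() };
    }

    public static void main(String[] args) {
        long[] buggy = simulate(false);
        long[] fixed = simulate(true);
        System.out.println("no ack on failure: lastCommitted=" + buggy[0] + ", stillTracked=" + buggy[1]);
        System.out.println("ack on failure:    lastCommitted=" + fixed[0] + ", stillTracked=" + fixed[1]);
    }
}
```

In this model, the run without an ack on failure stays at committed offset 0 and keeps four records tracked, because the unacked record at offset 1 blocks the head of the Deque. The run that acks the failed record commits through offset 4 and tracks nothing afterwards.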