[jira] [Commented] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"

2022-07-26 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571601#comment-17571601
 ] 

Randall Hauch commented on KAFKA-14079:
---

Merged to the `3.3` branch with permission from the 3.3 release manager.

> Source task will not commit offsets and develops memory leak if 
> "error.tolerance" is set to "all"
> -
>
> Key: KAFKA-14079
> URL: https://issues.apache.org/jira/browse/KAFKA-14079
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Christopher L. Shannon
>Assignee: Christopher L. Shannon
>Priority: Critical
> Fix For: 3.3.0, 3.2.1, 3.4.0
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting 
> {{error.tolerance}} to {{all}}.  When this is set to all, a null record 
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first 
> time an error happens the code does not ack the record with the error; it 
> just skips it, so the record will not have its offsets committed or be 
> removed from SubmittedRecords before commitRecord() is called. 
> This leads to a bug where future offsets won't be committed anymore, and 
> also to a memory leak, because the algorithm that removes acked records from 
> the internal map to commit offsets 
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
> at the head of the Deque in which the records are tracked, and if it sees 
> that the record is unacked it will not process any more removals. As a 
> result, all new records that go through the task continue to be added, never 
> have their offsets committed, and are never removed from tracking until an 
> OOM error occurs.
> The fix is to make sure to ack the failed records so they can have their 
> offsets committed and be removed from tracking. This is fine to do, as the 
> records are intended to be skipped and not reprocessed. Metrics need to be 
> updated as well.
>  
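
For illustration, here is a minimal, hypothetical sketch of the head-of-the-Deque behavior described above. This is not the actual {{SubmittedRecords}} implementation; the class and method names are invented, but the drain loop mirrors the head-only removal described in the issue:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-in for the deque-based tracking in
// SubmittedRecords: records are appended on submit and only drained from the
// head once acked, so a single permanently-unacked record blocks every later
// offset commit while the deque keeps growing.
public class SubmittedRecordsSketch {

    static class TrackedRecord {
        final long offset;
        boolean acked;
        TrackedRecord(long offset) { this.offset = offset; }
    }

    private final Deque<TrackedRecord> records = new ArrayDeque<>();

    TrackedRecord submit(long offset) {
        TrackedRecord r = new TrackedRecord(offset);
        records.addLast(r);
        return r;
    }

    // Mirrors the head-only removal loop: stop at the first unacked record.
    Long committableOffset() {
        Long committable = null;
        while (!records.isEmpty() && records.peekFirst().acked) {
            committable = records.pollFirst().offset;
        }
        return committable;
    }

    public static void main(String[] args) {
        SubmittedRecordsSketch tracker = new SubmittedRecordsSketch();
        TrackedRecord failed = tracker.submit(1);  // send failed, skipped, never acked (pre-fix)
        TrackedRecord later = tracker.submit(2);   // later record is sent and acked normally
        later.acked = true;
        System.out.println(tracker.committableOffset()); // null: unacked head blocks everything
        failed.acked = true;                             // the fix: ack the skipped record too
        System.out.println(tracker.committableOffset()); // 2: the deque can drain again
    }
}
{code}

Once the failed record at the head is acked, the deque can drain and the offsets of later, successfully-acked records become committable again.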



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"

2022-07-19 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568626#comment-17568626
 ] 

Randall Hauch commented on KAFKA-14079:
---

Following up with some additional detail:

This issue can affect users that are upgrading to AK 3.2.0, even if they don't 
modify any Connect worker config or connector configurations. For example, if a 
user has a pre-AK 3.2.0 Connect installation running with one or more source 
connector configurations that use {{error.tolerance=all}}, and that Connect 
installation is upgraded to AK 3.2.0 _and_ subsequently the producer fails to 
send and ack messages generated by the source connector (e.g., message too 
large, etc.), then Connect will continue to write records to topics but will 
no longer commit source offsets for that connector. As mentioned above, Connect 
will accumulate those additional records in-memory, causing the worker to 
eventually fail with an OOM.

Unfortunately, restarting is not likely to be helpful, either: the source 
offsets are not changed/committed once this condition happens, so upon restart 
the connector will resume from the previously-committed source offsets and will 
likely regenerate the same problematic messages as before, triggering the 
problem again and causing the same OOM.
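
To make the restart behavior concrete: a source task typically reads its last committed source offset on startup via the offset storage reader, roughly as in the hypothetical sketch below (the connector, partition key, and offset key names are invented). If offset commits have stopped because of this bug, the value read here never advances, so the task regenerates the same messages after every restart:

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical source task: on start() it resumes from the last *committed*
// source offset. When this bug stops offset commits, the committed offset is
// stale, so every restart replays the same (still-problematic) records.
public abstract class ExampleSourceTask extends SourceTask {

    private long nextPosition;

    @Override
    public void start(Map<String, String> props) {
        Map<String, Object> partition =
                Collections.singletonMap("source", "example-table");      // invented partition key
        Map<String, Object> committed =
                context.offsetStorageReader().offset(partition);          // last committed offset, if any
        nextPosition = committed == null ? 0L : (Long) committed.get("position"); // invented offset key
        // poll() (not shown) would generate records starting at nextPosition.
    }
}
{code}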

The only way to recover is to fix the underlying problem reported by the 
producer (e.g., message too large), and restart the Connect workers. Luckily, 
the problems reported by the producer are captured in the worker logs. Note 
that changing the connector configuration to use {{error.tolerance=none}} will 
cause the connector to stop/fail as soon as the producer fails to write a 
record to the topic (e.g., message too large), and will not generate duplicate 
messages beyond the first problematic one (as happens with 
{{error.tolerance=all}}). But again, the underlying problem must be corrected 
before the connector can be restarted successfully.

This issue does not affect:
 * sink connectors;
 * source connector configurations that use {{error.tolerance=none}}, which 
is the default behavior; or
 * source connectors that never use or rely upon source offsets (a smallish 
fraction of all source connector types)

Most source connectors do rely upon source offsets, though, so this is a fairly 
serious issue.

Thanks, [~cshannon] and [~ChrisEgerton], for the quick work and review of these 
PRs. Both PRs linked above (one for the `trunk` branch and one for the `3.2` 
branch) have been merged. The `3.2` PR was merged before the first 3.2.1 RC, 
and so the AK 3.2.1 release should include this fix.
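
Conceptually, the fix changes the error-tolerant send path so that a record whose send failed is still acked (and reported to commitRecord() with null metadata) instead of being left unacked at the head of the tracking queue. Below is a heavily simplified, self-contained sketch of that decision; the helper types are invented stand-ins, and the real logic lives in Connect's worker source task and SubmittedRecords classes:

{code:java}
// Hypothetical sketch of the fixed error-handling decision, with invented
// stand-in types; not the actual Connect runtime code.
public class ErrorToleranceSketch {

    interface Submitted { void ack(); }                                   // stand-in for a SubmittedRecords entry
    interface Task { void commitRecord(String record, Object metadata); } // stand-in for the source task

    static void onSendCompletion(Exception sendError, Object metadata, boolean toleranceAll,
                                 Submitted submitted, Task task, String record) {
        if (sendError == null) {
            submitted.ack();
            task.commitRecord(record, metadata);   // normal path: ack with the real metadata
        } else if (toleranceAll) {
            submitted.ack();                       // the fix: ack the failed record too, so its
            task.commitRecord(record, null);       // offset can be committed; null metadata marks it as skipped
        } else {
            // tolerance "none": fail the task on the first send error
            throw new RuntimeException("Failed to send record", sendError);
        }
    }

    public static void main(String[] args) {
        onSendCompletion(new Exception("message too large"), null, true,
                () -> System.out.println("acked"),
                (r, md) -> System.out.println("commitRecord(" + r + ", " + md + ")"),
                "record-1");
    }
}
{code}

With the skipped record acked, the head-of-deque drain can advance past it, later offsets are committed, and the record is removed from in-memory tracking.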

> Source task will not commit offsets and develops memory leak if 
> "error.tolerance" is set to "all"
> -
>
> Key: KAFKA-14079
> URL: https://issues.apache.org/jira/browse/KAFKA-14079
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Christopher L. Shannon
>Assignee: Christopher L. Shannon
>Priority: Critical
> Fix For: 3.3.0, 3.2.1
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting 
> {{error.tolerance}} to {{all}}.  When this is set to all, a null record 
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first 
> time an error happens the code does not ack the record with the error; it 
> just skips it, so the record will not have its offsets committed or be 
> removed from SubmittedRecords before commitRecord() is called. 
> This leads to a bug where future offsets won't be committed anymore, and 
> also to a memory leak, because the algorithm that removes acked records from 
> the internal map to commit offsets 
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
> at the head of the Deque in which the records are tracked, and if it sees 
> that the record is unacked it will not process any more removals. As a 
> result, all new records that go through the task continue to be added, never 
> have their offsets committed, and are never removed from tracking until an 
> OOM error occurs.
> The fix is to make sure to ack the failed records so they can have their 
> offsets committed and be removed from tracking. This is fine to do, as the 
> records are intended to be skipped and not reprocessed. Metrics need to be 
> updated as well.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"

2022-07-17 Thread Christopher L. Shannon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567745#comment-17567745
 ] 

Christopher L. Shannon commented on KAFKA-14079:


[~ChrisEgerton] - I agree, I updated the title and description and pushed a new 
PR update.

 

> Source task will not commit offsets and develops memory leak if 
> "error.tolerance" is set to "all"
> -
>
> Key: KAFKA-14079
> URL: https://issues.apache.org/jira/browse/KAFKA-14079
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Christopher L. Shannon
>Priority: Major
> Fix For: 3.2.1
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting 
> {{error.tolerance}} to {{all}}.  When this is set to all, a null record 
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}} and the first 
> time an error happens the code does not remove the record with the error 
> from SubmittedRecords before calling commitRecord(). 
> This leads to a bug where offsets won't be committed anymore, and also a 
> memory leak, because the algorithm that removes acked records from the 
> internal map 
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
> at the head of the Deque in which the records are tracked, and if it sees 
> the record is unacked it will not process any more removals. This leads to 
> all new records that go through the task continuing to be added, never 
> committed, and never removed until an OOM error occurs.
> The fix is to make sure to ack the failed records before calling 
> commitRecord(). The failed records can be acked and committed as they won't 
> be retried. Metrics need to be updated as well.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)