[
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Greg Harris resolved KAFKA-16087.
---------------------------------
Fix Version/s: 3.8.0
Resolution: Fixed
> Tasks dropping incorrect records when errors.tolerance=all and errors
> reported asynchronously due to data race
> --------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-16087
> URL: https://issues.apache.org/jira/browse/KAFKA-16087
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.6.0, 3.2.0, 3.7.0
> Reporter: Greg Harris
> Assignee: Greg Harris
> Priority: Major
> Fix For: 3.8.0
>
>
> The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors
> to send individual errant records to the connector's DLQ topic. The
> implementation of this
> reporter interacts with the ProcessingContext within the per-task
> RetryWithToleranceOperator. The ProcessingContext stores mutable state about
> the current operation, such as what error has occurred or what record is
> being operated on.
> The ProcessingContext and RetryWithToleranceOperator are also used by the
> connector's converter and transformation pipeline for the same purpose.
> When the ErrantRecordReporter#report function is called from SinkTask#put,
> there is no contention over the mutable state, as the thread used for
> SinkTask#put is also responsible for converting and transforming the record.
> However, if ErrantRecordReporter#report is called by an extra thread within
> the SinkTask, there is thread contention on the single mutable
> ProcessingContext.
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602, and the
> synchronized keyword was added to all methods of RetryWithToleranceOperator
> that interact with the ProcessingContext. However, this only makes each
> method atomic in isolation; calls to distinct RWTO methods can still
> interleave, producing unintended data races.
> Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 schedules a call to ErrantRecordReporter#report(A) on another
> thread (thread 2) after some delay.
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation.
> For example, [converting
> headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
> 6. The operation succeeds, and the ProcessingContext is left with error ==
> null, or equivalently failed() == false.
> 7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls
> [RWTO
> executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
> and returns.
> 9. The operation leaves ProcessingContext with error != null, or equivalently
> failed() == true.
> 10. Thread 1 then resumes execution, and calls [RWTO
> failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
> which evaluates to true.
> 11. Thread 1 then drops record B, even though the header conversion succeeded
> without error.
> 12. Record B is never delivered to the Sink Task, and never delivered to the
> error reporter for processing, despite having produced no error during
> processing.
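
The interleaving above can be reproduced deterministically with a hypothetical, heavily simplified stand-in for RetryWithToleranceOperator (names and shapes here are illustrative, not the real Connect classes): every method is synchronized, so each call is atomic on its own, but nothing makes the execute-then-failed sequence atomic, and a report() landing in between clobbers the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for Connect's ProcessingContext and
// RetryWithToleranceOperator: one mutable error field shared by all callers.
class ProcessingContext {
    Throwable error; // last error seen by ANY operation, on ANY thread
}

class RetryWithToleranceOperator {
    private final ProcessingContext context = new ProcessingContext();

    // Called on the task thread for conversions/transformations.
    public synchronized <V> V execute(Supplier<V> op) {
        context.error = null; // reset before the attempt
        try {
            return op.get();
        } catch (RuntimeException e) {
            context.error = e; // tolerated: record the error
            return null;
        }
    }

    // Called by ErrantRecordReporter#report, possibly from another thread.
    public synchronized void executeFailed(Throwable error) {
        context.error = error; // clobbers state the task thread relies on
    }

    public synchronized boolean failed() {
        return context.error != null;
    }
}

class InterleavingDemo {
    public static void main(String[] args) {
        RetryWithToleranceOperator rwto = new RetryWithToleranceOperator();
        List<String> delivered = new ArrayList<>();

        // Steps 5-6: thread 1 converts record B's headers successfully.
        String recordB = rwto.execute(() -> "record-B");

        // Steps 7-9: thread 2's delayed report(A) runs between execute()
        // and failed() -- each call is synchronized, but the two-call
        // sequence on thread 1 is not atomic.
        rwto.executeFailed(new RuntimeException("record A failed in put()"));

        // Steps 10-12: thread 1 resumes, sees failed() == true, drops record B.
        if (!rwto.failed()) {
            delivered.add(recordB);
        }
        System.out.println("delivered=" + delivered); // prints "delivered=[]"
    }
}
```

Running the two "threads" as sequential calls makes the race reproducible; with a real second thread the same outcome merely becomes timing-dependent.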
> Per-method synchronization is insufficient here, because correctness depends
> on a multi-step sequence (execute, then failed, then error) being atomic.
> Either the data sharing should be avoided entirely, or a locking mechanism
> that covers the whole sequence should be used.
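
One way to avoid the data sharing (a sketch of the direction, not the actual 3.8.0 patch; all names here are hypothetical) is to make error state per-operation: execute() returns a result object that carries its own error, so a concurrent report() for record A has no shared field through which to affect record B.

```java
import java.util.function.Supplier;

// Per-call result: error state travels with the operation that produced it.
final class Result<V> {
    final V value;
    final Throwable error;
    Result(V value, Throwable error) { this.value = value; this.error = error; }
    boolean failed() { return error != null; }
}

final class RaceFreeOperator {
    // No shared mutable field: nothing here for another thread to clobber.
    public <V> Result<V> execute(Supplier<V> op) {
        try {
            return new Result<>(op.get(), null);
        } catch (RuntimeException e) {
            return new Result<>(null, e);
        }
    }
}

class RaceFreeDemo {
    public static void main(String[] args) {
        RaceFreeOperator op = new RaceFreeOperator();
        Result<String> b = op.execute(() -> "record-B");
        // An asynchronous report(A) would record its error in record A's own
        // Result; record B's outcome depends only on record B's operation.
        System.out.println("failed=" + b.failed()); // prints "failed=false"
    }
}
```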
> A similar flaw exists in source connectors, where errors are reported
> asynchronously by the producer; it was introduced in KIP-779 (3.2.0).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)