[ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16087:
--------------------------------
    Description: 
The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
records to the connector DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.
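For orientation, the mutable state involved can be pictured with a minimal sketch. The field and method names loosely mirror the real class, but this is an illustrative stand-in, not the actual Kafka implementation:

```java
// Illustrative stand-in for the per-task ProcessingContext's mutable
// state (not the real Kafka class): one error slot and one current
// record, overwritten by whichever operation runs next.
public class ProcessingContextSketch {
    private Throwable error;      // error produced by the current operation, if any
    private Object currentRecord; // record currently being operated on

    // Called at the start of each operation: clears any previous error.
    public void reset(Object record) {
        this.currentRecord = record;
        this.error = null;
    }

    // Records a failure for the current operation.
    public void error(Throwable t) {
        this.error = t;
    }

    // True if the most recent operation recorded an error.
    public boolean failed() {
        return error != null;
    }
}
```

Because there is exactly one error slot per task, any caller that writes to it between another caller's operation and its failed() check changes the outcome of that check.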

The ProcessingContext and RetryWithToleranceOperator are also used by the 
connector's converter and transformation pipeline for similar reasons. When 
ErrantRecordReporter#report is called from within SinkTask#put, there is no 
contention over the mutable state, as the thread executing SinkTask#put is also 
responsible for converting and transforming the record. However, if 
ErrantRecordReporter#report is called by an extra thread within the SinkTask, 
that thread contends with the task thread for the single mutable 
ProcessingContext.

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602, and the 
synchronized keyword was added to every method of RetryWithToleranceOperator 
that interacts with the ProcessingContext. However, this solution still allows 
RWTO method calls to interleave and produce unintended data races. Consider the 
following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 schedules a second thread (thread 2) to call 
ErrantRecordReporter#report(A) after some delay.
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation. For 
example, [converting 
headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
6. The operation succeeds, and the ProcessingContext is left with error == 
null, or equivalently failed() == false.
7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
[RWTO 
executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
 and returns.
9. This call leaves the ProcessingContext with error != null, or equivalently 
failed() == true.
10. Thread 1 then resumes execution, and calls [RWTO 
failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
 which evaluates to true.
11. Thread 1 then drops record B, even though the header conversion succeeded 
without error.
12. Record B is never delivered to the sink task, and never delivered to the 
error reporter, despite having produced no error during processing.
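The steps above can be replayed deterministically with a simplified stand-in for the shared per-task context (a hypothetical class, not the real Kafka type). The essential point: thread 2's report(A) lands between thread 1's execute for record B and its failed() check.

```java
// Deterministic replay of the interleaving above. SharedContext is a
// hypothetical stand-in for the single mutable ProcessingContext.
public class InterleavingReplay {
    static class SharedContext {
        private Throwable error;
        void reset() { error = null; }               // start of an operation
        void recordError(Throwable t) { error = t; } // executeFailed path
        boolean failed() { return error != null; }
    }

    public static void main(String[] args) {
        SharedContext ctx = new SharedContext();

        // Steps 5-6: thread 1 converts record B's headers; the operation
        // succeeds, so no error is recorded.
        ctx.reset();

        // Steps 7-9: thread 2's delayed report(A) calls executeFailed,
        // writing record A's error into the same shared context.
        ctx.recordError(new RuntimeException("error reported for record A"));

        // Step 10: thread 1 resumes and checks failed() for record B,
        // observing thread 2's error instead of its own clean result.
        System.out.println("record B dropped: " + ctx.failed()); // true
    }
}
```

Each individual call here is atomic, exactly as the synchronized methods guarantee, yet the sequence as a whole still misattributes record A's error to record B.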

Per-method synchronization is insufficient to protect this shared mutable 
state: either the data sharing should be avoided entirely, or a locking 
mechanism that spans a whole operation (the execute call plus the subsequent 
failed() check) should be used.
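One direction for avoiding the data sharing is to give each operation its own result object rather than mutating a single per-task instance. The sketch below is hypothetical; the actual fix in Kafka may take a different shape:

```java
// Hypothetical sketch: each execute call returns its own result, so a
// concurrent report() on another thread produces a separate result
// object and cannot clobber this operation's outcome.
public class PerOperationContext {
    static class OperationResult {
        private final Throwable error;
        OperationResult(Throwable error) { this.error = error; }
        boolean failed() { return error != null; }
    }

    interface Operation {
        void run() throws Exception;
    }

    // Thread-confined: the result never escapes to other operations.
    static OperationResult execute(Operation op) {
        try {
            op.run();
            return new OperationResult(null);
        } catch (Exception e) {
            return new OperationResult(e);
        }
    }

    public static void main(String[] args) {
        OperationResult ok = execute(() -> { /* header conversion succeeds */ });
        OperationResult bad = execute(() -> { throw new RuntimeException("boom"); });
        System.out.println(ok.failed() + " " + bad.failed()); // false true
    }
}
```

With per-operation results, the failed() check in step 10 above would consult record B's own result, and thread 2's report for record A could not affect it.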

A similar flaw exists in source connectors, where errors are reported 
asynchronously by the producer.



> Sink connector dropping incorrect record when ErrantRecordReporter used 
> asynchronously
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16087
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16087
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
