Greg Harris created KAFKA-16087:
-----------------------------------

             Summary: Sink connector dropping incorrect record when 
ErrantRecordReporter used asynchronously
                 Key: KAFKA-16087
                 URL: https://issues.apache.org/jira/browse/KAFKA-16087
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.7.0
            Reporter: Greg Harris
            Assignee: Greg Harris


The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
records to the connector's DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.
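
For illustration, the mutable state involved looks roughly like the following. This is a simplified sketch; the field and method names approximate ProcessingContext but are not copied from the Kafka source:

{code:java}
// Simplified sketch of the per-task mutable state; names are illustrative.
class ProcessingContextSketch {
    private Throwable error;       // last error observed, null if none
    private Object currentRecord;  // record currently being operated on

    void markProcessing(Object record) {
        this.currentRecord = record;
        this.error = null;         // state is reset for each new operation
    }

    void recordError(Throwable t) {
        this.error = t;
    }

    boolean failed() {
        return error != null;      // what RWTO#failed() effectively reports
    }
}
{code}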

The ProcessingContext and RetryWithToleranceOperator are also used by the 
converter and transformation pipeline of the connector for similar reasons. 
When the ErrantRecordReporter#report function is called from SinkTask#put, 
there is no contention over the mutable state, as the thread used for 
SinkTask#put is also responsible for converting and transforming the record. 
However, if ErrantRecordReporter#report is called by an extra thread within the 
SinkTask, there is thread contention on the single mutable ProcessingContext.
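
A minimal sketch of a sink task that reports asynchronously and therefore triggers this contention. The task class, the delay, and the exception are hypothetical, purely to show the pattern:

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical task: report() is invoked from a thread other than the one
// running put(), which is the scenario described above.
public class AsyncReportingSinkTask extends SinkTask {
    private final ScheduledExecutorService reporterThread =
            Executors.newSingleThreadScheduledExecutor();
    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        // May be null on workers that predate KIP-610.
        reporter = context.errantRecordReporter();
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Deliver the record, then report it later from a second thread;
            // this is the "extra thread" contending on the ProcessingContext.
            reporterThread.schedule(
                    () -> reporter.report(record, new RuntimeException("async failure")),
                    100, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void stop() {
        reporterThread.shutdown();
    }

    @Override
    public String version() {
        return "0.0.0";
    }
}
{code}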

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602, and the 
synchronized keyword was added to all RetryWithToleranceOperator (RWTO) methods 
that interact with the ProcessingContext. However, this solution still allows 
the RWTO methods to interleave across threads and produce unintended data 
races. Consider the following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 schedules a second thread (thread 2) to call 
ErrantRecordReporter#report(A) after some delay.
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation. For 
example, [converting 
headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
6. The operation succeeds, and the ProcessingContext is left with error == 
null, or equivalently failed() == false.
7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
[RWTO 
executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
 and returns.
9. The operation leaves the ProcessingContext with error != null, or 
equivalently failed() == true.
10. Thread 1 then resumes execution, and calls [RWTO 
failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
 which evaluates to true.
11. Thread 1 then drops record B, even though the header conversion succeeded 
without error.
12. Record B is never delivered to the SinkTask and never reaches the error 
reporter, despite having produced no error during processing.
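
The race can be reproduced in miniature with a shared context whose methods are individually synchronized, as in the real RWTO. This is a self-contained demo; the class and method names are illustrative, not Kafka's:

{code:java}
// Self-contained demo; class and method names are illustrative, not Kafka's.
public class InterleavingDemo {
    static class SharedContext {
        private Throwable error;

        synchronized void executeOk() { error = null; }              // step 6
        synchronized void executeFailed(Throwable t) { error = t; }  // step 8
        synchronized boolean failed() { return error != null; }      // step 10
    }

    public static void main(String[] args) throws InterruptedException {
        SharedContext ctx = new SharedContext();

        ctx.executeOk();  // thread 1: conversion of record B succeeds

        Thread reporter = new Thread(() ->  // thread 2: delayed report for record A
                ctx.executeFailed(new RuntimeException("record A failed downstream")));
        reporter.start();
        reporter.join();  // the report lands between steps 6 and 10

        // Thread 1 resumes: failed() is now true, so record B is wrongly
        // dropped even though its own conversion produced no error.
        System.out.println("failed() for record B: " + ctx.failed());  // true
    }
}
{code}

Even though every method holds the lock, thread 1's failed() check observes state mutated by thread 2's report: exactly the check-then-act gap in steps 6-10 above.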

As the demo above shows, per-method synchronization is insufficient: calls from 
different threads can still interleave between a successful execute and the 
subsequent failed() check. Either the data sharing should be avoided, or a 
locking mechanism that covers the whole operation should be used.
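
Purely as an illustration of the first option, each operation could carry its own result instead of mutating per-task shared state. The names below are hypothetical, not a proposed Kafka API:

{code:java}
import java.util.concurrent.Callable;

// Hypothetical sketch: each execute() call returns its own result object,
// so a concurrent report for another record cannot flip the failure state
// observed by this call.
final class OperationResult<V> {
    final V value;
    final Throwable error;

    OperationResult(V value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    boolean failed() {
        return error != null;  // local to this operation, no shared state
    }
}

final class IsolatedOperator {
    <V> OperationResult<V> execute(Callable<V> op) {
        try {
            return new OperationResult<>(op.call(), null);
        } catch (Exception e) {
            return new OperationResult<>(null, e);  // error stays with the result
        }
    }
}
{code}

With this shape, the WorkerSinkTask would check failed() on the result of the specific conversion it just ran, so an asynchronous report for record A could not cause record B to be dropped.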



