[ https://issues.apache.org/jira/browse/KAFKA-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-5568:
---------------------------------
    Affects Version/s: 3.3.2
                       3.3.1
                       3.2.3
                       3.2.2
                       3.4.0
                       3.2.1
                       3.1.2
                       3.0.2
                       3.3.0
                       3.1.1
                       3.2.0
                       2.8.2
                       3.0.1
                       3.0.0
                       2.8.1
                       2.7.2
                       2.6.3
                       3.1.0
                       2.6.2
                       2.7.1
                       2.8.0
                       2.6.1
                       2.7.0
                       2.5.1
                       2.6.0
                       2.4.1
                       2.5.0
                       2.3.1
                       2.4.0
                       2.2.2
                       2.2.1
                       2.3.0
                       2.1.1
                       2.2.0
                       2.1.0
                       2.0.1
                       2.0.0
                       1.1.1
                       1.1.0
                       1.0.2
                       1.0.1
                       1.0.0
                       0.11.0.3
                       0.11.0.2
                       0.11.0.1
                       0.10.2.2

> Transformations that mutate topic-partitions break sink connectors that 
> manage their own configuration
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5568
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5568
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2
>            Reporter: Ewen Cheslack-Postava
>            Priority: Major
>              Labels: needs-discussion, needs-kip
>
> KAFKA-5567 describes how offset commits for sink connectors are broken if a 
> record's topic-partition is mutated by an SMT, e.g. RegexRouter or 
> TimestampRouter.
> This is also a problem for sink connectors that manage their own offsets, 
> i.e. those that store offsets elsewhere and call SinkTaskContext.rewind(). In 
> this case, the transformation has already been applied by the time the 
> SinkTask sees the record, so there is no way it could correctly track 
> offsets and call rewind() with valid values. For example, this would break 
> the offset tracking that Confluent's HDFS connector does by working with 
> filenames. Even if the offsets were stored separately in a file rather than 
> encoded in filenames, the connector still would never have had the correct 
> offsets to write to that file.
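
The mismatch described above can be illustrated with a minimal, self-contained sketch. It does not use the Connect API: `SimpleRecord`, `RoutingOffsetMismatch`, and the `-routed` suffix are hypothetical names invented for illustration, and a plain string key stands in for a topic-partition. The sketch assumes the task records offsets under whatever topic it sees on the record it is handed, which after a routing SMT is no longer the topic the consumer is assigned.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a sink record (not Connect's SinkRecord).
final class SimpleRecord {
    final String topic;
    final int partition;
    final long offset;

    SimpleRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class RoutingOffsetMismatch {
    // String key standing in for a topic-partition.
    static String key(String topic, int partition) {
        return topic + "/" + partition;
    }

    // Mimics a routing SMT: rewrites the topic, keeps partition and offset.
    static SimpleRecord route(SimpleRecord r, String suffix) {
        return new SimpleRecord(r.topic + suffix, r.partition, r.offset);
    }

    // What a self-managing task would store: the next offset to read,
    // keyed by the topic-partition it observes on each record.
    static Map<String, Long> trackNextOffsets(List<SimpleRecord> seen) {
        Map<String, Long> next = new HashMap<>();
        for (SimpleRecord r : seen) {
            next.put(key(r.topic, r.partition), r.offset + 1);
        }
        return next;
    }

    // Keeps only the tracked entries that match an assigned topic-partition,
    // i.e. the only ones that could be used to reset consumer offsets.
    static Map<String, Long> rewindable(Map<String, Long> tracked, Set<String> assignment) {
        Map<String, Long> usable = new HashMap<>(tracked);
        usable.keySet().retainAll(assignment);
        return usable;
    }
}
```

With a record consumed from `orders` partition 0 and routed to `orders-routed`, the task tracks `orders-routed/0`, which is not in its assignment, so `rewindable` returns an empty map. As far as I know, the reset the issue calls rewind() is done in the Connect API through the `SinkTaskContext.offset(...)` methods, which are keyed by the consumer's real topic-partitions.
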
> There are several options:
> 1. Decide that this is an acceptable limitation of combining SMTs with sink 
> connectors. You can either transform the data via Kafka Streams instead or 
> accept that you can't do these "routing" type operations in the sink 
> connector unless it supports them natively. This *might* 
> not be the wrong choice since we think there are very few connectors that 
> track their own offsets. In the case of HDFS, we might rarely hit this issue 
> because it supports its own file/directory partitioning schemes anyway so 
> doing this via SMTs isn't as necessary there.
> 2. Try to expose the original record information to the sink connector via 
> the records. I can think of 2 ways this could be done. The first is to attach 
> the original record to each SinkRecord. The cost here is relatively high in 
> terms of memory, especially for sink connectors that need to buffer data. The 
> second is to add fields to SinkRecords for originalTopic() and 
> originalPartition(). This feels a bit ugly to me but might be the least 
> intrusive change API-wise and we can guarantee those fields aren't 
> overwritten by not allowing public constructors to set them.
> 3. Try to expose the original record information to the sink connector via a 
> new pre-processing callback. The idea is similar to preCommit, but instead 
> would happen before any processing occurs. Taken to its logical conclusion 
> this turns into a sort of interceptor interface (preConversion, 
> preTransformation, put, and preCommit).
> 4. Add something to the Context that allows the connector to get back at the 
> original information. Maybe some sort of IdentityMap<Record, Record> 
> originalPutRecords() that would let you get a mapping back to the original 
> records. One nice aspect of this is that the connector can hold onto the 
> original only if it needs it.
> 5. A very intrusive change/extension to the SinkTask API that passes in pairs 
> of <original, transformed> records. Accomplishes the same as 2 but requires 
> what I think are more complicated changes. Mentioned for completeness.
> 6. Something else I haven't thought of?
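
The second variant of option 2 (extra fields for the original topic and partition) could look roughly like the sketch below. This is only an illustration under stated assumptions, not a proposed patch: `TrackedRecord`, `consumed`, `rerouted`, and `OriginalAwareOffsets` are hypothetical names, and the class is a standalone stand-in rather than an extension of SinkRecord. The point it shows is that a private constructor lets the framework fix the original values once, while a transformation can only change the current topic and partition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record type illustrating option 2; not part of the Connect API.
final class TrackedRecord {
    private final String topic;
    private final int partition;
    private final long offset;
    private final String originalTopic;
    private final int originalPartition;

    // Private: callers cannot supply their own "original" values.
    private TrackedRecord(String topic, int partition, long offset,
                          String originalTopic, int originalPartition) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.originalTopic = originalTopic;
        this.originalPartition = originalPartition;
    }

    // Framework-side factory: originals are fixed to what was consumed.
    static TrackedRecord consumed(String topic, int partition, long offset) {
        return new TrackedRecord(topic, partition, offset, topic, partition);
    }

    // What a routing transformation may do: change topic and partition only.
    TrackedRecord rerouted(String newTopic, int newPartition) {
        return new TrackedRecord(newTopic, newPartition, offset,
                                 originalTopic, originalPartition);
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    String originalTopic() { return originalTopic; }
    int originalPartition() { return originalPartition; }
}

final class OriginalAwareOffsets {
    // Tracks the next offset to read, keyed by the original topic-partition,
    // so the result stays valid for the consumer's actual assignment.
    static Map<String, Long> trackNextOffsets(List<TrackedRecord> seen) {
        Map<String, Long> next = new HashMap<>();
        for (TrackedRecord r : seen) {
            next.put(r.originalTopic() + "/" + r.originalPartition(), r.offset() + 1);
        }
        return next;
    }
}
```

A task written this way would write data according to `topic()` and `partition()` but store and reset offsets according to `originalTopic()` and `originalPartition()`, at the cost of two extra fields per record instead of a full copy of the original record.
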



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
