[ https://issues.apache.org/jira/browse/KAFKA-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton updated KAFKA-5568:
---------------------------------
    Affects Version/s: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1,
                       3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1,
                       2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.3.1, 2.4.0, 2.2.2,
                       2.2.1, 2.3.0, 2.1.1, 2.2.0, 2.1.0, 2.0.1, 2.0.0, 1.1.1, 1.1.0, 1.0.2,
                       1.0.1, 1.0.0, 0.11.0.3, 0.11.0.2, 0.11.0.1, 0.10.2.2

> Transformations that mutate topic-partitions break sink connectors that manage their own configuration
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5568
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5568
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1,
>                      0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0,
>                      2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1,
>                      2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1,
>                      2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1,
>                      3.3.2
>            Reporter: Ewen Cheslack-Postava
>            Priority: Major
>              Labels: needs-discussion, needs-kip
>
> KAFKA-5567 describes how offset commits for sink connectors are broken if a
> record's topic-partition is mutated by an SMT, e.g. RegexRouter or
> TimestampRouter.
>
> This is also a problem for sink connectors that manage their own offsets,
> i.e. those that store offsets elsewhere and call SinkTaskContext.offset() to
> rewind. In this case, the transformation has already been applied by the
> time the SinkTask sees the record, so there is no way for the task to
> correctly track offsets and call offset() with valid values. For example,
> the filename-based offset tracking in Confluent's HDFS connector would no
> longer work.
> Even if the offsets were stored separately in a file rather than derived
> from filenames, the connector would still never have had the correct
> offsets to write to that file.
>
> There are several options:
>
> 1. Decide that this is an acceptable consequence of combining SMTs with sink
> connectors and accept it as a limitation. You can either transform the data
> via Kafka Streams instead or accept that you can't do these "routing" type
> operations in the sink connector unless it supports them natively. This *might*
> not be the wrong choice, since we think very few connectors track their own
> offsets. In the case of HDFS, we might rarely hit this issue because it
> supports its own file/directory partitioning schemes anyway, so doing this
> via SMTs isn't as necessary there.
> 2. Try to expose the original record information to the sink connector via
> the records. I can think of 2 ways this could be done. The first is to attach
> the original record to each SinkRecord. The cost here is relatively high in
> terms of memory, especially for sink connectors that need to buffer data. The
> second is to add fields to SinkRecords for originalTopic() and
> originalPartition(). This feels a bit ugly to me but might be the least
> intrusive change API-wise, and we can guarantee those fields aren't
> overwritten by not allowing public constructors to set them.
> 3. Try to expose the original record information to the sink connector via a
> new pre-processing callback. The idea is similar to preCommit, but it would
> happen before any processing occurs. Taken to its logical conclusion, this
> turns into a sort of interceptor interface (preConversion,
> preTransformation, put, and preCommit).
> 4. Add something to the Context that allows the connector to get back at the
> original information. Maybe some sort of IdentityMap<Record, Record>
> originalPutRecords() that would let you get a mapping back to the original
> records.
> One nice aspect of this is that the connector can hold onto the
> original only if it needs it.
> 5. A very intrusive change/extension to the SinkTask API that passes in pairs
> of <original, transformed> records. This accomplishes the same as option 2
> but requires what I think are more complicated changes. Mentioned for
> completeness.
> 6. Something else I haven't thought of?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
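The failure mode described in the issue can be illustrated with a small, self-contained Java sketch. It does not use the real Connect API: `TopicRenameDemo`, `Rec`, `route` and `nextOffsets` are hypothetical stand-ins, and `route` only approximates RegexRouter's match-then-replace behavior. A task that keys its stored offsets by the topic-partition it is handed in put() ends up with offsets for the routed topic, not for the partition the consumer is actually reading.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TopicRenameDemo {
    // Hypothetical stand-ins for Connect types; not the org.apache.kafka classes.
    record TopicPartition(String topic, int partition) {}

    record Rec(String topic, int partition, long offset) {
        TopicPartition tp() { return new TopicPartition(topic, partition); }
    }

    // Approximates RegexRouter: if the whole topic matches, rewrite it.
    // Partition and offset are carried over unchanged.
    static Rec route(Rec r, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(r.topic());
        if (!m.matches()) {
            return r;
        }
        return new Rec(m.replaceFirst(replacement), r.partition(), r.offset());
    }

    // A task that stores its own offsets, keyed by the topic-partition it sees in put().
    // The stored value is the next offset to read (last seen offset + 1).
    static Map<TopicPartition, Long> nextOffsets(List<Rec> put) {
        Map<TopicPartition, Long> next = new HashMap<>();
        for (Rec r : put) {
            next.merge(r.tp(), r.offset() + 1, Math::max);
        }
        return next;
    }

    public static void main(String[] args) {
        Rec consumed = new Rec("orders", 0, 41);
        Rec routed = route(consumed, "(.*)", "$1-archive");
        Map<TopicPartition, Long> stored = nextOffsets(List.of(routed));
        // The stored key is for "orders-archive", but the consumer is assigned "orders",
        // so these offsets cannot be used to rewind the partition actually being read.
        System.out.println("stored=" + stored + ", assigned=" + consumed.tp());
    }
}
```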
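The second variant of option 2 (adding originalTopic() and originalPartition() to the record) can be sketched in plain Java as follows. `OriginalFieldsSketch`, `SinkRecordSketch`, `fromConsumer` and `newRecord(String, int)` are hypothetical and deliberately simpler than the real SinkRecord; the point is only that the original fields are set once by the framework and copied, never taken from the caller, when a transform derives a new record. A self-managing task can then key its offsets by the original topic-partition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OriginalFieldsSketch {
    // Hypothetical stand-in for SinkRecord with the extra fields proposed in option 2.
    static final class SinkRecordSketch {
        private final String topic;
        private final int partition;
        private final long offset;
        private final String originalTopic;
        private final int originalPartition;

        private SinkRecordSketch(String topic, int partition, long offset,
                                 String originalTopic, int originalPartition) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }

        // Framework-only entry point (would be non-public in a real API):
        // the original location is fixed when the record is read from Kafka.
        static SinkRecordSketch fromConsumer(String topic, int partition, long offset) {
            return new SinkRecordSketch(topic, partition, offset, topic, partition);
        }

        // What an SMT would call: topic and partition may change,
        // but the original fields are copied from this record, not supplied by the caller.
        SinkRecordSketch newRecord(String topic, int partition) {
            return new SinkRecordSketch(topic, partition, offset, originalTopic, originalPartition);
        }

        String topic() { return topic; }
        int partition() { return partition; }
        long offset() { return offset; }
        String originalTopic() { return originalTopic; }
        int originalPartition() { return originalPartition; }
    }

    // A self-managing task keys its stored offsets by the ORIGINAL topic-partition
    // ("topic-partition" strings), which is what the consumer is assigned and can rewind.
    static Map<String, Long> nextOffsets(List<SinkRecordSketch> put) {
        Map<String, Long> next = new HashMap<>();
        for (SinkRecordSketch r : put) {
            next.merge(r.originalTopic() + "-" + r.originalPartition(), r.offset() + 1, Math::max);
        }
        return next;
    }
}
```

Chaining transforms does not lose the originals here, because every derived record copies them from its parent rather than from its constructor arguments.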
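Option 4 can likewise be sketched with java.util.IdentityHashMap, assuming a hypothetical `Context.originalPutRecords()` that the framework fills in while transforming each batch; `IdentityMapSketch`, `Rec`, `transformBatch` and `nextOffsets` are illustrative names only. Identity lookup (rather than equals()) matters because two different input records can become value-equal after routing, for example when records with the same offset from two topics are routed into one topic and partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

class IdentityMapSketch {
    // Hypothetical record type; not org.apache.kafka.connect.sink.SinkRecord.
    record Rec(String topic, int partition, long offset) {}

    // Hypothetical context exposing a transformed -> original mapping for the current batch.
    static final class Context {
        private final Map<Rec, Rec> originals = new IdentityHashMap<>();

        Map<Rec, Rec> originalPutRecords() {
            return originals;
        }
    }

    // Framework side (hypothetical): apply the SMT and remember each original by identity.
    static List<Rec> transformBatch(List<Rec> batch, UnaryOperator<Rec> smt, Context ctx) {
        ctx.originals.clear();
        List<Rec> transformed = new ArrayList<>();
        for (Rec original : batch) {
            Rec out = smt.apply(original);
            if (out == null) {
                continue; // the SMT filtered this record out
            }
            ctx.originals.put(out, original);
            transformed.add(out);
        }
        return transformed;
    }

    // Task side: track the next offset per ORIGINAL topic-partition ("topic-partition" keys).
    static Map<String, Long> nextOffsets(List<Rec> put, Context ctx) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : put) {
            Rec original = ctx.originalPutRecords().get(r);
            next.merge(original.topic() + "-" + original.partition(), original.offset() + 1, Math::max);
        }
        return next;
    }
}
```

In this sketch the map is cleared before each batch, so a task that wants an original beyond the current put() call has to keep a reference to it itself, which matches the "hold onto the original only if it needs it" property of option 4.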