[ https://issues.apache.org/jira/browse/KAFKA-8816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-8816:
---------------------------------
    Fix Version/s: 2.3.1
                   2.2.2
                   2.1.2
                   2.0.2

> RecordCollector offsets updated indirectly by StreamTask
> --------------------------------------------------------
>
>                 Key: KAFKA-8816
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8816
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Chris Pettitt
>            Assignee: Chris Pettitt
>            Priority: Major
>             Fix For: 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> Currently it is possible to indirectly update the offsets in 
> RecordCollectorImpl via the offset read function:
> {code:java}
> @Override
> public Map<TopicPartition, Long> offsets() {
>     return offsets;
> } {code}
> The offsets map here is a private final field in RecordCollectorImpl. It
> appears that the intent is for this field to be updated only when the 
> producer acknowledges an offset. However, because it is handed back in a 
> mutable form, it is possible to update offsets through this call, as actually 
> happens today in StreamTask:
> {code:java}
> protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
>     final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
>     for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>         checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
>     }
>     return checkpointableOffsets;
> }{code}
> Here a new checkpoint offset is written into RecordCollectorImpl's map if the
> topic partition is not already present, which happens for the input topic
> when we're using optimized topologies and a KTable. The effect is that we
> keep checkpointing the first offset seen: once the entry exists in the
> collector's map, putIfAbsent never replaces it with later consumed offsets.
> It seems the correct behavior would be to return a read-only view of the
> offsets from RecordCollectorImpl and create a copy of the returned map in 
> activeTaskCheckpointableOffsets before we mutate it.
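
The aliasing problem can be reproduced outside Kafka with a minimal sketch. It is a hypothetical, simplified model, not the actual Kafka code: the names OffsetAliasingSketch, LeakyCollector, onAck and checkpointableOffsets are made up for illustration, and plain String keys stand in for TopicPartition so it runs without the Kafka client jar. The second call shows the checkpoint staying at the first consumed offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bug; String keys stand in for TopicPartition.
public class OffsetAliasingSketch {

    // Mimics the original RecordCollectorImpl: offsets() hands out the
    // internal mutable map itself.
    public static class LeakyCollector {
        private final Map<String, Long> offsets = new HashMap<>();

        // Intended to be the only writer: called when the producer acks a send.
        public void onAck(String partition, long offset) {
            offsets.put(partition, offset);
        }

        public Map<String, Long> offsets() {
            return offsets; // no copy, no read-only wrapper
        }
    }

    // Mimics the original activeTaskCheckpointableOffsets().
    public static Map<String, Long> checkpointableOffsets(LeakyCollector collector,
                                                          Map<String, Long> consumedOffsets) {
        Map<String, Long> result = collector.offsets(); // aliases the collector's map
        for (Map.Entry<String, Long> e : consumedOffsets.entrySet()) {
            result.putIfAbsent(e.getKey(), e.getValue()); // writes into the collector
        }
        return result;
    }

    public static void main(String[] args) {
        LeakyCollector collector = new LeakyCollector();
        Map<String, Long> consumed = new HashMap<>();

        consumed.put("input-0", 10L);
        System.out.println(checkpointableOffsets(collector, consumed)); // {input-0=10}

        consumed.put("input-0", 25L); // the task has consumed further
        System.out.println(checkpointableOffsets(collector, consumed)); // still {input-0=10}
    }
}
```

Because the first call writes input-0=10 into the collector's own map, every later putIfAbsent finds the key already present and leaves the stale value in place.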

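The change suggested in the description, a read-only view from the collector plus a defensive copy in the task, might look roughly like the following sketch. The names (OffsetCopySketch, Collector, onAck, checkpointableOffsets) are hypothetical and String keys replace TopicPartition; Collections.unmodifiableMap and the HashMap copy constructor are standard JDK calls, but this is not claimed to be the patch that was merged for KAFKA-8816.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the proposed fix; String keys stand in for TopicPartition.
public class OffsetCopySketch {

    public static class Collector {
        private final Map<String, Long> offsets = new HashMap<>();

        // Only producer acknowledgements update the collector's offsets.
        public void onAck(String partition, long offset) {
            offsets.put(partition, offset);
        }

        // Read-only view: callers can observe acknowledged offsets but any
        // attempt to modify it throws UnsupportedOperationException.
        public Map<String, Long> offsets() {
            return Collections.unmodifiableMap(offsets);
        }
    }

    // Copy the read-only view before merging in consumed offsets, so the
    // collector's internal map is never touched.
    public static Map<String, Long> checkpointableOffsets(Collector collector,
                                                          Map<String, Long> consumedOffsets) {
        Map<String, Long> result = new HashMap<>(collector.offsets());
        for (Map.Entry<String, Long> e : consumedOffsets.entrySet()) {
            result.putIfAbsent(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        Map<String, Long> consumed = new HashMap<>();

        consumed.put("input-0", 10L);
        System.out.println(checkpointableOffsets(collector, consumed)); // {input-0=10}

        consumed.put("input-0", 25L);
        System.out.println(checkpointableOffsets(collector, consumed)); // {input-0=25}

        System.out.println(collector.offsets()); // {} : collector state unchanged
    }
}
```

The copy costs one map allocation per checkpoint; in exchange the collector's map can only change through acknowledged sends, and an accidental write through offsets() fails fast instead of silently corrupting later checkpoints.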


--
This message was sent by Atlassian Jira
(v8.3.2#803003)
