[
https://issues.apache.org/jira/browse/KAFKA-8816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-8816:
---------------------------------
Fix Version/s: 2.3.1, 2.2.2, 2.1.2, 2.0.2
> RecordCollector offsets updated indirectly by StreamTask
> --------------------------------------------------------
>
> Key: KAFKA-8816
> URL: https://issues.apache.org/jira/browse/KAFKA-8816
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Chris Pettitt
> Assignee: Chris Pettitt
> Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> Currently it is possible to indirectly update the offsets in
> RecordCollectorImpl via the offset read function:
> {code:java}
> @Override
> public Map<TopicPartition, Long> offsets() {
>     return offsets;
> }
> {code}
> The offsets map here is a private final field in RecordCollectorImpl. It
> appears that the intent is for this field to be updated only when the
> producer acknowledges an offset. However, because the map is handed back in
> mutable form, callers can update the offsets through this call, as actually
> happens today in StreamTask:
> {code:java}
> protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
>     final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
>     for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>         checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
>     }
>     return checkpointableOffsets;
> }
> {code}
> Here it is possible to set a new checkpoint if the topic partition is not
> already in the offsets map, which happens for the input topic when we're
> using optimized topologies and a KTable. Because putIfAbsent never
> overwrites an existing entry, the effect is that we keep checkpointing the
> first offset seen for that partition.
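
To make the failure mode concrete, here is a minimal standalone sketch rather than the actual Kafka Streams code. It assumes a hypothetical LeakyCollector class standing in for RecordCollectorImpl, and plain String keys in place of TopicPartition; the class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RecordCollectorImpl; String keys replace TopicPartition.
class LeakyCollector {
    private final Map<String, Long> offsets = new HashMap<>();

    // Mirrors the getter quoted above: hands out the internal map itself.
    Map<String, Long> offsets() {
        return offsets;
    }
}

class StaleCheckpointDemo {
    // Mirrors the shape of activeTaskCheckpointableOffsets(): merges the
    // consumed offsets into whatever map the collector returned.
    static Map<String, Long> checkpointable(LeakyCollector collector, Map<String, Long> consumed) {
        Map<String, Long> result = collector.offsets();
        for (Map.Entry<String, Long> e : consumed.entrySet()) {
            result.putIfAbsent(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        LeakyCollector collector = new LeakyCollector();
        Map<String, Long> consumed = new HashMap<>();

        consumed.put("input-0", 5L);
        System.out.println(checkpointable(collector, consumed)); // prints {input-0=5}

        // The consumer has advanced, but the first call already wrote
        // input-0 into the collector's private map.
        consumed.put("input-0", 42L);
        System.out.println(checkpointable(collector, consumed)); // still prints {input-0=5}
    }
}
```

The second call returns the stale value because the first call's putIfAbsent wrote through to the collector's own map, so the entry is no longer absent.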
> It seems the correct behavior would be to return a read-only view of the
> offsets from RecordCollectorImpl and to create a copy of the returned map in
> activeTaskCheckpointableOffsets before we mutate it.
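
The suggestion above could be sketched roughly as follows. This is an illustration under stated assumptions, not the patch that was merged: ReadOnlyCollector and its onAck method are hypothetical stand-ins for RecordCollectorImpl and its producer-acknowledgement path, and String keys replace TopicPartition.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RecordCollectorImpl; String keys replace TopicPartition.
class ReadOnlyCollector {
    private final Map<String, Long> offsets = new HashMap<>();

    // Hypothetical hook representing the producer-acknowledgement path,
    // the only place that is meant to update the offsets.
    void onAck(String partition, long offset) {
        offsets.put(partition, offset);
    }

    // Read-only view: callers see current values but cannot write through it.
    Map<String, Long> offsets() {
        return Collections.unmodifiableMap(offsets);
    }
}

class CheckpointFixDemo {
    static Map<String, Long> checkpointable(ReadOnlyCollector collector, Map<String, Long> consumed) {
        // Copy first, so the merge below never touches the collector's state.
        Map<String, Long> result = new HashMap<>(collector.offsets());
        for (Map.Entry<String, Long> e : consumed.entrySet()) {
            result.putIfAbsent(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        ReadOnlyCollector collector = new ReadOnlyCollector();
        collector.onAck("output-0", 7L);

        Map<String, Long> consumed = new HashMap<>();
        consumed.put("input-0", 5L);
        System.out.println(checkpointable(collector, consumed).get("input-0")); // prints 5

        consumed.put("input-0", 42L);
        System.out.println(checkpointable(collector, consumed).get("input-0")); // prints 42

        System.out.println(collector.offsets()); // prints {output-0=7}
    }
}
```

With the unmodifiable view in place, any remaining caller that tries to write through offsets() fails fast with an UnsupportedOperationException instead of silently altering the collector's state.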