[ https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Oleg Kuznetsov updated KAFKA-8877:
----------------------------------
    Description:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like
*topicCounterMap.compute(topic, _ -> ... (init the counter once per topic))*

  was:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like
*topicCounterMap.compute(topic, _ -> ... (init the counter once per topic))*


> Race condition on partition counter
> -----------------------------------
>
>                 Key: KAFKA-8877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8877
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 2.2.1
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like
> *topicCounterMap.compute(topic, _ -> ... (init the counter once per topic))*

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
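
For illustration, one way the proposed change could look is sketched below. This is not the actual Kafka patch. It uses computeIfAbsent rather than compute, since compute takes a two-argument function and `_` is rejected as a lambda parameter name on many Java versions. The class name TopicCounters is hypothetical, and the sketch assumes topicCounterMap is backed by a ConcurrentHashMap, whose computeIfAbsent runs the mapping function at most once per absent key.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the counter handling in DefaultPartitioner.
final class TopicCounters {
    // Assumption: the backing map is a ConcurrentHashMap, which overrides
    // computeIfAbsent with an atomic implementation.
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
            new ConcurrentHashMap<>();

    int nextValue(String topic) {
        // The initializer runs only if no counter exists for this topic yet,
        // so no throwaway AtomicInteger is allocated on a lost race.
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }
}
{code}

With the original get/putIfAbsent pair, only one counter is ever stored per topic, but a thread that loses the race still allocates an AtomicInteger that is discarded. The sketch avoids that extra allocation.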