[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-8877:
----------------------------------
    Description: 
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like ** *topicCounterMap.compute(topic, 
_ -> ...* (init the counter once per topic*)) ** *

 

  was:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
{code}
**the counter might be created multiple times instead of once.

I propose to replace it with something like ** *topicCounterMap.compute(topic, 
_ -> ...* (init the counter once per topic*))* **

 


> Race condition on partition counter
> -----------------------------------
>
>                 Key: KAFKA-8877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8877
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.2.1
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
> counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like ** 
> *topicCounterMap.compute(topic, _ -> ...* (init the counter once per topic*)) 
> ** *
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to