[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923379#comment-16923379
 ] 

Oleg Kuznetsov edited comment on KAFKA-8877 at 9/5/19 12:36 PM:
----------------------------------------------------------------

[~huxi_2b]

Yes, you are right.

 

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* should be populated and updated atomically:
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}
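
A minimal sketch of what an atomic update could look like, using *ConcurrentHashMap.compute* so that the read, the choice of a new partition and the write happen as one step per topic. This is not the Kafka implementation: the class name *AtomicStickyCache* and the *candidates* parameter (a non-empty list of distinct partition ids standing in for the *Cluster* lookups) are assumptions made to keep the example self-contained, and the fallback for "no available partitions" is left out.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for StickyPartitionCache, without Kafka dependencies.
final class AtomicStickyCache {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // candidates: assumed non-empty list of distinct partition ids for the topic.
    int nextPartition(String topic, List<Integer> candidates, int prevPartition) {
        // compute() runs the remapping function atomically for this key,
        // so no other thread can change the entry between the check and the write.
        return indexCache.compute(topic, (t, current) -> {
            // Another thread already moved the sticky partition: keep its choice.
            if (current != null && current != prevPartition) {
                return current;
            }
            if (candidates.size() == 1) {
                return candidates.get(0);
            }
            // Pick a random candidate that differs from the current sticky partition.
            Integer next = current;
            while (next == null || next.equals(current)) {
                next = candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
            }
            return next;
        });
    }
}
```

With this shape the method returns the value that was actually stored by the same atomic step, instead of reading the map again afterwards.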



> Race condition on partition counter
> -----------------------------------
>
>                 Key: KAFKA-8877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8877
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.2.1
>            Reporter: Oleg Kuznetsov
>            Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose replacing it with something like *topicCounterMap.compute(topic, _ -> ...)* (initializing the counter once per topic).
>  

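For reference, a minimal sketch of the "init the counter once per topic" idea from the description above, written with *computeIfAbsent* rather than *compute*. The wrapper class *TopicCounters* is hypothetical; the field is assumed to be a *ConcurrentHashMap*, whose *computeIfAbsent* applies the mapping function at most once per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for the per-topic counters, without Kafka dependencies.
final class TopicCounters {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    int nextValue(String topic) {
        // Creates the counter only if the topic has no entry yet;
        // concurrent callers for the same topic all get the same instance.
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }
}
```

Compared with the *putIfAbsent* version, this avoids allocating a counter that is then discarded when another thread wins the race.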


--
This message was sent by Atlassian Jira
(v8.3.2#803003)
