[jira] [Commented] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379
 ] 

Oleg Kuznetsov commented on KAFKA-8877:
---

[~huxi_2b]

Yes, you are right.

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* had better be populated/changed atomically:
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}
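
To illustrate what "populated/changed atomically" could look like, here is a minimal sketch rather than a proposed patch. It assumes *indexCache* is a ConcurrentHashMap, and it swaps the separate get / putIfAbsent / replace calls for a single compute() call. The class name StickyCacheSketch is hypothetical, and partition ids are passed as a plain non-empty list of distinct integers instead of Kafka's Cluster and PartitionInfo so the example compiles on its own.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for StickyPartitionCache, not the Kafka class itself.
class StickyCacheSketch {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // Reads, decides and writes the sticky partition inside one compute() call,
    // which ConcurrentHashMap performs atomically for the given key.
    // Assumption: availablePartitions is non-empty and holds distinct ids.
    int nextPartition(String topic, List<Integer> availablePartitions, int prevPartition) {
        return indexCache.compute(topic, (t, oldPart) -> {
            // Another thread already moved this topic to a new partition: keep it.
            if (oldPart != null && oldPart != prevPartition) {
                return oldPart;
            }
            if (availablePartitions.size() == 1) {
                return availablePartitions.get(0);
            }
            Integer newPart = oldPart;
            // Pick a random partition that differs from the current sticky one.
            while (newPart == null || newPart.equals(oldPart)) {
                int random = ThreadLocalRandom.current().nextInt(availablePartitions.size());
                newPart = availablePartitions.get(random);
            }
            return newPart;
        });
    }
}
```

With this shape, the value returned to the caller is the one that was stored by the same atomic step, so no second indexCache.get(topic) is needed after the update.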

> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.2.1
> Reporter: Oleg Kuznetsov
> Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
>     AtomicInteger counter = topicCounterMap.get(topic);
>     if (null == counter) {
>         counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
>         AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
>         if (currentCounter != null) {
>             counter = currentCounter;
>         }
>     }
>     return counter.getAndIncrement();
> }
> {code}
> The counter might be created multiple times instead of once.
> I propose replacing it with something like *topicCounterMap.compute(topic, _ -> ...)* (init the counter once per topic).
>  
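
As a rough sketch of the proposal in the description, the counter can be initialised once per topic by letting the map run the initialiser itself. This example uses computeIfAbsent rather than the compute call named in the description, since only the "create if missing" step is needed here. It assumes *topicCounterMap* is a ConcurrentHashMap, and the class name TopicCounterSketch is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the counter logic in DefaultPartitioner.
class TopicCounterSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    int nextValue(String topic) {
        // ConcurrentHashMap applies the mapping function at most once per key,
        // so only one AtomicInteger is ever created for a given topic.
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
            topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }
}
```

Calling nextValue twice for the same topic returns two consecutive values, starting from a random initial value.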



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923313#comment-16923313
 ] 

huxihx commented on KAFKA-8877:
---

It seems `nextValue` was already removed by [KAFKA-8601|https://issues.apache.org/jira/browse/KAFKA-8601].



