[
https://issues.apache.org/jira/browse/KAFKA-17755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jerry Cai updated KAFKA-17755:
------------------------------
Description:
During my local testing and debugging, I noticed that the logic below is incorrect;
it needs to change
from !racksPerPartition.values().stream().allMatch(partitionRacks::equals)
to racksPerPartition.values().stream().allMatch(partitionRacks::equals)
current logic
{code:java}
protected boolean useRackAwareAssignment(Set<String> consumerRacks, Set<String>
partitionRacks, Map<TopicPartition, Set<String>> racksPerPartition) {
if (consumerRacks.isEmpty() || Collections.disjoint(consumerRacks,
partitionRacks))
return false;
else if (preferRackAwareLogic)
return true;
else {
return
!racksPerPartition.values().stream().allMatch(partitionRacks::equals);
}
}
{code}
expected logic
{code:java}
protected boolean useRackAwareAssignment(Set<String> consumerRacks, Set<String>
partitionRacks, Map<TopicPartition, Set<String>> racksPerPartition) {
if (consumerRacks.isEmpty() || Collections.disjoint(consumerRacks,
partitionRacks))
return false;
else if (preferRackAwareLogic)
return true;
else {
return
racksPerPartition.values().stream().allMatch(partitionRacks::equals);
}
}
{code}
was:
During my local test and debug, I noticed that the below logical is in correct,
it need to change
from !racksPerPartition.values().stream().allMatch(partitionRacks::equals)
to racksPerPartition.values().stream().allMatch(partitionRacks::equals)
current logical
{code:java}
protected boolean useRackAwareAssignment(Set<String> consumerRacks, Set<String>
partitionRacks, Map<TopicPartition, Set<String>> racksPerPartition) {
if (consumerRacks.isEmpty() || Collections.disjoint(consumerRacks,
partitionRacks))
return false;
else if (preferRackAwareLogic)
return true;
else {
return
!racksPerPartition.values().stream().allMatch(partitionRacks::equals);
}
}
{code}
expected logical
{code:java}
protected boolean useRackAwareAssignment(Set<String> consumerRacks, Set<String>
partitionRacks, Map<TopicPartition, Set<String>> racksPerPartition) {
if (consumerRacks.isEmpty() || Collections.disjoint(consumerRacks,
partitionRacks))
return false;
else if (preferRackAwareLogic)
return true;
else {
return
racksPerPartition.values().stream().allMatch(partitionRacks::equals);
}
}
{code}
> AbstractPartitionAssignor can not enable RackAwareAssignment
> -------------------------------------------------------------
>
> Key: KAFKA-17755
> URL: https://issues.apache.org/jira/browse/KAFKA-17755
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.8.0
> Reporter: Jerry Cai
> Assignee: Rajini Sivaram
> Priority: Critical
>
> During my local testing and debugging, I noticed that the logic below is
> incorrect; it needs to change
> from !racksPerPartition.values().stream().allMatch(partitionRacks::equals)
> to racksPerPartition.values().stream().allMatch(partitionRacks::equals)
>
>
>
> current logic
> {code:java}
> protected boolean useRackAwareAssignment(Set<String> consumerRacks,
> Set<String> partitionRacks, Map<TopicPartition, Set<String>>
> racksPerPartition) {
> if (consumerRacks.isEmpty() || Collections.disjoint(consumerRacks,
> partitionRacks))
> return false;
> else if (preferRackAwareLogic)
> return true;
> else {
> return
> !racksPerPartition.values().stream().allMatch(partitionRacks::equals);
> }
> }
> {code}
>
> expected logic
> {code:java}
> protected boolean useRackAwareAssignment(Set<String> consumerRacks,
> Set<String> partitionRacks, Map<TopicPartition, Set<String>>
> racksPerPartition) {
> if (consumerRacks.isEmpty() || Collections.disjoint(consumerRacks,
> partitionRacks))
> return false;
> else if (preferRackAwareLogic)
> return true;
> else {
> return
> racksPerPartition.values().stream().allMatch(partitionRacks::equals);
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)