[jira] [Updated] (KAFKA-14256) Update to Scala 2.13.10

     [ https://issues.apache.org/jira/browse/KAFKA-14256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-14256:
------------------------------
    Summary: Update to Scala 2.13.10  (was: Update to Scala 2.13.9)

> Update to Scala 2.13.10
> -----------------------
>
>                 Key: KAFKA-14256
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14256
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Matthew de Detrich
>            Assignee: Matthew de Detrich
>            Priority: Minor
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-14256) Update to Scala 2.13.10

     [ https://issues.apache.org/jira/browse/KAFKA-14256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-14256:
------------------------------
    Fix Version/s: 3.4.0

> Update to Scala 2.13.10
> -----------------------
>
>                 Key: KAFKA-14256
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14256
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Matthew de Detrich
>            Assignee: Matthew de Detrich
>            Priority: Minor
>             Fix For: 3.4.0
>
[GitHub] [kafka] hudeqi opened a new pull request, #13697: MINOR: code optimization in QuorumController

hudeqi opened a new pull request, #13697:
URL: https://github.com/apache/kafka/pull/13697

   1. Add a hint for the "BROKER_LOGGER" case of the switch in ConfigResourceExistenceChecker; otherwise it falls into the default branch and is deleted directly. I am not sure whether adding a hint is better than deleting it directly.
   2. Delete some unused variables and methods.
   3. Add the "@Disabled" annotation to a unit-test method that is not actually tested.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

showuon commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1189322325

##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -84,18 +105,30 @@ class SimpleApiVersionManager(
     throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")
   }
 
-  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], epoch: Long): ApiVersionsResponse = {
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
     ApiVersionsResponse.createApiVersionsResponse(
       throttleTimeMs,
       apiVersions,
       brokerFeatures,
       finalizedFeatures.asJava,
-      epoch,
+      finalizedFeaturesEpoch,
       zkMigrationEnabled
     )
   }
 }
 
+/**
+ * The default ApiVersionManager that supports forwarding and has metadata cache, used in broker and zk controller.
+ * When forwarding is enabled, the enabled apis are determined by the broker listener type and the controller apis,
+ * otherwise the enabled apis are determined by the broker listener type, which is the same with SimpleApiVersionManager.
+ *
+ * @param listenerType the listener type
+ * @param forwardingManager the forwarding manager,
+ * @param features
+ * @param metadataCache
+ * @param enableUnstableLastVersion
+ * @param zkMigrationEnabled
+ */

Review Comment:
   @dengziming thanks for adding the javadocs for these 2 classes. Very clear. But please remember to complete them. :)
[GitHub] [kafka] hudeqi closed pull request #13617: MINOR: code optimization in QuorumController

hudeqi closed pull request #13617: MINOR: code optimization in QuorumController
URL: https://github.com/apache/kafka/pull/13617
[GitHub] [kafka] hudeqi commented on pull request #13617: MINOR: code optimization in QuorumController

hudeqi commented on PR #13617:
URL: https://github.com/apache/kafka/pull/13617#issuecomment-1541277135

   Too many invalid commits; re-submitted as a new pull request.
[jira] [Commented] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

    [ https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721163#comment-17721163 ]

hudeqi commented on KAFKA-14979:
--------------------------------

[GitHub Pull Request #13692|https://github.com/apache/kafka/pull/13692] is expired.

> Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14979
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14979
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: hudeqi
>            Assignee: hudeqi
>            Priority: Major
>
> When the partitions of a ReplicaFetcherThread have finished truncating, the
> ReplicaAlterLogDirsThread to which these partitions belong needs to be marked
> for truncation. The lag value in the newState (PartitionFetchState) obtained in
> this process is still the original value (state.lag). If the truncationOffset
> is smaller than the original state.fetchOffset, then the original lag value
> is incorrect and needs to be updated: it should be the original lag value
> plus the difference between the original state.fetchOffset and the
> truncationOffset.
> If the incorrect original lag value is used, then ReplicaAlterLogDirsThread
> may set the wrong lag value for fetcherLagStats when executing
> "processFetchRequest". So it might be more reasonable to recalculate the lag
> based on the new state.fetchOffset.
[GitHub] [kafka] hudeqi commented on pull request #13692: KAFKA-14979: Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

hudeqi commented on PR #13692:
URL: https://github.com/apache/kafka/pull/13692#issuecomment-1541271115

   Too many invalid commits; re-submitted as another [PR](https://github.com/apache/kafka/pull/13696).
[GitHub] [kafka] hudeqi closed pull request #13692: KAFKA-14979: Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

hudeqi closed pull request #13692: KAFKA-14979: Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
URL: https://github.com/apache/kafka/pull/13692
[GitHub] [kafka] hudeqi opened a new pull request, #13696: KAFKA-14979: Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

hudeqi opened a new pull request, #13696:
URL: https://github.com/apache/kafka/pull/13696

   When the partitions of a ReplicaFetcherThread have finished truncating, the ReplicaAlterLogDirsThread to which these partitions belong needs to be marked for truncation. The lag value in the newState (PartitionFetchState) obtained in this process is still the original value (state.lag). If the truncationOffset is smaller than the original state.fetchOffset, then the original lag value is incorrect and needs to be updated: it should be the original lag value plus the difference between the original state.fetchOffset and the truncationOffset.

   If the incorrect original lag value is used, then ReplicaAlterLogDirsThread may set the wrong lag value for fetcherLagStats when executing "processFetchRequest". So it might be more reasonable to recalculate the lag based on the new state.fetchOffset.
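[Editor's illustration] The lag adjustment proposed in the PR above can be sketched as follows. This is a minimal, self-contained example with hypothetical names; it is not the actual ReplicaAlterLogDirsThread/PartitionFetchState code, only the arithmetic the description implies: when a partition is truncated below its current fetch offset, the lag grows by exactly the number of truncated offsets.

```java
// Hypothetical sketch of the lag recalculation described in KAFKA-14979.
// Names (adjustedLag, currentLag, fetchOffset, truncationOffset) are
// illustrative, not the real Kafka identifiers.
public class LagAdjustmentSketch {
    static long adjustedLag(long currentLag, long fetchOffset, long truncationOffset) {
        if (truncationOffset < fetchOffset) {
            // Truncating below the fetch offset means the follower must
            // re-fetch (fetchOffset - truncationOffset) offsets, so it is
            // that much further behind than the old lag suggested.
            return currentLag + (fetchOffset - truncationOffset);
        }
        // No truncation below the fetch offset: the old lag is still valid.
        return currentLag;
    }

    public static void main(String[] args) {
        // Was at offset 100 with lag 5; truncated back to offset 90.
        System.out.println(adjustedLag(5, 100, 90));  // prints 15
        // Truncation offset equals fetch offset: lag unchanged.
        System.out.println(adjustedLag(5, 100, 100)); // prints 5
    }
}
```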
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189297499

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ * The keys must have the same schemas.
+ * The topics involved must have the same number of partitions.
+ *
+ * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ */

Review Comment:
   Looks good, thanks.
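[Editor's illustration] The per-member quota implied by the range property quoted above ("minRequiredQuota + 1" for some members) can be sketched as follows. This is a hypothetical helper, not the actual RangeAssignor implementation: with N partitions and M subscribed members, every member gets at least N / M partitions, and the first N % M members get one extra.

```java
// Hypothetical illustration of the range quota discussed in the review
// thread above; not the real org.apache.kafka RangeAssignor code.
public class RangeQuotaSketch {
    // Number of partitions the member at position `rank` (0-based, in
    // assignment order) should receive for one topic.
    static int quota(int numPartitions, int numMembers, int rank) {
        int minRequiredQuota = numPartitions / numMembers; // floor: everyone gets at least this
        int membersWithExtra = numPartitions % numMembers; // these members get minRequiredQuota + 1
        return rank < membersWithExtra ? minRequiredQuota + 1 : minRequiredQuota;
    }

    public static void main(String[] args) {
        // 7 partitions across 3 members: quotas are 3, 2, 2 (sum = 7).
        for (int rank = 0; rank < 3; rank++) {
            System.out.println(quota(7, 3, rank));
        }
    }
}
```

Note how the quotas always sum to the topic's partition count, so every partition is assigned exactly once.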
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189296779

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ * The keys must have the same schemas.
+ * The topics involved must have the same number of partitions.
+ *
+ * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ */
+public class RangeAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Pair of memberId and remaining partitions to meet the quota.
+     */
+    private static class MemberWithRemainingAssignments {
+        /**
+         * Member Id.
+         */
+        private final String memberId;
+        /**
+         * Number of partitions required to meet the assignment quota.
+         */
+        private final Integer remaining;
+
+        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+    }
+
+    /**
+     * @return Map of topicIds to a list of members subscribed to them.
+     */
+    private Map> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map> membersPerTopic = new HashMap<>();
+        Map membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId: topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic
+                        .computeIfAbsent(topicId, k -> new ArrayList<>())
+                        .add(memberId);
+                } else {
+                    log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+    /**
+     * The algorithm includes the following steps:
+     *
+     * Generate a map of membersPerTopic using the given member subscriptions.
+     * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota of partitions for the assignment AND
+     * get a list (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new assignment.
+     * Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they haven't met the total required quota
+     * i.e. minRequiredQuota + 1, if the member is designated to receive one of the
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189296233

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189295560

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189237663

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189235728

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
## @@ -0,0 +1,228 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ * 1. Each member must get at least one partition from every topic that it is subscribed to. The only
+ *    exception is when the number of subscribed members is greater than the number of partitions for
+ *    that topic. (Range)
+ * 2. Partitions should be assigned to members in a way that facilitates the join operation when
+ *    required. (Range) This can only be done if every member is subscribed to the same topics and the
+ *    topics are co-partitioned. Two streams are co-partitioned if the following conditions are met:
+ *    - The keys must have the same schemas.
+ *    - The topics involved must have the same number of partitions.
+ * 3. Members should retain as much of their previous assignment as possible to reduce the number of
+ *    partition movements during reassignment. (Sticky)
+ */
+public class RangeAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Pair of memberId and remaining partitions to meet the quota.
+     */
+    private static class MemberWithRemainingAssignments {
+        /**
+         * Member Id.
+         */
+        private final String memberId;
+
+        /**
+         * Number of partitions required to meet the assignment quota.
+         */
+        private final Integer remaining;
+
+        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+    }
+
+    /**
+     * @return Map of topicIds to a list of members subscribed to them.
+     */
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId : topics) {
+                // Only topics that are present in both the subscribed topics list and the topic
+                // metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic
+                        .computeIfAbsent(topicId, k -> new ArrayList<>())
+                        .add(memberId);
+                } else {
+                    log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+    /**
+     * The algorithm includes the following steps:
+     *
+     * 1. Generate a map of members per topic using the given member subscriptions.
+     * 2. Generate a list of members called potentially unfilled members, which consists of members that
+     *    have not met the minimum required quota of partitions for the assignment, AND get a list called
+     *    assigned sticky partitions per topic, which has the partitions that will be retained in the new
+     *    assignment.
+     * 3. Generate a list of unassigned partitions by calculating the difference between the total
+     *    partitions for the topic and the assigned (sticky) partitions.
+
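The grouping that membersPerTopic performs can be sketched in isolation. The following is a simplified stand-in, not the PR's code: plain String topic names replace Uuid topic ids, and an ordinary Map of subscription sets replaces AssignmentSpec/AssignmentMemberSpec.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MembersPerTopicSketch {
    /**
     * Invert a memberId -> subscribed-topics map into topic -> subscribed-members.
     * Only topics present in both the subscription and the topic metadata are
     * considered; the real assignor logs a warning for unknown topics and skips them.
     */
    static Map<String, List<String>> membersPerTopic(Map<String, Set<String>> subscriptions,
                                                     Set<String> topicMetadata) {
        Map<String, List<String>> membersPerTopic = new LinkedHashMap<>();
        subscriptions.forEach((memberId, topics) -> {
            for (String topic : topics) {
                if (topicMetadata.contains(topic)) {
                    membersPerTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(memberId);
                }
            }
        });
        return membersPerTopic;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = new LinkedHashMap<>();
        subs.put("memberA", new LinkedHashSet<>(Arrays.asList("t1", "t2")));
        subs.put("memberB", new LinkedHashSet<>(Arrays.asList("t1", "ghost-topic")));
        Set<String> metadata = new HashSet<>(Arrays.asList("t1", "t2"));
        // "ghost-topic" is dropped because it is absent from the topic metadata.
        System.out.println(membersPerTopic(subs, metadata));
        // prints {t1=[memberA, memberB], t2=[memberA]}
    }
}
```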
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189220058 (quotes the same RangeAssignor.java hunk, @@ -0,0 +1,228 @@, as the previous message.)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189231570

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
## @@ -0,0 +1,238 @@

(This message quotes the same RangeAssignor.java code as the first message above, except that the algorithm javadoc in this 238-line revision reads:)

+    /**
+     * The algorithm includes the following steps:
+     *
+     * 1. Generate a map of membersPerTopic using the given member subscriptions.
+     * 2. Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required
+     *    quota of partitions for the assignment, AND get a list (assignedStickyPartitionsPerTopic) of
+     *    partitions that will be retained in the new assignment.
+     * 3. Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they
+     *    haven't met the total required quota, i.e. minRequiredQuota + 1, if the member is designated to
+     *    receive one of
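The per-topic quota arithmetic implied by these steps (a minimum required quota, with some members designated to receive minRequiredQuota + 1) can be sketched as follows. This is an illustrative reading of the javadoc, with hypothetical method and variable names, not the PR's actual code: each of M subscribed members gets at least floor(P / M) of a topic's P partitions, and the first P mod M members get one extra so every partition is assigned.

```java
import java.util.Arrays;

public class QuotaSketch {
    /**
     * For one topic with numPartitions partitions and numMembers subscribed members,
     * return each member's partition quota: minRequiredQuota = numPartitions / numMembers
     * for everyone, plus one extra partition for the first numPartitions % numMembers members.
     */
    static int[] quotas(int numPartitions, int numMembers) {
        int minRequiredQuota = numPartitions / numMembers;
        int membersWithExtraPartition = numPartitions % numMembers;
        int[] quotas = new int[numMembers];
        for (int i = 0; i < numMembers; i++) {
            quotas[i] = minRequiredQuota + (i < membersWithExtraPartition ? 1 : 0);
        }
        return quotas;
    }

    public static void main(String[] args) {
        // 7 partitions shared by 3 subscribed members: quotas sum to 7.
        System.out.println(Arrays.toString(quotas(7, 3))); // prints [3, 2, 2]
    }
}
```

Under this reading, a member appears in the "potentially unfilled" list while its current assignment holds fewer than its quota of partitions.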
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189231170 (quotes the same RangeAssignor.java hunk, @@ -0,0 +1,238 @@, as the previous message.)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189231412 (quotes the same RangeAssignor.java hunk, @@ -0,0 +1,238 @@, as the previous message.)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189229928 (quotes the same RangeAssignor.java hunk, @@ -0,0 +1,238 @@, as the previous message.)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189229707 (quotes the same RangeAssignor.java hunk, @@ -0,0 +1,238 @@, as the previous message.)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189229522

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##

@@ -0,0 +1,238 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.lang.Math.min;

/**
 * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
 * The properties are as follows:
 *
 * - Each member must get at least one partition from every topic that it is subscribed to. The only
 *   exception is when the number of subscribed members is greater than the number of partitions for
 *   that topic. (Range)
 * - Partitions should be assigned to members in a way that facilitates the join operation when
 *   required. (Range)
 *   This can only be done if every member is subscribed to the same topics and the topics are
 *   co-partitioned. Two streams are co-partitioned if the following conditions are met:
 *   - The keys must have the same schemas.
 *   - The topics involved must have the same number of partitions.
 * - Members should retain as much of their previous assignment as possible to reduce the number of
 *   partition movements during reassignment. (Sticky)
 */
public class RangeAssignor implements PartitionAssignor {
    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);

    public static final String RANGE_ASSIGNOR_NAME = "range";

    @Override
    public String name() {
        return RANGE_ASSIGNOR_NAME;
    }

    /**
     * Pair of memberId and remaining partitions to meet the quota.
     */
    private static class MemberWithRemainingAssignments {
        /**
         * Member Id.
         */
        private final String memberId;

        /**
         * Number of partitions required to meet the assignment quota.
         */
        private final Integer remaining;

        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
            this.memberId = memberId;
            this.remaining = remaining;
        }
    }

    /**
     * @return Map of topicIds to a list of members subscribed to them.
     */
    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();

        membersData.forEach((memberId, memberMetadata) -> {
            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
            for (Uuid topicId : topics) {
                // Only topics that are present in both the subscribed topics list and the topic
                // metadata should be considered for assignment.
                if (assignmentSpec.topics().containsKey(topicId)) {
                    membersPerTopic
                        .computeIfAbsent(topicId, k -> new ArrayList<>())
                        .add(memberId);
                } else {
                    log.warn("Member " + memberId + " subscribed to topic " + topicId +
                        " which doesn't exist in the topic metadata");
                }
            }
        });

        return membersPerTopic;
    }

    /**
     * The algorithm includes the following steps:
     *
     * - Generate a map of membersPerTopic using the given member subscriptions.
     * - Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum
     *   required quota of partitions for the assignment AND get a list
     *   (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new
     *   assignment.
     * - Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if
     *   they haven't met the total required quota i.e. minRequiredQuota + 1, if the member is
     *   designated to receive one of
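The javadoc above refers to a minimum required quota per member and to members designated to receive one extra partition. A minimal arithmetic sketch of that per-topic quota math (the method names mirror the terms used in the discussion, not the PR's actual helpers):

```java
public class QuotaSketch {
    // Floor of partitions per subscribed member: every member is guaranteed this many.
    static int minRequiredQuota(int numPartitions, int numMembers) {
        return numPartitions / numMembers;
    }

    // The remainder: this many members are designated to receive quota + 1 partitions.
    static int numMembersWithExtraPartition(int numPartitions, int numMembers) {
        return numPartitions % numMembers;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 members: quota of 2 each, and 1 member gets a third partition.
        System.out.println(minRequiredQuota(7, 3) + " " + numMembersWithExtraPartition(7, 3)); // prints "2 1"
    }
}
```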
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189220058

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##

/**
 * The algorithm includes the following steps:
 *
 * - Generate a map of members per topic using the given member subscriptions.
 * - Generate a list of members called potentially unfilled members, which consists of members that
 *   have not met the minimum required quota of partitions for the assignment AND get a list called
 *   assigned sticky partitions per topic, which has the partitions that will be retained in the new
 *   assignment.
 * - Generate a list of unassigned partitions by calculating the difference between the total
 *   partitions for the topic and the assigned (sticky) partitions.
[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if an instance is restarted
[ https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721114#comment-17721114 ] Matthias J. Sax commented on KAFKA-14981:

Very interesting idea – given that we persist the thread-id (aka process-id) in the state directory on local disk, it could help. And even if we don't persist it (because there is no local storage), it seems no harm would be done if the id changes every single time. Wondering if we would need a KIP for this. My gut feeling is no, but I'm not sure.

> Set `group.instance.id` in streams consumer so that rebalance will not happen
> if an instance is restarted
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Hao Li
> Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is
> restarted within `session.timeout.ms`, a rebalance will not be triggered and
> the original assignment can be returned directly from the broker. We can set
> this id in Kafka Streams using the `threadId` so that no rebalance is
> triggered within `session.timeout.ms`
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189216494

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189216148

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189214614

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##
[jira] [Created] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if an instance is restarted
Hao Li created KAFKA-14981: -- Summary: Set `group.instance.id` in streams consumer so that rebalance will not happen if an instance is restarted Key: KAFKA-14981 URL: https://issues.apache.org/jira/browse/KAFKA-14981 Project: Kafka Issue Type: Improvement Components: streams Reporter: Hao Li

`group.instance.id` enables static membership so that if a consumer is restarted within `session.timeout.ms`, a rebalance will not be triggered and the original assignment can be returned directly from the broker. We can set this id in Kafka Streams using the `threadId` so that no rebalance is triggered within `session.timeout.ms`.
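The settings the ticket describes can be sketched with plain `java.util.Properties`. The keys (`group.instance.id`, `session.timeout.ms`) are the real Kafka consumer config names; actually wiring them into a `KafkaConsumer` or Kafka Streams application requires kafka-clients on the classpath, so only the property construction is shown here:

```java
import java.util.Properties;

public class StaticMembershipConfig {
    // Build consumer properties that enable static membership for a given Streams thread id.
    // With group.instance.id set, a restart within session.timeout.ms reuses the member's old
    // assignment instead of triggering a group rebalance.
    public static Properties consumerProps(String threadId) {
        Properties props = new Properties();
        props.put("group.instance.id", threadId);      // static member id (the ticket proposes the threadId)
        props.put("session.timeout.ms", "45000");      // restart window before the broker evicts the member
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("streams-thread-1");
        System.out.println(props.getProperty("group.instance.id")); // prints "streams-thread-1"
    }
}
```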
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189213049

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ##

@@ -0,0 +1,523 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RangeAssignorTest {
    private final RangeAssignor assignor = new RangeAssignor();
    private final Uuid topic1Uuid = Uuid.randomUuid();
    private final Uuid topic2Uuid = Uuid.randomUuid();
    private final Uuid topic3Uuid = Uuid.randomUuid();
    private final String consumerA = "A";
    private final String consumerB = "B";
    private final String consumerC = "C";

    @Test
    public void testOneConsumerNoTopic() {
        Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
        Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
            consumerA,
            new AssignmentMemberSpec(
                Optional.empty(),
                Optional.empty(),
                Collections.emptyList(),
                Collections.emptyMap()
            )
        );

        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
        GroupAssignment groupAssignment = assignor.assign(assignmentSpec);

        assertEquals(Collections.emptyMap(), groupAssignment.members());
    }

    @Test
    public void testOneConsumerSubscribedToNonExistentTopic() {
        Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
        Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
            consumerA,
            new AssignmentMemberSpec(
                Optional.empty(),
                Optional.empty(),
                Collections.singletonList(topic2Uuid),
                Collections.emptyMap()
            )
        );

        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
        GroupAssignment groupAssignment = assignor.assign(assignmentSpec);

        assertTrue(groupAssignment.members().isEmpty());
    }

    @Test
    public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
        topics.put(topic3Uuid, new AssignmentTopicMetadata(2));

        Map<String, AssignmentMemberSpec> members = new TreeMap<>();

        members.put(consumerA, new AssignmentMemberSpec(
            Optional.empty(),
            Optional.empty(),
            Arrays.asList(topic1Uuid, topic3Uuid),
            Collections.emptyMap()
        ));

        members.put(consumerB, new AssignmentMemberSpec(
            Optional.empty(),
            Optional.empty(),
            Arrays.asList(topic1Uuid, topic3Uuid),
            Collections.emptyMap()
        ));

        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
        GroupAssignment computedAssignment = assignor.assign(assignmentSpec);

        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
        // Topic 1 Partitions Assignment
        expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
        expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
        // Topic 3 Partitions Assignment
        expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
        expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
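The expected assignment in the test above follows the classic range split: 3 partitions over 2 members gives member A `[0, 1]` and member B `[2]`. A hypothetical standalone sketch of that split (not the PR's assignor, which additionally handles stickiness and per-topic subscriptions):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSplitSketch {
    // Assign partitions 0..numPartitions-1 to members in contiguous ranges: each member gets the
    // floor quota, and the first (numPartitions % members) members get one extra partition.
    public static Map<String, List<Integer>> rangeSplit(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int quota = numPartitions / members.size();
        int extras = numPartitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int take = quota + (i < extras ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = 0; p < take; p++) {
                parts.add(next++);
            }
            assignment.put(members.get(i), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Matches the test's topic-1 expectation: A -> [0, 1], B -> [2].
        System.out.println(rangeSplit(Arrays.asList("A", "B"), 3)); // prints "{A=[0, 1], B=[2]}"
    }
}
```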
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189212267

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ##

Review Comment: I did it in a way that looks best to me in the next commit
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1189210485 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ Review Comment: It looks a bit wonky after formatting it like that; I don't think there's a great way to add this HTML. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
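The first property above (every subscribed member gets at least one partition unless members outnumber partitions) reduces to a simple per-topic quota computation. The following is an illustrative sketch, not code from the PR; the class and method names are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (names not from the PR): the per-member partition
// quotas a range-style assignor targets for a single topic.
class RangeQuotaSketch {
    // Every member gets floor(numPartitions / numMembers) partitions, and the
    // first (numPartitions % numMembers) members get one extra. When members
    // outnumber partitions, some quotas are 0 -- the "only exception" noted in
    // the javadoc above.
    static List<Integer> quotas(int numPartitions, int numMembers) {
        List<Integer> result = new ArrayList<>();
        int base = numPartitions / numMembers;
        int extra = numPartitions % numMembers;
        for (int i = 0; i < numMembers; i++) {
            result.add(base + (i < extra ? 1 : 0));
        }
        return result;
    }
}
```

For 3 partitions and 2 members this yields quotas of [2, 1]; for 2 partitions and 3 members it yields [1, 1, 0], the case where a member legitimately receives nothing.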
[GitHub] [kafka] ableegoldman commented on pull request #13695: HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API
ableegoldman commented on PR #13695: URL: https://github.com/apache/kafka/pull/13695#issuecomment-1540876914 @vcrfxia @mjsax pulled this fix out for the 3.5 release (cc @mimaison )
[GitHub] [kafka] ableegoldman commented on a diff in pull request #13682: MINOR: improved exception/warn logging for stream-stream join store settings
ableegoldman commented on code in PR #13682: URL: https://github.com/apache/kafka/pull/13682#discussion_r1189116797 ## streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java: ## @@ -115,7 +115,7 @@ public boolean persistent() { @Override public boolean isOpen() { -return inner.persistent(); +return inner.isOpen(); Review Comment: Good point. Here: https://github.com/apache/kafka/pull/13695
[GitHub] [kafka] ableegoldman opened a new pull request, #13695: HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API
ableegoldman opened a new pull request, #13695: URL: https://github.com/apache/kafka/pull/13695 The VersionedKeyValueToBytesStoreAdapter#isOpen API accidentally returns the value of `inner.persistent()` when it should be returning `inner.isOpen()`.
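The bug is a class of error that delegating adapters are prone to: forwarding a method to the wrong delegate method. A minimal sketch of the corrected delegation, with simplified interfaces rather than the actual Streams `StateStore` types:

```java
// Simplified stand-in for the store interface (not the real Streams API).
interface StateStoreLike {
    boolean persistent();
    boolean isOpen();
}

// Adapter that must forward each method to the SAME method on the inner store.
class Adapter implements StateStoreLike {
    private final StateStoreLike inner;

    Adapter(StateStoreLike inner) {
        this.inner = inner;
    }

    @Override
    public boolean persistent() {
        return inner.persistent();
    }

    @Override
    public boolean isOpen() {
        // Before the hotfix this mistakenly returned inner.persistent(),
        // so a closed-but-persistent store reported itself as open.
        return inner.isOpen();
    }
}
```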
[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-14974: -- Fix Version/s: 3.5.0 > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0, 3.4.1, 3.3.3, 3.6.0 > > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
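The compatibility distinction is worth spelling out: the JVM links calls by full method descriptor, return type included, so changing `void send(...)` to return a `Future` recompiles cleanly against the same call sites (source compatible) but breaks already-compiled callers with `NoSuchMethodError` (binary incompatible). A sketch of the fix pattern described above; `sendWithReceipt` is an invented name for illustration, not necessarily the one used in the actual change:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Illustrative only, not the real KafkaBasedLog API.
class SharedLog {
    // Old descriptor re-instated unchanged, so pre-compiled callers keep linking.
    public void send(String record) {
        sendWithReceipt(record); // delegate to the new variant
    }

    // The Future-returning behavior moves under a new name (hypothetical),
    // leaving the old method's JVM descriptor intact.
    public Future<Void> sendWithReceipt(String record) {
        // stand-in for the real produce path
        return CompletableFuture.completedFuture(null);
    }
}
```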
[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721025#comment-17721025 ] Alexandre Dupriez commented on KAFKA-7739: -- Got it. Sorry I misunderstood your suggestion. Thanks for clarifying! (and agree with it). > Kafka Tiered Storage > > > Key: KAFKA-7739 > URL: https://issues.apache.org/jira/browse/KAFKA-7739 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Harsha >Assignee: Satish Duggana >Priority: Major > Labels: needs-kip > Fix For: 3.6.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188898373 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of members per topic using the given member subscriptions. + * Generate a list of members called potentially unfilled members, which consists of members that have not met the minimum required quota of partitions for the assignment AND + * get a list called assigned sticky partitions per topic, which has the partitions that will be retained in the new assignment. + * Generate a list of unassigned partitions by calculating the difference between the total partitions for the topic and the assigned (sticky) partitions. + *
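The generic type parameters in the quoted diff were stripped by the mail formatting (e.g. `Map>` was originally a nested map type). A simplified, self-contained reconstruction of the `membersPerTopic` helper with the types restored, using `String` in place of Kafka's `Uuid` and the spec classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified reconstruction for illustration, not the PR's exact code.
class MembersPerTopicSketch {
    // memberSubscriptions: memberId -> topicIds the member subscribes to
    // existingTopics: topicIds present in the topic metadata
    static Map<String, List<String>> membersPerTopic(
            Map<String, List<String>> memberSubscriptions,
            Set<String> existingTopics) {
        Map<String, List<String>> membersPerTopic = new HashMap<>();
        memberSubscriptions.forEach((memberId, topics) -> {
            for (String topicId : topics) {
                // Only topics present in both the subscribed topics list and
                // the topic metadata are considered for assignment; others
                // would be logged and skipped.
                if (existingTopics.contains(topicId)) {
                    membersPerTopic
                        .computeIfAbsent(topicId, k -> new ArrayList<>())
                        .add(memberId);
                }
            }
        });
        return membersPerTopic;
    }
}
```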
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188893117 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of members per topic using the given member subscriptions. + * Generate a list of members called potentially unfilled members, which consists of members that have not met the minimum required quota of partitions for the assignment AND + * get a list called assigned sticky partitions per topic, which has the partitions that will be retained in the new assignment. + * Generate a list of unassigned partitions by calculating the difference between the total partitions for the topic and the assigned (sticky) partitions. + *
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188893478 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { Review Comment: nit: ` : `. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188891494 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of members per topic using the given member subscriptions. + * Generate a list of members called potentially unfilled members, which consists of members that have not met the minimum required quota of partitions for the assignment AND + * get a list called assigned sticky partitions per topic, which has the partitions that will be retained in the new assignment. + * Generate a list of unassigned partitions by calculating the difference between the total partitions for the topic and the assigned (sticky) partitions. + *
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188891203 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.emptyList(), +Collections.emptyMap() +) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertEquals(Collections.emptyMap(), groupAssignment.members()); +} + +@Test +public void testOneConsumerSubscribedToNonExistentTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.singletonList(topic2Uuid), +Collections.emptyMap() +) +); + +AssignmentSpec assignmentSpec = new 
AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + +Map members = new TreeMap<>(); + +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap() +)); + +members.put(consumerB, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap() +)); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1))); +expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2))); +// Topic 3 Partitions Assignment + expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0))); + expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1))); + +
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188873259 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.emptyList(), +Collections.emptyMap() +) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertEquals(Collections.emptyMap(), groupAssignment.members()); +} + +@Test +public void testOneConsumerSubscribedToNonExistentTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.singletonList(topic2Uuid), +Collections.emptyMap() +) +); + +AssignmentSpec assignmentSpec = new 
AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + +Map members = new TreeMap<>(); + +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap() +)); + +members.put(consumerB, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap() +)); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1))); +expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2))); +// Topic 3 Partitions Assignment + expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0))); + expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1))); + +
[jira] [Commented] (KAFKA-14980) MirrorMaker consumers don't get configs prefixed with source.cluster
[ https://issues.apache.org/jira/browse/KAFKA-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720993#comment-17720993 ] Mickael Maison commented on KAFKA-14980: cc [~ChrisEgerton] > MirrorMaker consumers don't get configs prefixed with source.cluster > > > Key: KAFKA-14980 > URL: https://issues.apache.org/jira/browse/KAFKA-14980 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 3.5.0 > Reporter: Mickael Maison > Priority: Blocker > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14980) MirrorMaker consumers don't get configs prefixed with source.cluster
Mickael Maison created KAFKA-14980:
--
Summary: MirrorMaker consumers don't get configs prefixed with source.cluster
Key: KAFKA-14980
URL: https://issues.apache.org/jira/browse/KAFKA-14980
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.5.0
Reporter: Mickael Maison

As part of KAFKA-14021, we made a change to MirrorConnectorConfig.sourceConsumerConfig() to grab all configs that start with "source.". Previously it was grabbing configs prefixed with "source.cluster.". This means existing connector configurations stop working, as configurations such as bootstrap.servers are not passed to source consumers.

For example, the following connector configuration was valid in 3.4 and now makes the connector tasks fail:
{code:json}
{
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "name": "source",
  "topics": "test",
  "tasks.max": "30",
  "source.cluster.alias": "one",
  "target.cluster.alias": "two",
  "source.cluster.bootstrap.servers": "localhost:9092",
  "target.cluster.bootstrap.servers": "localhost:29092"
}
{code}

The connector attempts to start source consumers with bootstrap.servers = [] and the tasks crash with
{noformat}
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:837)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:671)
	at org.apache.kafka.connect.mirror.MirrorUtils.newConsumer(MirrorUtils.java:59)
	at org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:103)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
{noformat}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
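The regression described above boils down to prefix handling. A minimal, self-contained sketch (the helper class and method below are hypothetical illustrations of the mechanism, not MirrorMaker's actual code) shows why stripping "source." instead of "source.cluster." leaves the consumer without a usable bootstrap.servers entry:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of prefix-based config extraction: stripping "source."
// instead of "source.cluster." produces the key "cluster.bootstrap.servers",
// which the consumer does not recognize, so bootstrap.servers ends up empty.
public class PrefixConfigDemo {
    static Map<String, String> originalsWithPrefix(Map<String, String> configs, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // Keep the entry, with the prefix removed from the key.
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("source.cluster.bootstrap.servers", "localhost:9092");

        // Correct prefix: yields the key the consumer expects.
        System.out.println(originalsWithPrefix(connectorConfig, "source.cluster."));
        // prints {bootstrap.servers=localhost:9092}

        // Too-short prefix: yields an unrecognized key, no bootstrap.servers.
        System.out.println(originalsWithPrefix(connectorConfig, "source."));
        // prints {cluster.bootstrap.servers=localhost:9092}
    }
}
```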
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188858930 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188857767

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
## @@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ * The keys must have the same schemas.
+ * The topics involved must have the same number of partitions.
+ *
+ * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ */
+public class RangeAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Pair of memberId and remaining partitions to meet the quota.
+     */
+    private static class MemberWithRemainingAssignments {
+        /**
+         * Member Id.
+         */
+        private final String memberId;
+
+        /**
+         * Number of partitions required to meet the assignment quota.
+         */
+        private final Integer remaining;
+
+        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+    }
+
+    /**
+     * @return Map of topicIds to a list of members subscribed to them.
+     */
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId : topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic
+                        .computeIfAbsent(topicId, k -> new ArrayList<>())
+                        .add(memberId);
+                } else {
+                    log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+    /**
+     * The algorithm includes the following steps:
+     *
+     * Generate a map of membersPerTopic using the given member subscriptions.
+     * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota of partitions for the assignment AND
+     * get a list (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new assignment.
+     * Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they haven't met the total required quota
+     * i.e. minRequiredQuota + 1, if the member is designated to receive one of the
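The per-topic quota logic mentioned in the quoted javadoc (a minimum required quota per member, plus one extra partition for some members) can be illustrated with a small standalone sketch; the class below is illustrative only and not part of the assignor:

```java
// Sketch of the quota computation the quoted javadoc describes: with P
// partitions and M subscribed members, each member receives
// minRequiredQuota = P / M partitions, and the first (P % M) members are
// designated to receive one extra partition each.
public class RangeQuotaDemo {
    static int[] quotas(int numPartitions, int numMembers) {
        int minRequiredQuota = numPartitions / numMembers;
        int numMembersWithExtraPartition = numPartitions % numMembers;
        int[] quotas = new int[numMembers];
        for (int i = 0; i < numMembers; i++) {
            quotas[i] = minRequiredQuota + (i < numMembersWithExtraPartition ? 1 : 0);
        }
        return quotas;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 members -> quotas [2, 1], matching the quoted test's
        // expectation that consumer A gets partitions 0,1 and B gets partition 2.
        System.out.println(java.util.Arrays.toString(quotas(3, 2)));
        // prints [2, 1]
    }
}
```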
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188856580 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-14974: -- Fix Version/s: 3.4.1 3.3.3 > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.4.1, 3.3.3, 3.6.0 > > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
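The distinction KAFKA-14974 draws between source and binary compatibility can be sketched as follows. The JVM links call sites by the full method descriptor, including the return type, so changing a void method to return a Future breaks code compiled against the old signature even though that code still compiles from source. Re-instating the void overload under its original name and moving the Future-returning behavior to a separately named method restores binary compatibility. The class and method bodies below are illustrative placeholders, not Connect's actual KafkaBasedLog:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Illustrative sketch of the compatibility fix described in the issue.
public class CompatLog<K, V> {
    // Original void signature, kept so binaries compiled against the old
    // descriptor `send(Object, Object)V` still link.
    public void send(K key, V value) {
        sendWithReceipt(key, value);
    }

    // Future-returning variant under a new name, so it gets a new descriptor
    // instead of changing the old one.
    public Future<Void> sendWithReceipt(K key, V value) {
        // Placeholder: a real implementation would hand the record to a producer.
        return CompletableFuture.completedFuture(null);
    }
}
```

Callers that only recompile can migrate to the Future-returning method at their leisure; callers shipped as jars keep working unchanged.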
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188832049 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] [kafka] kirktrue closed pull request #13495: KAFKA-14274: Introduce FetchRequestManager to integrate fetch into new consumer threading refactor
kirktrue closed pull request #13495: KAFKA-14274: Introduce FetchRequestManager to integrate fetch into new consumer threading refactor URL: https://github.com/apache/kafka/pull/13495 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188823115 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188821932 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188821039

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:

@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ * Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ * The keys must have the same schemas.
+ * The topics involved must have the same number of partitions.
+ *
+ * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ */
+public class RangeAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Pair of memberId and remaining partitions to meet the quota.
+     */
+    private static class MemberWithRemainingAssignments {
+        /**
+         * Member Id.
+         */
+        private final String memberId;
+
+        /**
+         * Number of partitions required to meet the assignment quota.
+         */
+        private final Integer remaining;
+
+        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+    }
+
+    /**
+     * @return Map of topicIds to a list of members subscribed to them.
+     */
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId : topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic
+                        .computeIfAbsent(topicId, k -> new ArrayList<>())
+                        .add(memberId);
+                } else {
+                    log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+    /**
+     * The algorithm includes the following steps:
+     *
+     * Generate a map of members per topic using the given member subscriptions.
+     * Generate a list of members called potentially unfilled members, which consists of members that have not met the minimum required quota of partitions for the assignment AND
+     * get a list called assigned sticky partitions per topic, which has the partitions that will be retained in the new assignment.
+     * Generate a list of unassigned partitions by calculating the difference between the total partitions for the topic and the assigned (sticky) partitions.
+
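For readers following the thread, the inversion that the quoted membersPerTopic helper performs can be shown as a self-contained sketch. The String-keyed types and class name here are simplifications for illustration only; the real code keys topics by Uuid and members by AssignmentMemberSpec, and logs a warning for unknown topics instead of silently skipping them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: invert a memberId -> subscribed-topics map into a
// topic -> subscribed-members map, keeping only topics that exist
// in the topic metadata (here modeled as a plain Set).
class MembersPerTopicSketch {
    static Map<String, List<String>> membersPerTopic(
            Map<String, List<String>> subscriptions, // memberId -> topicIds
            Set<String> knownTopics) {               // topics present in metadata
        Map<String, List<String>> membersPerTopic = new HashMap<>();
        subscriptions.forEach((memberId, topics) -> {
            for (String topicId : topics) {
                if (knownTopics.contains(topicId)) {
                    membersPerTopic
                        .computeIfAbsent(topicId, k -> new ArrayList<>())
                        .add(memberId);
                }
                // else: the reviewed code logs a warning here
            }
        });
        return membersPerTopic;
    }
}
```

computeIfAbsent keeps the inversion a single pass over the subscriptions, which matches the structure of the quoted diff.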
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188819218 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. + * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota of partitions for the assignment AND + * get a list (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new assignment. + * Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they haven't met the total required quota + * i.e. minRequiredQuota + 1, if the member is designated to receive one of
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188817725 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. + * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota of partitions for the assignment AND + * get a list (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new assignment. + * Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they haven't met the total required quota + * i.e. minRequiredQuota + 1, if the member is designated to receive one of
[GitHub] [kafka] urbandan commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics
urbandan commented on PR #13690: URL: https://github.com/apache/kafka/pull/13690#issuecomment-1540445965 Added a unit test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188811581 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * This Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +/** + * Pair of memberId and remaining partitions to meet the quota. + */ +private static class MemberWithRemainingAssignments { +/** + * Member Id. + */ +private final String memberId; +/** + * Number of partitions required to meet the assignment quota. + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} +} + +/** + * @return Map of topicIds to a list of members subscribed to them. + */ +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. + * Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota of partitions for the assignment AND + * get a list (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new assignment. + * Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they haven't met the total required quota + * i.e. minRequiredQuota + 1, if the member is designated to receive one of
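The quota rule referenced in the algorithm description above (a minRequiredQuota per member, with some members designated to receive one extra partition) is the usual range math. A sketch, under the assumption that the quota for one topic is partitions / members with the first partitions % members subscribers getting one more; the class and method names are illustrative, not the PR's API:

```java
// Sketch of per-member quota math for a single topic: each of the
// numMembers subscribers gets at least numPartitions / numMembers
// partitions, and the first numPartitions % numMembers members
// each get one extra partition.
class RangeQuotaSketch {
    static int[] quotas(int numPartitions, int numMembers) {
        int minRequiredQuota = numPartitions / numMembers;
        int numMembersWithExtraPartition = numPartitions % numMembers;
        int[] quotas = new int[numMembers];
        for (int i = 0; i < numMembers; i++) {
            quotas[i] = minRequiredQuota + (i < numMembersWithExtraPartition ? 1 : 0);
        }
        return quotas;
    }
}
```

The quotas always sum to numPartitions, so every partition is accounted for before sticky retention and the unassigned-partition pass run.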
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
jeffkbkim commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1188772104 ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala: ## @@ -160,6 +160,7 @@ class GroupMetadataTest { assertThrows(classOf[IllegalStateException], () => group.transitionTo(CompletingRebalance)) } + @Test Review Comment: created https://github.com/apache/kafka/pull/13694 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim opened a new pull request, #13694: MINOR: add test tag for testDeadToDeadIllegalTransition
jeffkbkim opened a new pull request, #13694: URL: https://github.com/apache/kafka/pull/13694 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java
dajac commented on code in PR #13663: URL: https://github.com/apache/kafka/pull/13663#discussion_r1188744749 ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala: ## @@ -160,6 +160,7 @@ class GroupMetadataTest { assertThrows(classOf[IllegalStateException], () => group.transitionTo(CompletingRebalance)) } + @Test Review Comment: Nice catch! Do you mind doing a separate PR for this one? This way we can keep changes in this PR scoped in the new module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720955#comment-17720955 ] Jorge Esteban Quilcate Otoya commented on KAFKA-7739: - Yes, we know the transactional index is optional when copying but not when fetching. I agree that throwing the exception should be correct, but the javadoc is ambiguous: it refers to "no resources associated with the given remoteLogSegmentMetadata", which may mean _all_ resources, rather than "the requested resource is not found in the remote storage" or similar. > Kafka Tiered Storage > > > Key: KAFKA-7739 > URL: https://issues.apache.org/jira/browse/KAFKA-7739 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Harsha >Assignee: Satish Duggana >Priority: Major > Labels: needs-kip > Fix For: 3.6.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on PR #13443: URL: https://github.com/apache/kafka/pull/13443#issuecomment-1540289311 @rreddy-22 The build failed due to compilation errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
dajac merged PR #13644: URL: https://github.com/apache/kafka/pull/13644 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on PR #13644: URL: https://github.com/apache/kafka/pull/13644#issuecomment-1540246709 @dajac the test failure is unrelated. can we merge this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics
C0urante commented on PR #13690: URL: https://github.com/apache/kafka/pull/13690#issuecomment-1540232079 Tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14414) Remove unnecessary usage of ObjectSerializationCache
[ https://issues.apache.org/jira/browse/KAFKA-14414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-14414: - Attachment: producer-only-cpu-33-clean.html > Remove unnecessary usage of ObjectSerializationCache > > > Key: KAFKA-14414 > URL: https://issues.apache.org/jira/browse/KAFKA-14414 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Fix For: 3.4.0 > > Attachments: Screenshot 2022-11-21 at 19.23.53.png, > producer-only-cpu-33-clean.html > > > We create an instance of ObjectSerializationCache at > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L113] > which does not get used at all. We always "add" to the cache but never > retrieve from it (as is evident by the fact that we don't store the reference > of the cache anywhere). > Adding information to the cache is expensive because it uses > System.identityHashCode(Object) which is expensive as demonstrated by the > flame graph of producer requests over Apache Kafka 3.3.1 plaintext broker. > {{!Screenshot 2022-11-21 at 19.23.53.png!}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down
hudeqi commented on PR #13473: URL: https://github.com/apache/kafka/pull/13473#issuecomment-1540199229 > Bumping this PR. Are there any other committers who could help review and merge it? I'm sorry, I didn't notice the comment ("We're past code freeze for 3.5 so moving this to the next release") in the corresponding [jira](https://issues.apache.org/jira/browse/KAFKA-14866) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] machi1990 opened a new pull request, #13693: MINOR: fix a small typo in SharedServer.scala
machi1990 opened a new pull request, #13693: URL: https://github.com/apache/kafka/pull/13693 I noticed this typo as I was going through the SharedServer.scala file ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720947#comment-17720947 ] Alexandre Dupriez commented on KAFKA-7739: -- I would think that the absence of the log segment or any mandatory ancillary file results in the stated exception. I think only transaction indexes are optional. > Kafka Tiered Storage > > > Key: KAFKA-7739 > URL: https://issues.apache.org/jira/browse/KAFKA-7739 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Harsha >Assignee: Satish Duggana >Priority: Major > Labels: needs-kip > Fix For: 3.6.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down
hudeqi commented on PR #13473: URL: https://github.com/apache/kafka/pull/13473#issuecomment-1540185668 Bumping this PR. Are there any other committers who could help review and merge it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
Hangleton commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1188635591 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,32 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * LogSegmentOffsetOverflowException should be thrown while appending the logs if: + * 1. largestOffset < 0 Review Comment: Nit: we can remove (1) since (1) => (2). ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,32 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * LogSegmentOffsetOverflowException should be thrown while appending the logs if: + * 1. largestOffset < 0 + * 2. largestOffset - baseOffset < 0 + * 3. largestOffset - baseOffset > Integer.MAX_VALUE + */ + @ParameterizedTest + @CsvSource(Array( +"0, -2147483648", +"0, 2147483648", +"1, 0", +"100, 10", +"2147483648, 0", +"-2147483648, 0", +"2147483648,4294967296" Review Comment: nit: Note that the max int value on the Java platform is `2147483647` so we are exercising max int + 1. ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,32 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * LogSegmentOffsetOverflowException should be thrown while appending the logs if: + * 1. largestOffset < 0 + * 2. largestOffset - baseOffset < 0 + * 3. largestOffset - baseOffset > Integer.MAX_VALUE + */ + @ParameterizedTest + @CsvSource(Array( +"0, -2147483648", +"0, 2147483648", +"1, 0", +"100, 10", +"2147483648, 0", +"-2147483648, 0", +"2147483648,4294967296" + )) + def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = { Review Comment: What is the expected behaviour for `baseOffset == largestOffset`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
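The review above discusses when appending should raise LogSegmentOffsetOverflowException: the (baseOffset, largestOffset) pair cannot be stored as a 4-byte relative offset in the segment index. A minimal sketch of that check (method name is illustrative, not Kafka's actual API); note how case 1 (largestOffset < 0) is implied by case 2 whenever baseOffset >= 0, as the first review comment points out:

```java
public class OffsetOverflowCheck {
    // True when largestOffset - baseOffset fits in a non-negative 4-byte
    // relative offset; false for the three overflow cases in the test javadoc.
    static boolean canConvertToRelativeOffset(long baseOffset, long largestOffset) {
        long relative = largestOffset - baseOffset;
        return relative >= 0 && relative <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // cases from the @CsvSource in the review: all overflow
        assert !canConvertToRelativeOffset(0, -2147483648L);
        assert !canConvertToRelativeOffset(0, 2147483648L); // Integer.MAX_VALUE + 1
        assert !canConvertToRelativeOffset(1, 0);
        assert !canConvertToRelativeOffset(100, 10);
        assert !canConvertToRelativeOffset(2147483648L, 0);
        // baseOffset == largestOffset is valid: relative offset 0
        assert canConvertToRelativeOffset(42, 42);
        System.out.println("ok");
    }
}
```

This also answers the last review question: `baseOffset == largestOffset` yields a relative offset of 0, which is in range, so no exception is expected.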
[GitHub] [kafka] akatona84 commented on pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
akatona84 commented on PR #11565: URL: https://github.com/apache/kafka/pull/11565#issuecomment-1540178836 rebased
[GitHub] [kafka] hudeqi opened a new pull request, #13692: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
hudeqi opened a new pull request, #13692: URL: https://github.com/apache/kafka/pull/13692 When the partitions of ReplicaFetcherThread have finished truncating, the ReplicaAlterLogDirsThread to which these partitions belong needs to be marked for truncation. The lag value in the newState (PartitionFetchState) obtained in this process is still the original value (state.lag). If the truncationOffset is smaller than the original state.fetchOffset, then the original lag value is incorrect and needs to be updated: it should be the original lag value plus the difference between the original state.fetchOffset and the truncationOffset. If the incorrect lag value is used, ReplicaAlterLogDirsThread may set the wrong lag value for fetcherLagStats when executing "processFetchRequest". So it might be more reasonable to recalculate the lag value based on the new state.fetchOffset.
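The correction described in the PR can be sketched as a pure function (a hypothetical helper, not the actual patch): when the fetch offset is rewound to the truncation offset, the stale lag grows by exactly the amount of the rewind.

```java
public class TruncationLag {
    // Per the PR description: if truncationOffset < originalFetchOffset, the
    // corrected lag is the original lag plus the distance the fetch offset
    // moved backwards; otherwise the original lag is still valid.
    static long correctedLag(long originalLag, long originalFetchOffset, long truncationOffset) {
        if (truncationOffset >= originalFetchOffset) {
            return originalLag; // no rewind, lag unchanged
        }
        return originalLag + (originalFetchOffset - truncationOffset);
    }

    public static void main(String[] args) {
        // fetch offset rewound from 100 to 80 with 5 records of lag: 25 behind
        assert correctedLag(5, 100, 80) == 25;
        // no truncation below the fetch offset: lag stays 5
        assert correctedLag(5, 100, 100) == 5;
        System.out.println("ok");
    }
}
```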
[jira] [Updated] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
[ https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-14979: --- Description: When the partitions of ReplicaFetcherThread finished truncating, the ReplicaAlterLogDirsThread to which these partitions belong needs to be marked truncate. The lag value in the newState (PartitionFetchState) obtained in this process is still the original value (state.lag). If the truncationOffset is smaller than the original state.fetchOffset, then the original lag value is incorrect and needs to be updated. It should be the original lag value plus the difference between the original state.fetchOffset and truncationOffset. If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread may set the wrong lag value for fetcherLagStats when executing "processFetchRequest". So it might be more reasonable to recalculate a lag value based on new state.fetchOffset. was: When the partitions of ReplicaFetcherThread finished truncating, the ReplicaAlterLogDirsThread to which these partitions belong needs to be marked truncate. The lag value in the newState (PartitionFetchState) obtained in this process is still the original value (state.lag). If the truncationOffset is smaller than the original state.fetchOffset, then the original lag value is incorrect and needs to be updated. It should be the original lag value plus the difference between the original state.fetchOffset and truncationOffset. If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread may set the wrong lag value for fetcherLagStats when executing processFetchRequest. So it might be more reasonable to recalculate a lag value based on new state.fetchOffset. 
> Incorrect lag was calculated when markPartitionsForTruncation in > ReplicaAlterLogDirsThread > -- > > Key: KAFKA-14979 > URL: https://issues.apache.org/jira/browse/KAFKA-14979 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > > When the partitions of ReplicaFetcherThread finished truncating, the > ReplicaAlterLogDirsThread to which these partitions belong needs to be marked > truncate. The lag value in the newState (PartitionFetchState) obtained in > this process is still the original value (state.lag). If the truncationOffset > is smaller than the original state.fetchOffset, then the original lag value > is incorrect and needs to be updated. It should be the original lag value > plus the difference between the original state.fetchOffset and > truncationOffset. > If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread > may set the wrong lag value for fetcherLagStats when executing > "processFetchRequest". So it might be more reasonable to recalculate a lag > value based on new state.fetchOffset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
[ https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-14979: --- Description: When the partitions of ReplicaFetcherThread finished truncating, the ReplicaAlterLogDirsThread to which these partitions belong needs to be marked truncate. The lag value in the newState (PartitionFetchState) obtained in this process is still the original value (state.lag). If the truncationOffset is smaller than the original state.fetchOffset, then the original lag value is incorrect and needs to be updated. It should be the original lag value plus the difference between the original state.fetchOffset and truncationOffset. If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread may set the wrong lag value for fetcherLagStats when executing processFetchRequest. So it might be more reasonable to recalculate a lag value based on new state.fetchOffset. > Incorrect lag was calculated when markPartitionsForTruncation in > ReplicaAlterLogDirsThread > -- > > Key: KAFKA-14979 > URL: https://issues.apache.org/jira/browse/KAFKA-14979 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > > When the partitions of ReplicaFetcherThread finished truncating, the > ReplicaAlterLogDirsThread to which these partitions belong needs to be marked > truncate. The lag value in the newState (PartitionFetchState) obtained in > this process is still the original value (state.lag). If the truncationOffset > is smaller than the original state.fetchOffset, then the original lag value > is incorrect and needs to be updated. It should be the original lag value > plus the difference between the original state.fetchOffset and > truncationOffset. > If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread > may set the wrong lag value for fetcherLagStats when executing > processFetchRequest. 
So it might be more reasonable to recalculate a lag > value based on new state.fetchOffset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720936#comment-17720936 ] Jorge Esteban Quilcate Otoya commented on KAFKA-7739: - [~satish.duggana] could you help clarify the following: RemoteStorageManager#fetchIndex and RemoteStorageManager#fetchLogSegment state: > * @throws RemoteResourceNotFoundException when there are no resources > associated with the given remoteLogSegmentMetadata. However, when fetching indexes, the Transaction Index is optional. So when returning a response, should it be null, or should RemoteResourceNotFoundException be thrown? From the javadoc it is unclear whether "resources" means a specific object/index or the whole segment log + metadata. > Kafka Tiered Storage > > > Key: KAFKA-7739 > URL: https://issues.apache.org/jira/browse/KAFKA-7739 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Harsha >Assignee: Satish Duggana >Priority: Major > Labels: needs-kip > Fix For: 3.6.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
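Until the javadoc ambiguity above is resolved, a caller can tolerate both conventions. The sketch below is hypothetical (the names `IndexSource`, `fetchTxnIndex`, and `ResourceNotFound` are NOT the RemoteStorageManager API): it maps both a null return and a "not found" exception for the optional transaction index into an empty Optional.

```java
import java.util.Optional;

public class IndexFetch {
    // Hypothetical stand-ins for the remote storage plugin surface.
    interface IndexSource {
        byte[] fetchTxnIndex() throws ResourceNotFound;
    }

    static class ResourceNotFound extends Exception { }

    // Defensive caller: treat both "null" and "not found" as an absent
    // optional index, so either server-side convention works.
    static Optional<byte[]> fetchTxnIndexSafely(IndexSource source) {
        try {
            return Optional.ofNullable(source.fetchTxnIndex());
        } catch (ResourceNotFound e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        IndexSource missing = () -> { throw new ResourceNotFound(); };
        IndexSource present = () -> new byte[] {1, 2, 3};
        assert !fetchTxnIndexSafely(missing).isPresent();
        assert fetchTxnIndexSafely(present).isPresent();
        System.out.println("ok");
    }
}
```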
[GitHub] [kafka] fvaleri commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609885 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +try { +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +KafkaProducer producer = +new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); + +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +KafkaConsumer consumer = new Consumer( +"processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null) +.createKafkaConsumer(); + +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } - -Map offsets = consumerOffsets(); - -// Checkpoint the progress by sending offsets to group coordinator broker. -// Note that this API is only available for broker >= 2.5. -producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); - -// Finish the transaction. 
All sent records should be visible for consumption now. -producer.commitTransaction(); -messageProcessed += records.count(); +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { Review Comment: Well, this is consistent with the consumer. The exception list here is shorter and much more compact (no repeated instanceof). If you don't
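The diff above restructures the exactly-once loop: `initTransactions()` once to fence zombies, then per batch `beginTransaction()`, send the transformed records, `sendOffsetsToTransaction(...)`, and `commitTransaction()`. One subtle piece is the offset bookkeeping behind `getOffsetsToCommit`: the committed offset must be the last processed offset plus one, i.e. the position of the next record to read. A simplified sketch (plain partition-name-to-offset maps instead of TopicPartition/OffsetAndMetadata):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetsToCommit {
    // For each partition, commit lastProcessedOffset + 1 so that a restart
    // resumes at the first record *after* the committed transaction.
    static Map<String, Long> offsetsToCommit(Map<String, Long> lastProcessed) {
        Map<String, Long> offsets = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> offsets.put(partition, offset + 1));
        return offsets;
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessed = new HashMap<>();
        lastProcessed.put("input-topic-0", 41L);
        // record 41 was the last one processed, so 42 is committed
        assert offsetsToCommit(lastProcessed).get("input-topic-0") == 42L;
        System.out.println("ok");
    }
}
```

Committing the raw last offset instead would reprocess one record per partition after every restart, breaking the exactly-once guarantee the example is meant to demonstrate.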
[GitHub] [kafka] fvaleri commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes
fvaleri commented on code in PR #13516: URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609481 ## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java: ## @@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic, @Override public void run() { -// Init transactions call should always happen first in order to clear zombie transactions from previous generation. -producer.initTransactions(); - -final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - -consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { -@Override -public void onPartitionsRevoked(Collection partitions) { -printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); -} - -@Override -public void onPartitionsAssigned(Collection partitions) { -printWithTxnId("Received partition assignment after rebalancing: " + partitions); -messageRemaining.set(messagesRemaining(consumer)); -} -}); - -int messageProcessed = 0; -while (messageRemaining.get() > 0) { -try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(200)); -if (records.count() > 0) { -// Begin a new transaction session. -producer.beginTransaction(); -for (ConsumerRecord record : records) { -// Process the record and send to downstream. 
-ProducerRecord customizedRecord = transform(record); -producer.send(customizedRecord); +int processedRecords = 0; +long remainingRecords = Long.MAX_VALUE; +try { +// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster +int transactionTimeoutMs = 10_000; +KafkaProducer producer = +new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); + +// consumer must be in read_committed mode, which means it won't be able to read uncommitted data +boolean readCommitted = true; +KafkaConsumer consumer = new Consumer( +"processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null) +.createKafkaConsumer(); + +// called first and once to fence zombies and abort any pending transaction +producer.initTransactions(); + +consumer.subscribe(singleton(inputTopic), this); + +Utils.printOut("Processing new records"); +while (!closed && remainingRecords > 0) { +try { +ConsumerRecords records = consumer.poll(ofMillis(200)); +if (!records.isEmpty()) { +// begin a new transaction session +producer.beginTransaction(); + +for (ConsumerRecord record : records) { +// process the record and send downstream +ProducerRecord newRecord = +new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"); +producer.send(newRecord); +} + +// checkpoint the progress by sending offsets to group coordinator broker +// note that this API is only available for broker >= 2.5 + producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); + +// commit the transaction including offsets +producer.commitTransaction(); +processedRecords += records.count(); } - -Map offsets = consumerOffsets(); - -// Checkpoint the progress by sending offsets to group coordinator broker. -// Note that this API is only available for broker >= 2.5. -producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); - -// Finish the transaction. 
All sent records should be visible for consumption now. -producer.commitTransaction(); -messageProcessed += records.count(); +} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { +Utils.printErr(e.getMessage()); +// we can't recover from these exceptions +shutdown(); +
[jira] [Assigned] (KAFKA-14965) Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor
[ https://issues.apache.org/jira/browse/KAFKA-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-14965: -- Assignee: Lianet Magrans > Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new > consumer threading refactor > - > > Key: KAFKA-14965 > URL: https://issues.apache.org/jira/browse/KAFKA-14965 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > > This task introduces new functionality for handling ListOffsetsRequests for > the new consumer implementation, as part for the ongoing work for the > consumer threading model refactor. > This task introduces a new class named {{ListOffsetsRequestManager, > }}responsible of handling ListOffsets requests performed by the consumer to > expose functionality like beginningOffsets, endOffsets and offsetsForTimes. > The Offset{{{}Fetcher{}}} class is used internally by the {{KafkaConsumer}} > to list offsets, so this task will be based on a refactored > Offset{{{}Fetcher{}}}, reusing the fetching logic as much as possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14966) Extract reusable common logic from OffsetFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-14966: -- Assignee: Lianet Magrans > Extract reusable common logic from OffsetFetcher > > > Key: KAFKA-14966 > URL: https://issues.apache.org/jira/browse/KAFKA-14966 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > > The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, > validate and reset positions. > For the new consumer based on a refactored threading model, similar > functionality will be needed by the ListOffsetsRequestManager component. > This task aims at identifying and extracting the OffsetFetcher functionality > that can be reused by the new consumer implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1188566365 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig, _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | - _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), -divergingEpoch = None, -highWatermark = UnifiedLog.UnknownOffset, -leaderLogStartOffset = UnifiedLog.UnknownOffset, -leaderLogEndOffset = UnifiedLog.UnknownOffset, -followerLogStartOffset = UnifiedLog.UnknownOffset, -fetchTimeMs = -1L, -lastStableOffset = None, -exception = Some(e)) + createLogReadResult(e) +case e: OffsetOutOfRangeException => + // In case of offset out of range errors, check for remote log manager for non-compacted topics + // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier + // with NotLeaderForPartitionException or ReplicaNotAvailableException. + // If it is from a follower then send the offset metadata only as the data is already available in remote + // storage. + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && +// Check that the fetch offset is within the offset range within the remote storage layer. +log.logStartOffset <= offset && offset < log.localLogStartOffset()) { +// For follower fetch requests, throw an error saying that this offset is moved to tiered storage. +val highWatermark = log.highWatermark +val leaderLogStartOffset = log.logStartOffset Review Comment: It is fine as the offset can always be updated, we will send whatever is the value available at that time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
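The range check discussed in this review decides when an out-of-range fetch is routed to tiered storage. A sketch of that predicate in isolation (assuming the semantics stated in the diff's comment, with illustrative names): a fetch offset is served from remote storage only when it falls in the region that has been uploaded and removed from local disk but not yet deleted from the log.

```java
public class TieredFetchRouting {
    // Mirrors `log.logStartOffset <= offset && offset < log.localLogStartOffset()`
    // from the diff: the half-open interval [logStartOffset, localLogStartOffset)
    // is the remote-only region of the log.
    static boolean servedFromRemote(long fetchOffset, long logStartOffset, long localLogStartOffset) {
        return logStartOffset <= fetchOffset && fetchOffset < localLogStartOffset;
    }

    public static void main(String[] args) {
        long logStart = 100, localStart = 500;
        assert servedFromRemote(100, logStart, localStart);  // oldest tiered offset
        assert servedFromRemote(499, logStart, localStart);  // last remote-only offset
        assert !servedFromRemote(500, logStart, localStart); // available locally
        assert !servedFromRemote(50, logStart, localStart);  // genuinely out of range
        System.out.println("ok");
    }
}
```

Offsets below `logStartOffset` still surface as an out-of-range error, which matches the reviewer's note that a non-existent target offset eventually throws.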
[jira] [Assigned] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
[ https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi reassigned KAFKA-14979: -- Assignee: hudeqi > Incorrect lag was calculated when markPartitionsForTruncation in > ReplicaAlterLogDirsThread > -- > > Key: KAFKA-14979 > URL: https://issues.apache.org/jira/browse/KAFKA-14979 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
hudeqi created KAFKA-14979: -- Summary: Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread Key: KAFKA-14979 URL: https://issues.apache.org/jira/browse/KAFKA-14979 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 3.3.2 Reporter: hudeqi
[GitHub] [kafka] rhauch merged pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog
rhauch merged PR #13688: URL: https://github.com/apache/kafka/pull/13688
[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-14974: -- Fix Version/s: 3.6.0 > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0 > > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
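The ticket's distinction between source and binary compatibility comes down to how the JVM links method calls: the call site's descriptor includes the return type, so changing `void send(...)` to return a `Future` breaks already-compiled callers even though new compilations succeed. A sketch of the fix described above (class and method names are illustrative, not the actual KafkaBasedLog code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CompatLog<K, V> {
    // Re-instated old binary signature: pre-existing compiled callers that
    // reference "send(...)V" keep linking. It delegates and drops the future.
    public void send(K key, V value) {
        sendWithReceipt(key, value);
    }

    // The Future-returning behavior moves to a *new* name rather than
    // changing the descriptor of the existing method.
    public Future<Void> sendWithReceipt(K key, V value) {
        // a real implementation would append to the backing Kafka topic
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) throws Exception {
        CompatLog<String, String> log = new CompatLog<>();
        log.send("k", "v"); // legacy call site still resolves
        assert log.sendWithReceipt("k", "v").isDone();
        System.out.println("ok");
    }
}
```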
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1188479417 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig, _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | - _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), -divergingEpoch = None, -highWatermark = UnifiedLog.UnknownOffset, -leaderLogStartOffset = UnifiedLog.UnknownOffset, -leaderLogEndOffset = UnifiedLog.UnknownOffset, -followerLogStartOffset = UnifiedLog.UnknownOffset, -fetchTimeMs = -1L, -lastStableOffset = None, -exception = Some(e)) + createLogReadResult(e) +case e: OffsetOutOfRangeException => + // In case of offset out of range errors, check for remote log manager for non-compacted topics + // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier + // with NotLeaderForPartitionException or ReplicaNotAvailableException. + // If it is from a follower then send the offset metadata only as the data is already available in remote + // storage. + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && +// Check that the fetch offset is within the offset range within the remote storage layer. +log.logStartOffset <= offset && offset < log.localLogStartOffset()) { Review Comment: That should work fine because it will eventually throw offset out-of-range error if the target offset does not exist. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1188470147 ## core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogReaderTest { +RemoteLogManager mockRLM = mock(RemoteLogManager.class); +LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100); +Records records = mock(Records.class); + + +@Test +public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException { +FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records); + when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo); + +Consumer callback = mock(Consumer.class); +RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false); +RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback); +remoteLogReader.call(); + +// verify the callback did get invoked with the expected remoteLogReadResult +ArgumentCaptor remoteLogReadResultArg = 
ArgumentCaptor.forClass(RemoteLogReadResult.class); +verify(callback, times(1)).accept(remoteLogReadResultArg.capture()); +RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue(); +assertFalse(actualRemoteLogReadResult.error.isPresent()); +assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent()); +assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get()); +} + +@Test +public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException { Review Comment: We will add more tests in followup PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
lanshiqin commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1188451651 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,29 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException Review Comment: Good idea. I've modified it
[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188387671

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -600,25 +623,210 @@ public String toString() {
         }
     }

-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+            ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+            : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                    includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            // - there is no minimum-one-message constraint and
+            // - the first batch size is more than maximum bytes that can be sent and
+            // - it is for FetchRequest version 3 or above.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                MemoryRecords.readableRecords(buffer));
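The read() path quoted above boils down to: locate the remote segment, then scan its batches forward until one covers the target offset. A standalone sketch of that scan, with a hypothetical Batch class standing in for RecordBatch and a plain list standing in for the RemoteLogInputStream (both assumptions for illustration, not the Kafka API):

```java
import java.util.Arrays;
import java.util.List;

public class FindFirstBatchSketch {
    // Hypothetical stand-in for RecordBatch: only the offset range matters here.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;
        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Scan batches in segment order until one may contain the target offset,
    // mirroring the findFirstBatch(remoteLogInputStream, offset) call in read().
    static Batch findFirstBatch(List<Batch> batches, long targetOffset) {
        for (Batch batch : batches) {
            if (batch.lastOffset >= targetOffset) {
                return batch;   // first batch whose range reaches the target
            }
        }
        return null;            // segment exhausted: caller returns MemoryRecords.EMPTY
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(new Batch(0, 4), new Batch(5, 9), new Batch(10, 14));
        System.out.println(findFirstBatch(batches, 7).baseOffset);  // → 5
        System.out.println(findFirstBatch(batches, 20));            // → null
    }
}
```

Note the returned batch's baseOffset may be below the fetch offset; the real code copies the whole first batch anyway and lets the consumer skip records below its position.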
[GitHub] [kafka] showuon commented on pull request #13691: MINOR: remove kraft readme link
showuon commented on PR #13691: URL: https://github.com/apache/kafka/pull/13691#issuecomment-1539857562 @dengziming , good point! I've updated it to `the KRaft section for details.` to link it to the KRaft section in the doc. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
showuon commented on PR #13679: URL: https://github.com/apache/kafka/pull/13679#issuecomment-1539844838 Also, checkstyle failed. Please fix them. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse
showuon commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1188407891

## core/src/main/scala/kafka/server/ApiVersionManager.scala:

@@ -80,7 +81,18 @@ class SimpleApiVersionManager(
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)

   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-    ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
+    throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")

Review Comment:
Actually it's not clear when to use `SimpleApiVersionManager` and when to use `DefaultApiVersionManager`. Could we add some comments above each class?
[GitHub] [kafka] urbandan commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics
urbandan commented on PR #13690: URL: https://github.com/apache/kafka/pull/13690#issuecomment-1539809972 Thanks @viktorsomogyi !
[GitHub] [kafka] showuon commented on pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes
showuon commented on PR #13517: URL: https://github.com/apache/kafka/pull/13517#issuecomment-1539806138 Please resolve the conflict. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes
showuon commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188380635

## examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:

@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        try {
+            // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+            int transactionTimeoutMs = 10_000;
+            KafkaProducer producer =
+                new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+            // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+            boolean readCommitted = true;
+            KafkaConsumer consumer = new Consumer(
+                "processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+                .createKafkaConsumer();
+
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord record : records) {
+                            // process the record and send downstream
+                            ProducerRecord newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    Utils.printErr(e.getMessage());
+                    // we can't recover from these exceptions
+                    shutdown();
+
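The sendOffsetsToTransaction checkpoint in the quoted loop commits, for each partition, the offset of the next record to read, i.e. the last consumed offset plus one. A minimal sketch of that bookkeeping, with partitions modeled as plain strings instead of TopicPartition and offsets as plain longs (simplifications for illustration, not the Kafka OffsetAndMetadata API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetsToCommitSketch {
    // recordOffsets: partition -> offsets consumed in this poll, in order.
    // Returns partition -> offset to commit inside the transaction.
    static Map<String, Long> offsetsToCommit(Map<String, List<Long>> recordOffsets) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, List<Long>> e : recordOffsets.entrySet()) {
            List<Long> offsets = e.getValue();
            long last = offsets.get(offsets.size() - 1);
            // commit the position (last consumed offset + 1), not the last offset itself
            toCommit.put(e.getKey(), last + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> consumed = new HashMap<>();
        consumed.put("input-0", java.util.Arrays.asList(3L, 4L, 5L));
        System.out.println(offsetsToCommit(consumed));  // → {input-0=6}
    }
}
```

Committing the position rather than the last offset is what lets a restarted processor resume without reprocessing the final record of the previous transaction.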
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188387610

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:

@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+    private final RangeAssignor assignor = new RangeAssignor();
+    private final Uuid topic1Uuid = Uuid.randomUuid();
+    private final Uuid topic2Uuid = Uuid.randomUuid();
+    private final Uuid topic3Uuid = Uuid.randomUuid();
+    private final String consumerA = "A";
+    private final String consumerB = "B";
+    private final String consumerC = "C";
+
+    @Test
+    public void testOneConsumerNoTopic() {
+        Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+        Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+            consumerA,
+            new AssignmentMemberSpec(
+                Optional.empty(),
+                Optional.empty(),
+                Collections.emptyList(),
+                Collections.emptyMap()
+            )
+        );
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+        GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+        assertEquals(Collections.emptyMap(), groupAssignment.members());
+    }
+
+    @Test
+    public void testOneConsumerSubscribedToNonExistentTopic() {
+        Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+        Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+            consumerA,
+            new AssignmentMemberSpec(
+                Optional.empty(),
+                Optional.empty(),
+                Collections.singletonList(topic2Uuid),
+                Collections.emptyMap()
+            )
+        );
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+        GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+        assertTrue(groupAssignment.members().isEmpty());
+    }
+
+    @Test
+    public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+        Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+        topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+        members.put(consumerA, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic3Uuid),
+            Collections.emptyMap()
+        ));
+
+        members.put(consumerB, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic3Uuid),
+            Collections.emptyMap()
+        ));
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+        GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+        // Topic 1 Partitions Assignment
+        expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+        expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+        // Topic 3 Partitions Assignment
+        expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+        expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
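The expected assignments in these tests follow the usual range rule: each topic's partitions are handed out contiguously, and when the partition count does not divide evenly, the first few consumers receive one extra partition. A hypothetical helper illustrating that rule (not the actual RangeAssignor implementation under review):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSketch {
    // Assign partitions 0..numPartitions-1 of a single topic across consumers,
    // which are assumed to already be in a deterministic (sorted) order.
    static Map<String, List<Integer>> assignTopic(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int quota = numPartitions / consumers.size();   // minimum partitions per consumer
        int extra = numPartitions % consumers.size();   // first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                parts.add(next++);                      // contiguous range of partition ids
            }
            assignment.put(consumers.get(i), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Matches the quoted test: 3 partitions of topic 1 over consumers A and B.
        System.out.println(assignTopic(java.util.Arrays.asList("A", "B"), 3));  // → {A=[0, 1], B=[2]}
    }
}
```

Applying the same rule to the 2-partition topic 3 gives A → [0] and B → [1], which is exactly the expectedAssignment built in testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions.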
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188386919

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:

(quotes the same RangeAssignorTest excerpt as the previous comment)