[jira] [Updated] (KAFKA-14256) Update to Scala 2.13.10

2023-05-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14256:
--
Summary: Update to Scala 2.13.10  (was: Update to Scala 2.13.9)

> Update to Scala 2.13.10
> ---
>
> Key: KAFKA-14256
> URL: https://issues.apache.org/jira/browse/KAFKA-14256
> Project: Kafka
> Issue Type: Task
> Reporter: Matthew de Detrich
> Assignee: Matthew de Detrich
> Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14256) Update to Scala 2.13.10

2023-05-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14256:
--
Fix Version/s: 3.4.0

> Update to Scala 2.13.10
> ---
>
> Key: KAFKA-14256
> URL: https://issues.apache.org/jira/browse/KAFKA-14256
> Project: Kafka
> Issue Type: Task
> Reporter: Matthew de Detrich
> Assignee: Matthew de Detrich
> Priority: Minor
> Fix For: 3.4.0
>
>






[GitHub] [kafka] hudeqi opened a new pull request, #13697: MINOR:code optimization in QuorumController

2023-05-09 Thread via GitHub


hudeqi opened a new pull request, #13697:
URL: https://github.com/apache/kafka/pull/13697

   1. Add a hint to the "BROKER_LOGGER" switch case in 
ConfigResourceExistenceChecker; otherwise it is treated the same as the default 
break and would simply be deleted. I am not sure whether adding a hint is better 
than deleting the case directly.
   2. Delete some unused variables and methods.
   3. Add the "@Disabled" annotation to a unit test method that is not run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-09 Thread via GitHub


showuon commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1189322325


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -84,18 +105,30 @@ class SimpleApiVersionManager(
     throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")
   }
 
-  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], epoch: Long): ApiVersionsResponse = {
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
     ApiVersionsResponse.createApiVersionsResponse(
       throttleTimeMs,
       apiVersions,
       brokerFeatures,
       finalizedFeatures.asJava,
-      epoch,
+      finalizedFeaturesEpoch,
       zkMigrationEnabled
     )
   }
 }
 
+/**
+ * The default ApiVersionManager that supports forwarding and has metadata cache, used in broker and zk controller.
+ * When forwarding is enabled, the enabled apis are determined by the broker listener type and the controller apis,
+ * otherwise the enabled apis are determined by the broker listener type, which is the same with SimpleApiVersionManager.
+ *
+ * @param listenerType the listener type
+ * @param forwardingManager the forwarding manager,
+ * @param features
+ * @param metadataCache
+ * @param enableUnstableLastVersion
+ * @param zkMigrationEnabled

Review Comment:
   @dengziming Thanks for adding the Javadocs for these 2 classes. They are very 
clear, but please remember to complete them. :)






[GitHub] [kafka] hudeqi closed pull request #13617: MINOR:code optimization in QuorumController

2023-05-09 Thread via GitHub


hudeqi closed pull request #13617: MINOR:code optimization in QuorumController
URL: https://github.com/apache/kafka/pull/13617





[GitHub] [kafka] hudeqi commented on pull request #13617: MINOR:code optimization in QuorumController

2023-05-09 Thread via GitHub


hudeqi commented on PR #13617:
URL: https://github.com/apache/kafka/pull/13617#issuecomment-1541277135

   Too many invalid commits; re-submitting as another PR.





[jira] [Commented] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721163#comment-17721163
 ] 

hudeqi commented on KAFKA-14979:


[GitHub Pull Request #13692|https://github.com/apache/kafka/pull/13692] is 
obsolete and has been closed.

> Incorrect lag was calculated when markPartitionsForTruncation in 
> ReplicaAlterLogDirsThread
> --
>
> Key: KAFKA-14979
> URL: https://issues.apache.org/jira/browse/KAFKA-14979
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 3.3.2
> Reporter: hudeqi
> Assignee: hudeqi
> Priority: Major
>
> When the partitions of a ReplicaFetcherThread finish truncating, the 
> ReplicaAlterLogDirsThread to which these partitions belong must be marked 
> for truncation. The lag value in the newState (PartitionFetchState) obtained in 
> this process is still the original value (state.lag). If the truncationOffset 
> is smaller than the original state.fetchOffset, then the original lag value 
> is incorrect and needs to be updated: it should be the original lag value 
> plus the difference between the original state.fetchOffset and 
> truncationOffset.
> If the stale lag value is kept, ReplicaAlterLogDirsThread 
> may set the wrong lag value for fetcherLagStats when executing 
> "processFetchRequest". So it may be more reasonable to recalculate the lag 
> value based on the new state.fetchOffset.





[GitHub] [kafka] hudeqi commented on pull request #13692: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread via GitHub


hudeqi commented on PR #13692:
URL: https://github.com/apache/kafka/pull/13692#issuecomment-1541271115

   Too many invalid commits; re-submitted as another 
[PR](https://github.com/apache/kafka/pull/13696).





[GitHub] [kafka] hudeqi closed pull request #13692: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread via GitHub


hudeqi closed pull request #13692: KAFKA-14979:Incorrect lag was calculated 
when markPartitionsForTruncation in ReplicaAlterLogDirsThread
URL: https://github.com/apache/kafka/pull/13692





[GitHub] [kafka] hudeqi opened a new pull request, #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread via GitHub


hudeqi opened a new pull request, #13696:
URL: https://github.com/apache/kafka/pull/13696

   When the partitions of a ReplicaFetcherThread finish truncating, the 
ReplicaAlterLogDirsThread to which these partitions belong must be marked 
for truncation. The lag value in the newState (PartitionFetchState) obtained in this 
process is still the original value (state.lag). If the truncationOffset is 
smaller than the original state.fetchOffset, then the original lag value is 
incorrect and needs to be updated: it should be the original lag value plus the 
difference between the original state.fetchOffset and truncationOffset.
   
   If the stale lag value is kept, ReplicaAlterLogDirsThread may set the wrong 
lag value for fetcherLagStats when executing "processFetchRequest". So it may 
be more reasonable to recalculate the lag value based on the new 
state.fetchOffset.
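
   The adjustment described above can be sketched roughly as follows. This is 
only an illustration of the arithmetic, not the code in this PR: the class 
LagAdjustment and the method lagAfterTruncation are hypothetical names, and 
the lag and offsets are modeled as plain long values.

```java
public class LagAdjustment {
    /**
     * Hypothetical helper: returns the lag to carry into the new fetch state
     * after truncation. If truncation moved the fetch offset backwards, the
     * replica is behind by that extra distance, so it is added to the old lag.
     */
    static long lagAfterTruncation(long originalLag, long originalFetchOffset, long truncationOffset) {
        if (truncationOffset < originalFetchOffset) {
            return originalLag + (originalFetchOffset - truncationOffset);
        }
        // Truncation did not move the fetch offset backwards: keep the old lag.
        return originalLag;
    }

    public static void main(String[] args) {
        // Fetch offset was 100 with lag 20; truncating to 90 widens the lag by 10.
        System.out.println(lagAfterTruncation(20L, 100L, 90L));  // prints 30
        // Truncation offset equals the fetch offset: lag is unchanged.
        System.out.println(lagAfterTruncation(20L, 100L, 100L)); // prints 20
    }
}
```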





[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189297499


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */

Review Comment:
   Looks good, thanks.
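
   For readers following the thread, the first property in the Javadoc quoted 
above (each member gets at least one partition per topic unless members 
outnumber partitions) can be illustrated with a plain range split of one topic. 
This is a minimal sketch and not the RangeAssignor from this PR: it ignores 
stickiness and co-partitioning across topics, and the names RangeSplitSketch 
and assign are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSplitSketch {
    /**
     * Splits partitions 0..numPartitions-1 of a single topic into contiguous
     * ranges. Every member gets numPartitions / numMembers partitions, and the
     * first (numPartitions % numMembers) members get one extra.
     */
    static Map<String, List<Integer>> assign(List<String> sortedMembers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int numMembers = sortedMembers.size();
        if (numMembers == 0) {
            return assignment;
        }
        int minQuota = numPartitions / numMembers;
        int membersWithExtra = numPartitions % numMembers;
        int nextPartition = 0;
        for (int i = 0; i < numMembers; i++) {
            int quota = minQuota + (i < membersWithExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < quota; p++) {
                partitions.add(nextPartition++);
            }
            assignment.put(sortedMembers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 members: quotas are 3, 2, 2.
        System.out.println(assign(Arrays.asList("a", "b", "c"), 7));
        // prints {a=[0, 1, 2], b=[3, 4], c=[5, 6]}
    }
}
```

   With 2 partitions and 3 members, the last member receives an empty list, 
which is the exception the Javadoc mentions.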






[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189296779


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Pair of memberId and remaining partitions to meet the quota.
+     */
+    private static class MemberWithRemainingAssignments {
+        /**
+         * Member Id.
+         */
+        private final String memberId;
+        /**
+         * Number of partitions required to meet the assignment quota.
+         */
+        private final Integer remaining;
+
+        public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+            this.memberId = memberId;
+            this.remaining = remaining;
+        }
+    }
+
+    /**
+     * @return Map of topicIds to a list of members subscribed to them.
+     */
+    private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+        membersData.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            for (Uuid topicId: topics) {
+                // Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+                if (assignmentSpec.topics().containsKey(topicId)) {
+                    membersPerTopic
+                        .computeIfAbsent(topicId, k -> new ArrayList<>())
+                        .add(memberId);
+                } else {
+                    log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+                }
+            }
+        });
+
+        return membersPerTopic;
+    }
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of the 

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189296233



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189295560



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189237663



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189235728


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ 
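
The last step in the quoted Javadoc (unassigned partitions = all partitions of the topic minus the sticky ones) can be sketched in isolation as below. This is a minimal illustration, not the code from the PR. It assumes the minimum quota is the integer division of the partition count by the number of subscribed members, with the remainder deciding how many members get one extra partition; the quoted diff does not spell that out. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper class; none of these names come from the PR.
class UnassignedPartitionsSketch {

    // Assumption: every subscribed member gets at least floor(partitions / members).
    static int minRequiredQuota(int numPartitions, int numMembers) {
        return numPartitions / numMembers;
    }

    // Assumption: the remainder is the number of members that get one extra partition.
    static int membersWithExtraPartition(int numPartitions, int numMembers) {
        return numPartitions % numMembers;
    }

    // Partitions 0..numPartitions-1 that are not retained (sticky), in ascending order.
    static List<Integer> unassignedPartitions(int numPartitions, Set<Integer> assignedSticky) {
        List<Integer> unassigned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (!assignedSticky.contains(partition)) {
                unassigned.add(partition);
            }
        }
        return unassigned;
    }

    public static void main(String[] args) {
        Set<Integer> sticky = new HashSet<>(Arrays.asList(0, 1, 3));
        System.out.println("minRequiredQuota = " + minRequiredQuota(5, 2));
        System.out.println("membersWithExtra = " + membersWithExtraPartition(5, 2));
        System.out.println("unassigned = " + unassignedPartitions(5, sticky));
    }
}
```

With five partitions and two members, one member would end up with three partitions and the other with two under these assumptions; only the partitions outside the sticky set need to be handed out.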

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189220058


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ 
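
The membersPerTopic helper in the quoted diff inverts the member-to-subscriptions map into a topic-to-members map and drops topics that are missing from the topic metadata. A self-contained sketch of that inversion follows. It is an illustration only: it uses plain String ids in place of Uuid and a Set of known topics in place of assignmentSpec.topics(), and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the helper in the diff; types are simplified to String.
class MembersPerTopicSketch {

    // Inverts memberId -> subscribed topics into topic -> subscribed memberIds.
    // Topics absent from knownTopics are skipped (the diff logs a warning there).
    static Map<String, List<String>> membersPerTopic(
            Map<String, List<String>> subscriptions,
            Set<String> knownTopics) {
        Map<String, List<String>> result = new TreeMap<>();
        subscriptions.forEach((memberId, topics) -> {
            for (String topic : topics) {
                if (knownTopics.contains(topic)) {
                    // Create the member list on first sight of the topic, then append.
                    result.computeIfAbsent(topic, k -> new ArrayList<>()).add(memberId);
                }
            }
        });
        return result;
    }
}
```

computeIfAbsent keeps the loop free of explicit null checks; the order of members within each list follows the iteration order of the input map, so a caller that needs deterministic output should pass an ordered map.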

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189231570


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189231170


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189231412


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189229928


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189229707


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189229522


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189220058


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ 

[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721114#comment-17721114
 ] 

Matthias J. Sax commented on KAFKA-14981:
-

Very interesting idea – given that we persist the thread-id (aka process-id) in 
the state directory on local disk, it could help. And even if we don't persist 
it (because there is no local storage), it seems no harm would be done if the 
id changes every single time.

Wondering if we would need a KIP for this. My gut feeling is no, but I'm not sure.

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if a instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set 
> this id in Kafka Streams using `threadId` so that no rebalance is triggered 
> within `session.timeout.ms`.





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189216494


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189216148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189214614


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics 
list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ 

[jira] [Created] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-09 Thread Hao Li (Jira)
Hao Li created KAFKA-14981:
--

 Summary: Set `group.instance.id` in streams consumer so that 
rebalance will not happen if a instance is restarted
 Key: KAFKA-14981
 URL: https://issues.apache.org/jira/browse/KAFKA-14981
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Hao Li


`group.instance.id` enables static membership so that if a consumer is 
restarted within `session.timeout.ms`, a rebalance will not be triggered and 
the original assignment can be returned directly from the broker. We can set 
this id in Kafka Streams using `threadId` so that no rebalance is triggered 
within `session.timeout.ms`.
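
A minimal sketch of the configuration this proposal implies, assuming the instance id is derived from the application id and a thread id. The helper `consumerProps`, the class name, and the id format are hypothetical and not taken from Kafka Streams; only the property keys `group.id`, `group.instance.id`, and `session.timeout.ms` are real consumer configs.

```java
import java.util.Properties;

public class StaticMembershipSketch {
    // Hypothetical helper: builds consumer properties for one stream thread.
    static Properties consumerProps(String groupId, String threadId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Static membership: the id must be stable across restarts and
        // unique per consumer in the group (the id format here is an assumption).
        props.setProperty("group.instance.id", groupId + "-" + threadId);
        // A restart that completes within this window should not trigger a rebalance.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("my-app", "StreamThread-1", 45000);
        System.out.println(props.getProperty("group.instance.id"));
    }
}
```

Whether the thread id stays stable across restarts depends on how it is persisted, which is the open question raised in the comments on this ticket.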





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189213049


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+ 
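
The non-existent-topic test quoted above expects an empty assignment when a member subscribes only to a topic missing from the topic metadata. The filtering behind that can be sketched as follows. This is an illustration, not the PR's code: it uses plain `String` topic ids instead of `Uuid`, and the class name and method signature are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MembersPerTopicSketch {
    // Inverts memberId -> subscribed topics into topicId -> members,
    // skipping topics that are absent from the known topic metadata.
    static Map<String, List<String>> membersPerTopic(
            Map<String, Collection<String>> subscriptions, Set<String> knownTopics) {
        Map<String, List<String>> result = new HashMap<>();
        subscriptions.forEach((memberId, topics) -> {
            for (String topicId : topics) {
                if (knownTopics.contains(topicId)) {
                    result.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
                }
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> subs = new HashMap<>();
        subs.put("A", Arrays.asList("topic2"));
        // topic2 is not in the metadata, so no topic has any members.
        System.out.println(membersPerTopic(subs, Collections.singleton("topic1")));
    }
}
```

With no topic left to assign, an assignor built on this map would have nothing to hand out, which is consistent with the empty result the test asserts.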

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189212267


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */

Review Comment:
   I did it in a way that looks best to me in the next commit
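
   For readers following the thread, the "(Range)" property in the quoted Javadoc can be illustrated with a minimal sketch. This is not the code from the PR: `RangeQuotaSketch` and `assignRanges` are hypothetical names, only a single topic is handled, and the sticky behaviour is left out entirely. It assumes each member gets `numPartitions / numMembers` contiguous partitions, with the first `numPartitions % numMembers` members receiving one extra.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the RangeAssignor from the PR.
class RangeQuotaSketch {
    /**
     * Splits partitions 0..numPartitions-1 of one topic into contiguous
     * ranges, one per member, in the order the members are given.
     */
    static Map<String, List<Integer>> assignRanges(List<String> members, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        int minQuota = numPartitions / members.size(); // every member gets at least this many
        int extra = numPartitions % members.size();    // the first 'extra' members get one more
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int quota = minQuota + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = 0; p < quota; p++) {
                range.add(next++);
            }
            result.put(members.get(i), range);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assignRanges(Arrays.asList("A", "B"), 3)); // {A=[0, 1], B=[2]}
        System.out.println(assignRanges(Arrays.asList("A", "B"), 2)); // {A=[0], B=[1]}
    }
}
```

   With two members, a 3-partition topic and a 2-partition topic come out the same way as the first-assignment test in this PR expects. When there are more members than partitions, the trailing members get an empty range, which is the exception the Javadoc mentions.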



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1189210485


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */

Review Comment:
   It looks a bit wonky after formatting it like that; I don't think there's a 
great way to add this HTML.






[GitHub] [kafka] ableegoldman commented on pull request #13695: HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API

2023-05-09 Thread via GitHub


ableegoldman commented on PR #13695:
URL: https://github.com/apache/kafka/pull/13695#issuecomment-1540876914

   @vcrfxia @mjsax pulled this fix out for the 3.5 release (cc @mimaison )





[GitHub] [kafka] ableegoldman commented on a diff in pull request #13682: MINOR: improved exception/warn logging for stream-stream join store settings

2023-05-09 Thread via GitHub


ableegoldman commented on code in PR #13682:
URL: https://github.com/apache/kafka/pull/13682#discussion_r1189116797


##
streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java:
##
@@ -115,7 +115,7 @@ public boolean persistent() {
 
 @Override
 public boolean isOpen() {
-return inner.persistent();
+return inner.isOpen();

Review Comment:
   Good point. Here: https://github.com/apache/kafka/pull/13695






[GitHub] [kafka] ableegoldman opened a new pull request, #13695: HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API

2023-05-09 Thread via GitHub


ableegoldman opened a new pull request, #13695:
URL: https://github.com/apache/kafka/pull/13695

   The `VersionedKeyValueToBytesStoreAdapter#isOpen` API accidentally returns the 
value of `inner.persistent()` when it should return `inner.isOpen()`.
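
   To make the failure mode concrete, here is a minimal sketch of an adapter that forwards each query to the matching method on the wrapped store. The `Store` interface, `Adapter` class, and `fixed` helper are hypothetical stand-ins, not Kafka's actual types; the point is only that a store can be persistent while closed, so returning `inner.persistent()` from `isOpen()` gives the wrong answer.

```java
// Hypothetical illustration only; these are not Kafka's store interfaces.
class IsOpenDelegationSketch {
    interface Store {
        boolean persistent();
        boolean isOpen();
    }

    /** Adapter that forwards each query to the matching method on the inner store. */
    static class Adapter implements Store {
        private final Store inner;

        Adapter(Store inner) {
            this.inner = inner;
        }

        @Override
        public boolean persistent() {
            return inner.persistent();
        }

        @Override
        public boolean isOpen() {
            // Returning inner.persistent() here would be the bug described above.
            return inner.isOpen();
        }
    }

    /** Builds a store that reports fixed values, for demonstration. */
    static Store fixed(boolean persistent, boolean open) {
        return new Store() {
            @Override
            public boolean persistent() {
                return persistent;
            }

            @Override
            public boolean isOpen() {
                return open;
            }
        };
    }

    public static void main(String[] args) {
        Store closedButPersistent = fixed(true, false);
        System.out.println(new Adapter(closedButPersistent).isOpen()); // false
    }
}
```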





[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-09 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-14974:
--
Fix Version/s: 3.5.0

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3, 3.6.0
>
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by reinstating the older send methods and 
> renaming the new Future-returning send methods.
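
The source-versus-binary distinction in the description can be sketched as follows. A caller that ignored the `void` result still compiles against a `Future`-returning method, but an already compiled call site refers to the method by its JVM descriptor, which includes the return type, so an old jar would fail with `NoSuchMethodError`. The class `Log` and the method name `sendWithFuture` below are hypothetical and chosen only for illustration; they are not taken from the Kafka code base.

```java
import java.lang.invoke.MethodType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical illustration only; not KafkaBasedLog.
class SendCompatSketch {
    static class Log<K, V> {
        /** Reinstated original shape: void return, so old compiled callers still link. */
        public void send(K key, V value) {
            sendWithFuture(key, value);
        }

        /** Future-returning variant under a new (hypothetical) name. */
        public Future<V> sendWithFuture(K key, V value) {
            return CompletableFuture.completedFuture(value);
        }
    }

    /** JVM descriptor of a two-argument method after erasure, for a given return type. */
    static String descriptor(Class<?> returnType) {
        return MethodType.methodType(returnType, Object.class, Object.class)
                .toMethodDescriptorString();
    }

    public static void main(String[] args) {
        // The two descriptors differ, which is why changing only the return type breaks linking.
        System.out.println(descriptor(void.class));   // (Ljava/lang/Object;Ljava/lang/Object;)V
        System.out.println(descriptor(Future.class)); // (Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/concurrent/Future;
    }
}
```

Keeping the old `void` method and giving the new behaviour a different name means both descriptors exist side by side, so neither old nor new callers need to change.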



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-05-09 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721025#comment-17721025
 ] 

Alexandre Dupriez commented on KAFKA-7739:
--

Got it. Sorry I misunderstood your suggestion. Thanks for clarifying! (and 
agree with it).

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]





[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188898373


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ *  

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188893117


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ *  

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188893478


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {

Review Comment:
   nit: ` : `.






[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188891494


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer 
remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of members per topic using the given member 
subscriptions.
+ *  Generate a list of members called potentially unfilled members, 
which consists of members that have not met the minimum required quota of 
partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the 
partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the 
difference between the total partitions for the topic and the assigned (sticky) 
partitions. 
+ *  

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188891203


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+ 

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188873259


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+
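
The expected assignment in the test quoted above (topic 1 with 3 partitions, topic 3 with 2 partitions, members A and B) is what a plain range split would produce. The following is only a minimal sketch of that split, assuming members are processed in sorted order, the member list is non-empty, and no previous assignment exists. RangeSplitSketch and split are hypothetical names; this is not the PR's RangeAssignor, which also has to deal with stickiness.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSplitSketch {
    // Hypothetical helper (not from the PR): splits partitions 0..numPartitions-1
    // into contiguous ranges, one range per member, in the given member order.
    // Assumes sortedMembers is non-empty.
    public static Map<String, List<Integer>> split(List<String> sortedMembers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int minQuota = numPartitions / sortedMembers.size();
        // The first `extra` members receive one partition more than minQuota.
        int extra = numPartitions % sortedMembers.size();
        int next = 0;
        for (int i = 0; i < sortedMembers.size(); i++) {
            int quota = minQuota + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < quota; p++) {
                partitions.add(next++);
            }
            result.put(sortedMembers.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("A", "B");
        System.out.println(split(members, 3)); // topic 1: {A=[0, 1], B=[2]}
        System.out.println(split(members, 2)); // topic 3: {A=[0], B=[1]}
    }
}
```

With 3 partitions and 2 members the minimum quota is 1 and one member gets an extra partition, which matches A receiving partitions 0 and 1 and B receiving partition 2 in the test.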

[jira] [Commented] (KAFKA-14980) MirrorMaker consumers don't get configs prefixed with source.cluster

2023-05-09 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720993#comment-17720993
 ] 

Mickael Maison commented on KAFKA-14980:


cc [~ChrisEgerton] 

> MirrorMaker consumers don't get configs prefixed with source.cluster
> 
>
> Key: KAFKA-14980
> URL: https://issues.apache.org/jira/browse/KAFKA-14980
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Priority: Blocker
>
> As part of KAFKA-14021, we made a change to 
> MirrorConnectorConfig.sourceConsumerConfig() to grab all configs that start 
> with "source.". Previously it was grabbing configs prefixed with 
> "source.cluster.". 
> This means existing connector configurations stop working, as configurations 
> such as bootstrap.servers are not passed to source consumers.
> For example, the following connector configuration was valid in 3.4 and now 
> makes the connector tasks fail:
> {code:json}
> {
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "name": "source",
> "topics": "test",
> "tasks.max": "30",
> "source.cluster.alias": "one",
> "target.cluster.alias": "two",
> "source.cluster.bootstrap.servers": "localhost:9092",
>"target.cluster.bootstrap.servers": "localhost:29092"
> }
> {code}
> The connector attempts to start source consumers with bootstrap.servers = [] 
> and the tasks crash with 
> {noformat}
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:837)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:671)
>   at 
> org.apache.kafka.connect.mirror.MirrorUtils.newConsumer(MirrorUtils.java:59)
>   at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:103)
>   at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
>   at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
> bootstrap urls given in bootstrap.servers
> {noformat}





[jira] [Created] (KAFKA-14980) MirrorMaker consumers don't get configs prefixed with source.cluster

2023-05-09 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14980:
--

 Summary: MirrorMaker consumers don't get configs prefixed with 
source.cluster
 Key: KAFKA-14980
 URL: https://issues.apache.org/jira/browse/KAFKA-14980
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.5.0
Reporter: Mickael Maison


As part of KAFKA-14021, we made a change to 
MirrorConnectorConfig.sourceConsumerConfig() to grab all configs that start 
with "source.". Previously it was grabbing configs prefixed with 
"source.cluster.". 

This means existing connector configurations stop working, as configurations 
such as bootstrap.servers are not passed to source consumers.

For example, the following connector configuration was valid in 3.4 and now 
makes the connector tasks fail:

{code:json}
{
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"name": "source",
"topics": "test",
"tasks.max": "30",
"source.cluster.alias": "one",
"target.cluster.alias": "two",
"source.cluster.bootstrap.servers": "localhost:9092",
   "target.cluster.bootstrap.servers": "localhost:29092"
}
{code}


The connector attempts to start source consumers with bootstrap.servers = [] 
and the tasks crash with 


{noformat}
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:837)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:671)
at 
org.apache.kafka.connect.mirror.MirrorUtils.newConsumer(MirrorUtils.java:59)
at 
org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:103)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
at 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
bootstrap urls given in bootstrap.servers
{noformat}
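
To illustrate why the prefix change leaves bootstrap.servers empty, here is a minimal sketch of prefix stripping over a plain map. PrefixSketch and withPrefixStripped are hypothetical names used for illustration only; the sketch assumes the config code keeps keys that start with the prefix and removes the prefix, and it does not reproduce the actual MirrorConnectorConfig implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixSketch {
    // Hypothetical helper: keeps only the entries whose key starts with `prefix`
    // and returns them with that prefix removed from the key.
    public static Map<String, String> withPrefixStripped(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> connector = new HashMap<>();
        connector.put("source.cluster.alias", "one");
        connector.put("source.cluster.bootstrap.servers", "localhost:9092");
        connector.put("target.cluster.bootstrap.servers", "localhost:29092");

        // Old prefix: the consumer sees a "bootstrap.servers" key.
        Map<String, String> oldView = withPrefixStripped(connector, "source.cluster.");
        System.out.println(oldView.get("bootstrap.servers"));

        // New prefix: the key becomes "cluster.bootstrap.servers", so a lookup of
        // "bootstrap.servers" finds nothing.
        Map<String, String> newView = withPrefixStripped(connector, "source.");
        System.out.println(newView.containsKey("bootstrap.servers"));
        System.out.println(newView.containsKey("cluster.bootstrap.servers"));
    }
}
```

Under these assumptions, a configuration that only sets source.cluster.bootstrap.servers no longer yields a bootstrap.servers entry for the source consumer, which is consistent with the ConfigException in the stack trace above.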








[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188858930


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188857767


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ *  Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ *  Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *
+ *  Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of the 

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188856580


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ *
+ *  Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range)
+ *  Partitions should be assigned to members in a way that facilitates the join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ *
+ *  The keys must have the same schemas.
+ *  The topics involved must have the same number of partitions.
+ *
+ *
+ *  Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky)
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ * 
+ *  Generate a map of membersPerTopic using the given 
member subscriptions.
+ *  Generate a list of members 
(potentiallyUnfilledMembers) that have not met the minimum 
required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of 
partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list 
to the unfilledMembersPerTopic map if they haven't met the total 
required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of the 

[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-09 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-14974:
--
Fix Version/s: 3.4.1
   3.3.3

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.4.1, 3.3.3, 3.6.0
>
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by reinstating the older send methods and 
> renaming the new Future-returning send methods.
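
As background for the binary-compatibility point above: a JVM method descriptor includes the return type, so a caller compiled against a void send(...) method links to a different descriptor than one returning Future, and fails with NoSuchMethodError at run time even though the source still compiles. The following is a minimal sketch of the proposed shape of the fix, not the actual KafkaBasedLog code. CompatSketch, Log, and the name sendWithReceipt are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CompatSketch {
    // Stand-in for a log-like utility class; purely illustrative.
    public static class Log {
        public int sent = 0;

        // Reinstated void method: keeps the old descriptor, so callers that were
        // compiled against the older version still link.
        public void send(String key, String value) {
            sendWithReceipt(key, value);
        }

        // Future-returning variant under a new (hypothetical) name, so it does not
        // clash with the void overload above.
        public Future<String> sendWithReceipt(String key, String value) {
            sent++;
            return CompletableFuture.completedFuture(key + "=" + value);
        }
    }

    public static void main(String[] args) {
        Log log = new Log();
        log.send("k1", "v1");                                  // old-style call
        Future<String> receipt = log.sendWithReceipt("k2", "v2"); // new-style call
        System.out.println(log.sent + " records sent, receipt done: " + receipt.isDone());
    }
}
```

Java does not allow two methods that differ only in return type, which is why the Future-returning variant needs a different name once the void methods are restored.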





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188832049


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));

Review 

[GitHub] [kafka] kirktrue closed pull request #13495: KAFKA-14274: Introduce FetchRequestManager to integrate fetch into new consumer threading refactor

2023-05-09 Thread via GitHub


kirktrue closed pull request #13495: KAFKA-14274: Introduce FetchRequestManager 
to integrate fetch into new consumer threading refactor
URL: https://github.com/apache/kafka/pull/13495





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188823115


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+ 
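For reference, the expected values in testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions match a plain range split: with members taken in sorted order, each topic's partitions are cut into contiguous ranges and the first members absorb any remainder. The sketch below illustrates only that split. `RangeSplitSketch` and `split` are hypothetical names, stickiness and differing subscriptions are ignored, and this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not from the PR): plain range split of one topic's partitions. */
final class RangeSplitSketch {

    /**
     * Splits partitions 0..numPartitions-1 into contiguous ranges, one per member,
     * in the given member order. The first (numPartitions % members.size()) members
     * receive one extra partition.
     */
    static Map<String, List<Integer>> split(List<String> members, int numPartitions) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("at least one member is required");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int quota = numPartitions / members.size();
        int extra = numPartitions % members.size();
        int next = 0; // next partition index to hand out
        for (int i = 0; i < members.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                range.add(next++);
            }
            result.put(members.get(i), range);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("A", "B");
        System.out.println(split(members, 3)); // topic with 3 partitions
        System.out.println(split(members, 2)); // topic with 2 partitions
    }
}
```

With members A and B, `split(members, 3)` gives A=[0, 1], B=[2] and `split(members, 2)` gives A=[0], B=[1], which are the per-topic values the test puts into expectedAssignment for topic1Uuid and topic3Uuid.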

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188821932


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188821039


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ *
+ *  Generate a map of members per topic using the given member subscriptions.
+ *  Generate a list of members called potentially unfilled members, which consists of members that have not met the minimum required quota of partitions for the assignment AND
+ * get a list called assigned sticky partitions per topic, which has the partitions that will be retained in the new assignment.
+ *  Generate a list of unassigned partitions by calculating the difference between the total partitions for the topic and the assigned (sticky) partitions.
+ 
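The last step quoted above (unassigned partitions = all partitions of the topic minus the sticky ones) can be sketched as follows. This is a minimal illustration that assumes partitions are numbered 0..numPartitions-1. `UnassignedPartitionsSketch` and `unassigned` are hypothetical names, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not from the PR): partitions left over after sticky retention. */
final class UnassignedPartitionsSketch {

    /**
     * Returns, in ascending order, every partition in 0..numPartitions-1
     * that is not already retained as a sticky partition.
     */
    static List<Integer> unassigned(int numPartitions, Set<Integer> stickyPartitions) {
        List<Integer> result = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (!stickyPartitions.contains(partition)) {
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed example: 5 partitions, of which 0 and 3 are retained by their owners.
        Set<Integer> sticky = new HashSet<>(Arrays.asList(0, 3));
        System.out.println(unassigned(5, sticky));
    }
}
```

For a topic with 5 partitions where 0 and 3 are retained, the partitions left to distribute are 1, 2 and 4.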

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188819218


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * This Range Assignor inherits properties of both the range assignor and the 
sticky assignor.
+ * The properties are as follows:
+ * 
+ *  Each member must get at least one partition from every topic that it 
is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions 
for that topic. (Range) 
+ *  Partitions should be assigned to members in a way that facilitates the 
join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and 
the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * 
+ *  The keys must have the same schemas. 
+ *  The topics involved must have the same number of partitions. 
+ * 
+ * 
+ *  Members should retain as much of their previous assignment as possible 
to reduce the number of partition movements during reassignment. (Sticky) 
+ * 
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+/**
+ * Pair of memberId and remaining partitions to meet the quota.
+ */
+private static class MemberWithRemainingAssignments {
+/**
+ * Member Id.
+ */
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota.
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+}
+
+/**
+ * @return Map of topicIds to a list of members subscribed to them.
+ */
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ *  The algorithm includes the following steps:
+ *
+ *  Generate a map of membersPerTopic using the given member subscriptions.
+ *  Generate a list of members (potentiallyUnfilledMembers) that have not met the minimum required quota of partitions for the assignment AND
+ * get a list (assignedStickyPartitionsPerTopic) of partitions that will be retained in the new assignment.
+ *  Add members from the potentiallyUnfilledMembers list to the unfilledMembersPerTopic map if they haven't met the total required quota
+ * i.e. minRequiredQuota + 1, if the member is designated to receive one 
of 
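A rough sketch of the quota check described in the steps above: every member needs at least minRequiredQuota partitions, and some members need minRequiredQuota + 1 when the partition count does not divide evenly. The rule for which members receive the extra partition is cut off in the quoted text, so the choice below (the first numPartitions % numMembers members in list order) is purely an assumption for illustration. `QuotaSketch` and `remaining` are hypothetical names, not the PR's code.

```java
import java.util.Arrays;

/** Hypothetical helper (not from the PR): partitions each member still needs. */
final class QuotaSketch {

    /**
     * stickyCounts[i] is the number of partitions member i retains from its
     * previous assignment. Returns how many more partitions each member needs
     * to reach its total quota (never negative).
     */
    static int[] remaining(int numPartitions, int[] stickyCounts) {
        int numMembers = stickyCounts.length;
        if (numMembers == 0) {
            throw new IllegalArgumentException("at least one member is required");
        }
        int minRequiredQuota = numPartitions / numMembers;
        int membersWithExtra = numPartitions % numMembers;
        int[] remaining = new int[numMembers];
        for (int i = 0; i < numMembers; i++) {
            // Assumption: the first membersWithExtra members get minRequiredQuota + 1.
            int totalQuota = minRequiredQuota + (i < membersWithExtra ? 1 : 0);
            remaining[i] = Math.max(0, totalQuota - stickyCounts[i]);
        }
        return remaining;
    }

    public static void main(String[] args) {
        // Assumed example: 7 partitions, 3 members retaining 2, 1 and 0 partitions.
        System.out.println(Arrays.toString(remaining(7, new int[] {2, 1, 0})));
    }
}
```

In the assumed example the quotas are 3, 2 and 2, so the members still need 1, 1 and 2 partitions, which adds up to the 4 partitions not yet retained by anyone.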

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188817725


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##

[GitHub] [kafka] urbandan commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics

2023-05-09 Thread via GitHub


urbandan commented on PR #13690:
URL: https://github.com/apache/kafka/pull/13690#issuecomment-1540445965

   Added a unit test





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188811581


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-09 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1188772104


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala:
##
@@ -160,6 +160,7 @@ class GroupMetadataTest {
 assertThrows(classOf[IllegalStateException], () => group.transitionTo(CompletingRebalance))
   }
 
+  @Test

Review Comment:
   created https://github.com/apache/kafka/pull/13694






[GitHub] [kafka] jeffkbkim opened a new pull request, #13694: MINOR: add test tag for testDeadToDeadIllegalTransition

2023-05-09 Thread via GitHub


jeffkbkim opened a new pull request, #13694:
URL: https://github.com/apache/kafka/pull/13694






[GitHub] [kafka] dajac commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-09 Thread via GitHub


dajac commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1188744749


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala:
##
@@ -160,6 +160,7 @@ class GroupMetadataTest {
 assertThrows(classOf[IllegalStateException], () => group.transitionTo(CompletingRebalance))
   }
 
+  @Test

Review Comment:
   Nice catch! Do you mind doing a separate PR for this one? That way we can 
keep the changes in this PR scoped to the new module.






[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-05-09 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720955#comment-17720955
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-7739:
-

Yes, we know the transaction index is optional when copying but not when 
fetching. I agree that throwing the exception is correct, but the javadoc is 
ambiguous: it refers to "no resources associated with the given 
remoteLogSegmentMetadata", which could be read as _all_ resources being 
missing, rather than "the requested resource is not found in the remote 
storage" or similar.

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]





[GitHub] [kafka] dajac commented on pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on PR #13443:
URL: https://github.com/apache/kafka/pull/13443#issuecomment-1540289311

   @rreddy-22 The build failed due to compilation errors.





[GitHub] [kafka] dajac merged pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-09 Thread via GitHub


dajac merged PR #13644:
URL: https://github.com/apache/kafka/pull/13644





[GitHub] [kafka] jeffkbkim commented on pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-09 Thread via GitHub


jeffkbkim commented on PR #13644:
URL: https://github.com/apache/kafka/pull/13644#issuecomment-1540246709

   @dajac the test failure is unrelated. can we merge this?





[GitHub] [kafka] C0urante commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics

2023-05-09 Thread via GitHub


C0urante commented on PR #13690:
URL: https://github.com/apache/kafka/pull/13690#issuecomment-1540232079

   Tests?





[jira] [Updated] (KAFKA-14414) Remove unnecessary usage of ObjectSerializationCache

2023-05-09 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14414:
-
Attachment: producer-only-cpu-33-clean.html

> Remove unnecessary usage of ObjectSerializationCache
> 
>
> Key: KAFKA-14414
> URL: https://issues.apache.org/jira/browse/KAFKA-14414
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.4.0
>
> Attachments: Screenshot 2022-11-21 at 19.23.53.png, 
> producer-only-cpu-33-clean.html
>
>
> We create an instance of ObjectSerializationCache at 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L113]
>  which does not get used at all. We always "add" to the cache but never 
> retrieve from it (as is evident from the fact that we don't store a reference 
> to the cache anywhere).
> Adding information to the cache is expensive because it uses 
> System.identityHashCode(Object), as demonstrated by the 
> flame graph of producer requests over an Apache Kafka 3.3.1 plaintext broker. 
> {{!Screenshot 2022-11-21 at 19.23.53.png!}}





[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-05-09 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1540199229

   > Bumping this PR. Are there any other committers who can help review and 
merge it?
   
   I'm sorry I didn't notice the comment ("We're past code freeze for 3.5 so 
moving this to the next release") in the corresponding 
[jira](https://issues.apache.org/jira/browse/KAFKA-14866)





[GitHub] [kafka] machi1990 opened a new pull request, #13693: MINOR: fix a small typo in SharedServer.scala

2023-05-09 Thread via GitHub


machi1990 opened a new pull request, #13693:
URL: https://github.com/apache/kafka/pull/13693

   I noticed this typo as I was going through the SharedServer.scala file
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-05-09 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720947#comment-17720947
 ] 

Alexandre Dupriez commented on KAFKA-7739:
--

I would think that the absence of the log segment or any mandatory ancillary 
file results in the stated exception. I think only transaction indexes are 
optional.

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]





[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-05-09 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1540185668

   Bumping this PR. Are there any other committers who can help review and merge it?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-09 Thread via GitHub


Hangleton commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1188635591


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the logs if:
+   * 1. largestOffset < 0

Review Comment:
   Nit: we can remove (1) since (1) => (2).



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+"0, -2147483648",
+"0, 2147483648",
+"1, 0",
+"100, 10",
+"2147483648, 0",
+"-2147483648, 0",
+"2147483648,4294967296"

Review Comment:
   nit: Note that the max int value on the Java platform is `2147483647` so we 
are exercising max int + 1.



##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,32 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * LogSegmentOffsetOverflowException should be thrown while appending the logs if:
+   * 1. largestOffset < 0
+   * 2. largestOffset - baseOffset < 0
+   * 3. largestOffset - baseOffset > Integer.MAX_VALUE
+   */
+  @ParameterizedTest
+  @CsvSource(Array(
+"0, -2147483648",
+"0, 2147483648",
+"1, 0",
+"100, 10",
+"2147483648, 0",
+"-2147483648, 0",
+"2147483648,4294967296"
+  ))
+  def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {

Review Comment:
   What is the expected behaviour for `baseOffset == largestOffset`?
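
   As a rough illustration of conditions (2) and (3) from the javadoc in the diff, the sketch below checks whether the relative offset fits in a non-negative int. `OffsetRange` and `overflows` are hypothetical names, not the LogSegment implementation, and the sketch assumes the long subtraction itself does not wrap around. Under these two conditions alone, `baseOffset == largestOffset` gives a relative offset of 0 and is not reported as an overflow; whether the real code agrees is the open question above.

```java
// Sketch only: OffsetRange is a hypothetical helper, not Kafka's LogSegment API.
final class OffsetRange {
    private OffsetRange() { }

    /**
     * Returns true when largestOffset cannot be stored as a non-negative int
     * relative to baseOffset (conditions 2 and 3 of the javadoc in the diff).
     * Assumes largestOffset - baseOffset does not overflow a long.
     */
    static boolean overflows(long baseOffset, long largestOffset) {
        long relative = largestOffset - baseOffset;
        return relative < 0 || relative > Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The seven pairs from the @CsvSource in the diff, plus an equal-offsets pair.
        long[][] cases = {
            {0L, -2147483648L}, {0L, 2147483648L}, {1L, 0L}, {100L, 10L},
            {2147483648L, 0L}, {-2147483648L, 0L}, {2147483648L, 4294967296L},
            {5L, 5L}
        };
        for (long[] c : cases) {
            System.out.println(c[0] + ", " + c[1] + " -> " + overflows(c[0], c[1]));
        }
    }
}
```

   Condition (1) needs no separate branch here as long as baseOffset is non-negative, since a negative largestOffset then always yields a negative relative offset.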






[GitHub] [kafka] akatona84 commented on pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-05-09 Thread via GitHub


akatona84 commented on PR #11565:
URL: https://github.com/apache/kafka/pull/11565#issuecomment-1540178836

   rebased





[GitHub] [kafka] hudeqi opened a new pull request, #13692: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread via GitHub


hudeqi opened a new pull request, #13692:
URL: https://github.com/apache/kafka/pull/13692

   When partitions of a ReplicaFetcherThread finish truncating, they also need 
to be marked for truncation in the ReplicaAlterLogDirsThread to which they 
belong. The lag value in the newState (PartitionFetchState) created in this 
process is still the original value (state.lag). If truncationOffset is 
smaller than the original state.fetchOffset, the original lag value is 
incorrect and needs to be updated: it should be the original lag value plus the 
difference between the original state.fetchOffset and truncationOffset.
   
   If the stale lag value is kept, ReplicaAlterLogDirsThread may set the wrong 
lag value for fetcherLagStats when executing "processFetchRequest". So it may 
be more reasonable to recalculate the lag value based on the new 
state.fetchOffset.
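
   The adjustment described above can be sketched as follows. This is only an illustration of the arithmetic, not the patch itself: `TruncationLag` and `adjustedLag` are hypothetical names, and the lag is modelled as an `OptionalLong` on the assumption that it may be unknown.

```java
import java.util.OptionalLong;

// Sketch only: a hypothetical helper, not code from ReplicaAlterLogDirsThread.
final class TruncationLag {
    private TruncationLag() { }

    /**
     * Returns the lag to carry into the new fetch state after truncation.
     * If the fetch position is rewound (truncationOffset < fetchOffset), the
     * lag grows by the number of offsets that must be fetched again.
     */
    static OptionalLong adjustedLag(OptionalLong originalLag, long fetchOffset, long truncationOffset) {
        if (!originalLag.isPresent() || truncationOffset >= fetchOffset) {
            // Unknown lag stays unknown; no rewind means no adjustment.
            return originalLag;
        }
        return OptionalLong.of(originalLag.getAsLong() + (fetchOffset - truncationOffset));
    }

    public static void main(String[] args) {
        // Lag was 10 at fetchOffset 100; truncating to 80 rewinds by 20 offsets.
        System.out.println(adjustedLag(OptionalLong.of(10L), 100L, 80L));
    }
}
```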





[jira] [Updated] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14979:
---
Description: 
When the partitions of ReplicaFetcherThread finished truncating, the 
ReplicaAlterLogDirsThread to which these partitions belong needs to be marked 
truncate. The lag value in the newState (PartitionFetchState) obtained in this 
process is still the original value (state.lag). If the truncationOffset is 
smaller than the original state.fetchOffset, then the original lag value is 
incorrect and needs to be updated. It should be the original lag value plus the 
difference between the original state.fetchOffset and truncationOffset.
If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread 
may set the wrong lag value for fetcherLagStats when executing 
"processFetchRequest". So it might be more reasonable to recalculate a lag 
value based on new state.fetchOffset.

  was:
When the partitions of ReplicaFetcherThread finished truncating, the 
ReplicaAlterLogDirsThread to which these partitions belong needs to be marked 
truncate. The lag value in the newState (PartitionFetchState) obtained in this 
process is still the original value (state.lag). If the truncationOffset is 
smaller than the original state.fetchOffset, then the original lag value is 
incorrect and needs to be updated. It should be the original lag value plus the 
difference between the original state.fetchOffset and truncationOffset.
If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread 
may set the wrong lag value for fetcherLagStats when executing 
processFetchRequest. So it might be more reasonable to recalculate a lag value 
based on new state.fetchOffset.


> Incorrect lag was calculated when markPartitionsForTruncation in 
> ReplicaAlterLogDirsThread
> --
>
> Key: KAFKA-14979
> URL: https://issues.apache.org/jira/browse/KAFKA-14979
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> When the partitions of ReplicaFetcherThread finished truncating, the 
> ReplicaAlterLogDirsThread to which these partitions belong needs to be marked 
> truncate. The lag value in the newState (PartitionFetchState) obtained in 
> this process is still the original value (state.lag). If the truncationOffset 
> is smaller than the original state.fetchOffset, then the original lag value 
> is incorrect and needs to be updated. It should be the original lag value 
> plus the difference between the original state.fetchOffset and 
> truncationOffset.
> If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread 
> may set the wrong lag value for fetcherLagStats when executing 
> "processFetchRequest". So it might be more reasonable to recalculate a lag 
> value based on new state.fetchOffset.





[jira] [Updated] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14979:
---
Description: 
When the partitions of ReplicaFetcherThread finished truncating, the 
ReplicaAlterLogDirsThread to which these partitions belong needs to be marked 
truncate. The lag value in the newState (PartitionFetchState) obtained in this 
process is still the original value (state.lag). If the truncationOffset is 
smaller than the original state.fetchOffset, then the original lag value is 
incorrect and needs to be updated. It should be the original lag value plus the 
difference between the original state.fetchOffset and truncationOffset.
If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread 
may set the wrong lag value for fetcherLagStats when executing 
processFetchRequest. So it might be more reasonable to recalculate a lag value 
based on new state.fetchOffset.

> Incorrect lag was calculated when markPartitionsForTruncation in 
> ReplicaAlterLogDirsThread
> --
>
> Key: KAFKA-14979
> URL: https://issues.apache.org/jira/browse/KAFKA-14979
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> When the partitions of ReplicaFetcherThread finished truncating, the 
> ReplicaAlterLogDirsThread to which these partitions belong needs to be marked 
> truncate. The lag value in the newState (PartitionFetchState) obtained in 
> this process is still the original value (state.lag). If the truncationOffset 
> is smaller than the original state.fetchOffset, then the original lag value 
> is incorrect and needs to be updated. It should be the original lag value 
> plus the difference between the original state.fetchOffset and 
> truncationOffset.
> If the original lag value is used incorrectly, then ReplicaAlterLogDirsThread 
> may set the wrong lag value for fetcherLagStats when executing 
> processFetchRequest. So it might be more reasonable to recalculate a lag 
> value based on new state.fetchOffset.





[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2023-05-09 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720936#comment-17720936
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-7739:
-

[~satish.duggana] could you help clarify the following:

RemoteStorageManager#fetchIndex and RemoteStorageManager#fetchLogSegment state:

>      * @throws RemoteResourceNotFoundException when there are no resources 
> associated with the given remoteLogSegmentMetadata.

However, when fetching indexes, the transaction index is optional. So when it 
is missing, should the response be null, or should 
RemoteResourceNotFoundException be thrown?

From the javadoc it is unclear whether "resources" means a specific object/index 
or the whole segment log + metadata.

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.6.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]





[GitHub] [kafka] fvaleri commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

2023-05-09 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609885


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+try {
+// it is recommended to have a relatively short txn timeout in 
order to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+KafkaProducer producer =
+new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
+
+// consumer must be in read_committed mode, which means it won't 
be able to read uncommitted data
+boolean readCommitted = true;
+KafkaConsumer consumer = new Consumer(
+"processor-consumer", bootstrapServers, inputTopic, 
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+.createKafkaConsumer();
+
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
-
-Map offsets = 
consumerOffsets();
-
-// Checkpoint the progress by sending offsets to group 
coordinator broker.
-// Note that this API is only available for broker >= 2.5.
-producer.sendOffsetsToTransaction(offsets, 
consumer.groupMetadata());
-
-// Finish the transaction. All sent records should be 
visible for consumption now.
-producer.commitTransaction();
-messageProcessed += records.count();
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {

Review Comment:
   Well, this is consistent with the consumer. The exception list here is 
shorter and much more compact (no repeated instanceof). If you don't 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

2023-05-09 Thread via GitHub


fvaleri commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188609481


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String 
inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear 
zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection 
partitions) {
-printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection 
partitions) {
-printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = 
transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+try {
+// it is recommended to have a relatively short txn timeout in 
order to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+KafkaProducer producer =
+new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
+
+// consumer must be in read_committed mode, which means it won't 
be able to read uncommitted data
+boolean readCommitted = true;
+KafkaConsumer consumer = new Consumer(
+"processor-consumer", bootstrapServers, inputTopic, 
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+.createKafkaConsumer();
+
+// called first and once to fence zombies and abort any pending 
transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = 
consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) 
{
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, 
record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group 
coordinator broker
+// note that this API is only available for broker >= 
2.5
+
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), 
consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
-
-Map offsets = 
consumerOffsets();
-
-// Checkpoint the progress by sending offsets to group 
coordinator broker.
-// Note that this API is only available for broker >= 2.5.
-producer.sendOffsetsToTransaction(offsets, 
consumer.groupMetadata());
-
-// Finish the transaction. All sent records should be 
visible for consumption now.
-producer.commitTransaction();
-messageProcessed += records.count();
+} catch (AuthorizationException | UnsupportedVersionException 
| ProducerFencedException
+ | FencedInstanceIdException | 
OutOfOrderSequenceException | SerializationException e) {
+Utils.printErr(e.getMessage());
+// we can't recover from these exceptions
+shutdown();
+ 

[jira] [Assigned] (KAFKA-14965) Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor

2023-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-14965:
--

Assignee: Lianet Magrans

> Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new 
> consumer threading refactor
> -
>
> Key: KAFKA-14965
> URL: https://issues.apache.org/jira/browse/KAFKA-14965
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> This task introduces new functionality for handling ListOffsetsRequests for 
> the new consumer implementation, as part of the ongoing work on the 
> consumer threading model refactor.
> This task introduces a new class named {{ListOffsetsRequestManager}}, 
> responsible for handling ListOffsets requests performed by the consumer to 
> expose functionality like beginningOffsets, endOffsets and offsetsForTimes. 
> The {{OffsetFetcher}} class is used internally by the {{KafkaConsumer}} 
> to list offsets, so this task will be based on a refactored 
> {{OffsetFetcher}}, reusing the fetching logic as much as possible.





[jira] [Assigned] (KAFKA-14966) Extract reusable common logic from OffsetFetcher

2023-05-09 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-14966:
--

Assignee: Lianet Magrans

> Extract reusable common logic from OffsetFetcher
> 
>
> Key: KAFKA-14966
> URL: https://issues.apache.org/jira/browse/KAFKA-14966
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, 
> validate and reset positions. 
> For the new consumer based on a refactored threading model, similar 
> functionality will be needed by the ListOffsetsRequestManager component. 
> This task aims at identifying and extracting the OffsetFetcher functionality 
> that can be reused by the new consumer implementation.





[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-09 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188566365


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig,
  _: FencedLeaderEpochException |
  _: ReplicaNotAvailableException |
  _: KafkaStorageException |
- _: OffsetOutOfRangeException |
  _: InconsistentTopicIdException) =>
-  LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-divergingEpoch = None,
-highWatermark = UnifiedLog.UnknownOffset,
-leaderLogStartOffset = UnifiedLog.UnknownOffset,
-leaderLogEndOffset = UnifiedLog.UnknownOffset,
-followerLogStartOffset = UnifiedLog.UnknownOffset,
-fetchTimeMs = -1L,
-lastStableOffset = None,
-exception = Some(e))
+  createLogReadResult(e)
+case e: OffsetOutOfRangeException =>
+  // In case of offset out of range errors, check for remote log 
manager for non-compacted topics
+  // to fetch from remote storage. `log` instance should not be null 
here as that would have been caught earlier
+  // with NotLeaderForPartitionException or 
ReplicaNotAvailableException.
+  // If it is from a follower then send the offset metadata only as 
the data is already available in remote
+  // storage.
+  if (remoteLogManager.isDefined && log != null && 
log.remoteLogEnabled() &&
+// Check that the fetch offset is within the offset range within 
the remote storage layer.
+log.logStartOffset <= offset && offset < 
log.localLogStartOffset()) {
+// For follower fetch requests, throw an error saying that this 
offset is moved to tiered storage.
+val highWatermark = log.highWatermark
+val leaderLogStartOffset = log.logStartOffset

Review Comment:
   That is fine: the offset can always be updated, so we will send whatever 
value is available at that time. 
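
   For readers skimming the diff, the offset range check it adds can be read in isolation roughly as below. `RemoteRangeSketch` and `inRemoteOnlyRange` are hypothetical names; the sketch covers only the offset comparison and leaves out the `remoteLogManager.isDefined`, null and `remoteLogEnabled()` guards that the diff also applies.

```java
// Sketch only: a hypothetical helper mirroring the offset comparison in the diff.
final class RemoteRangeSketch {
    private RemoteRangeSketch() { }

    /**
     * True when fetchOffset lies in [logStartOffset, localLogStartOffset),
     * that is, at or after the start of the log but before the first offset
     * that is still available in local storage.
     */
    static boolean inRemoteOnlyRange(long logStartOffset, long localLogStartOffset, long fetchOffset) {
        return logStartOffset <= fetchOffset && fetchOffset < localLogStartOffset;
    }

    public static void main(String[] args) {
        // Log starts at 0 and local storage starts at 500: offset 120 is remote-only.
        System.out.println(inRemoteOnlyRange(0L, 500L, 120L));
        // Offset 500 is the first locally available offset.
        System.out.println(inRemoteOnlyRange(0L, 500L, 500L));
    }
}
```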






[jira] [Assigned] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi reassigned KAFKA-14979:
--

Assignee: hudeqi

> Incorrect lag was calculated when markPartitionsForTruncation in 
> ReplicaAlterLogDirsThread
> --
>
> Key: KAFKA-14979
> URL: https://issues.apache.org/jira/browse/KAFKA-14979
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>






[jira] [Created] (KAFKA-14979) Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-09 Thread hudeqi (Jira)
hudeqi created KAFKA-14979:
--

 Summary: Incorrect lag was calculated when 
markPartitionsForTruncation in ReplicaAlterLogDirsThread
 Key: KAFKA-14979
 URL: https://issues.apache.org/jira/browse/KAFKA-14979
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.3.2
Reporter: hudeqi








[GitHub] [kafka] rhauch merged pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog

2023-05-09 Thread via GitHub


rhauch merged PR #13688:
URL: https://github.com/apache/kafka/pull/13688





[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-09 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-14974:
--
Fix Version/s: 3.6.0

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by reinstating the older send methods and 
> renaming the new Future-returning send methods.
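
A note on why the return-type change breaks binary compatibility: the JVM links a call site by the full method descriptor, which includes the return type, so code compiled against the void send keeps looking for that exact descriptor and fails with NoSuchMethodError once only the Future-returning variant exists. The following is a minimal sketch of the fix described above, not the actual Connect code; the class CompatibleLog, the method name sendWithReceipt, the counter, and the String key/value types are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for KafkaBasedLog; only the idea of keeping a void
// send next to a renamed Future-returning method comes from the issue text.
final class CompatibleLog {
    private int sent = 0;

    // Older signature, reinstated so that callers compiled against the
    // void descriptor still link at runtime.
    void send(String key, String value) {
        sendWithReceipt(key, value);
    }

    // Newer behaviour under a new (assumed) name, so the two methods
    // never differ by return type alone.
    Future<Integer> sendWithReceipt(String key, String value) {
        sent++;
        return CompletableFuture.completedFuture(sent);
    }

    // Number of records passed to either method so far.
    int sentCount() {
        return sent;
    }
}
```

Callers built against the old signature keep working unchanged, while new callers opt in to the Future through the renamed method.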





[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-09 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188479417


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig,
  _: FencedLeaderEpochException |
  _: ReplicaNotAvailableException |
  _: KafkaStorageException |
- _: OffsetOutOfRangeException |
  _: InconsistentTopicIdException) =>
-  LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-divergingEpoch = None,
-highWatermark = UnifiedLog.UnknownOffset,
-leaderLogStartOffset = UnifiedLog.UnknownOffset,
-leaderLogEndOffset = UnifiedLog.UnknownOffset,
-followerLogStartOffset = UnifiedLog.UnknownOffset,
-fetchTimeMs = -1L,
-lastStableOffset = None,
-exception = Some(e))
+  createLogReadResult(e)
+case e: OffsetOutOfRangeException =>
+  // In case of offset out of range errors, check for remote log manager for non-compacted topics
+  // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier
+  // with NotLeaderForPartitionException or ReplicaNotAvailableException.
+  // If it is from a follower then send the offset metadata only as the data is already available in remote
+  // storage.
+  if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+// Check that the fetch offset is within the offset range within the remote storage layer.
+log.logStartOffset <= offset && offset < log.localLogStartOffset()) {

Review Comment:
   That should work fine because it will eventually throw an offset-out-of-range 
error if the target offset does not exist. 
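
The offset check in the hunk above can be read as a half-open interval test: an offset is served from remote storage only if it lies at or after the log start offset and strictly before the local log start offset. A minimal sketch, assuming a hypothetical helper class RemoteFetchRange that takes the three offsets as plain longs (it is not part of the PR):

```java
// Hypothetical helper; the parameter names mirror the fields used in the diff,
// but the class and method are illustrative only.
final class RemoteFetchRange {
    private RemoteFetchRange() { }

    // True when fetchOffset falls in [logStartOffset, localLogStartOffset),
    // i.e. the part of the log that is no longer held locally.
    static boolean isInRemoteRange(long logStartOffset, long localLogStartOffset, long fetchOffset) {
        return logStartOffset <= fetchOffset && fetchOffset < localLogStartOffset;
    }
}
```

With a log start offset of 0 and a local log start offset of 100, offset 50 passes the check, while offset 100 does not because it is still available locally.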






[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-09 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188470147


##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {
+RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
+Records records = mock(Records.class);
+
+
+@Test
+public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException {
+FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records);
+when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
+
+Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
+RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
+RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+remoteLogReader.call();
+
+// verify the callback did get invoked with the expected remoteLogReadResult
+ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg = ArgumentCaptor.forClass(RemoteLogReadResult.class);
+verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
+RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
+assertFalse(actualRemoteLogReadResult.error.isPresent());
+assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+}
+
+@Test
+public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {

Review Comment:
   We will add more tests in follow-up PRs.






[GitHub] [kafka] lanshiqin commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…

2023-05-09 Thread via GitHub


lanshiqin commented on code in PR #13584:
URL: https://github.com/apache/kafka/pull/13584#discussion_r1188451651


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -65,6 +68,29 @@ class LogSegmentTest {
 Utils.delete(logDir)
   }
 
+  /**
+   * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException

Review Comment:
   Good idea. I've modified it.
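
For context, the overflow under test comes from the offset index storing each offset relative to the segment's base offset in a 4-byte integer. A minimal sketch of that range rule, under stated assumptions: the class RelativeOffsets and its method names are hypothetical, and IllegalArgumentException stands in for LogSegmentOffsetOverflowException.

```java
// Hypothetical illustration of the relative-offset range rule; not Kafka code.
final class RelativeOffsets {
    private RelativeOffsets() { }

    // An offset can be indexed only if (offset - baseOffset) fits in a
    // non-negative 32-bit integer.
    static boolean fitsInIndex(long baseOffset, long offset) {
        long relative = offset - baseOffset;
        return relative >= 0 && relative <= Integer.MAX_VALUE;
    }

    // Converts to a relative offset, rejecting values outside the indexable range.
    static int toRelative(long baseOffset, long offset) {
        if (!fitsInIndex(baseOffset, offset)) {
            throw new IllegalArgumentException(
                "Offset " + offset + " cannot be expressed relative to base offset " + baseOffset);
        }
        return (int) (offset - baseOffset);
    }
}
```

A test for the overflow case would append at an offset one past baseOffset + Integer.MAX_VALUE and expect the exception.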






[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-09 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188387671


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +623,210 @@ public String toString() {
 }
 }
 
-long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-Optional<Long> offset = Optional.empty();
-Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-if (maybeLog.isPresent()) {
-UnifiedLog log = maybeLog.get();
-Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-if (maybeLeaderEpochFileCache.isDefined()) {
-LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-OptionalInt epoch = cache.latestEpoch();
-while (!offset.isPresent() && epoch.isPresent()) {
-offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-epoch = cache.previousEpoch(epoch.getAsInt());
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadataOptional.isPresent()) {
+String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not exist in remote tier.");
+}
+
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is greater than or equal to the target offset
+remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+if (firstBatch == null)
+return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+int firstBatchSize = firstBatch.sizeInBytes();
+// An empty record is sent instead of an incomplete batch when
+//  - there is no minimum-one-message constraint and
+//  - the request is FetchRequest version 3 or above and
+//  - the first batch size is more than the maximum bytes that can be sent.
+if (!remoteStorageFetchInfo.minOneMessage &&
+!remoteStorageFetchInfo.hardMaxBytesLimit &&
+firstBatchSize > maxBytes) {
+return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+}
+
+int updatedFetchSize =
+remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+int remainingBytes = updatedFetchSize;
+
+firstBatch.writeTo(buffer);
+remainingBytes -= firstBatchSize;
+
+if (remainingBytes > 0) {
+// read the input stream until min of (EOF stream or buffer's remaining capacity).
+Utils.readFully(remoteSegInputStream, buffer);
+}
+buffer.flip();
+
+FetchDataInfo fetchDataInfo = new FetchDataInfo(
+new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+MemoryRecords.readableRecords(buffer));
+

[GitHub] [kafka] showuon commented on pull request #13691: MINOR: remove kraft readme link

2023-05-09 Thread via GitHub


showuon commented on PR #13691:
URL: https://github.com/apache/kafka/pull/13691#issuecomment-1539857562

   @dengziming, good point! I've updated it to `the KRaft 
section for details.` to link to the KRaft section in the docs. Thanks.





[GitHub] [kafka] showuon commented on pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-09 Thread via GitHub


showuon commented on PR #13679:
URL: https://github.com/apache/kafka/pull/13679#issuecomment-1539844838

   Also, checkstyle failed. Please fix the reported violations. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-05-09 Thread via GitHub


showuon commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1188407891


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -80,7 +81,18 @@ class SimpleApiVersionManager(
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
+throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")

Review Comment:
   Actually it's not clear when to use `SimpleApiVersionManager` and when to 
use `DefaultApiVersionManager`. Could we add some comments above each class?






[GitHub] [kafka] urbandan commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics

2023-05-09 Thread via GitHub


urbandan commented on PR #13690:
URL: https://github.com/apache/kafka/pull/13690#issuecomment-1539809972

   Thanks @viktorsomogyi !





[GitHub] [kafka] showuon commented on pull request #13517: KAFKA-14752: Kafka examples improvements - demo changes

2023-05-09 Thread via GitHub


showuon commented on PR #13517:
URL: https://github.com/apache/kafka/pull/13517#issuecomment-1539806138

   Please resolve the conflict. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #13516: KAFKA-14752: Kafka examples improvements - processor changes

2023-05-09 Thread via GitHub


showuon commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1188380635


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -79,111 +86,114 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
 @Override
 public void run() {
-// Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-producer.initTransactions();
-
-final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-@Override
-public void onPartitionsRevoked(Collection partitions) {
-printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-}
-
-@Override
-public void onPartitionsAssigned(Collection partitions) {
-printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-messageRemaining.set(messagesRemaining(consumer));
-}
-});
-
-int messageProcessed = 0;
-while (messageRemaining.get() > 0) {
-try {
-ConsumerRecords records = consumer.poll(Duration.ofMillis(200));
-if (records.count() > 0) {
-// Begin a new transaction session.
-producer.beginTransaction();
-for (ConsumerRecord record : records) {
-// Process the record and send to downstream.
-ProducerRecord customizedRecord = transform(record);
-producer.send(customizedRecord);
+int processedRecords = 0;
+long remainingRecords = Long.MAX_VALUE;
+try {
+// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+int transactionTimeoutMs = 10_000;
+KafkaProducer producer =
+new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
+
+// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+boolean readCommitted = true;
+KafkaConsumer consumer = new Consumer(
+"processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
+.createKafkaConsumer();
+
+// called first and once to fence zombies and abort any pending transaction
+producer.initTransactions();
+
+consumer.subscribe(singleton(inputTopic), this);
+
+Utils.printOut("Processing new records");
+while (!closed && remainingRecords > 0) {
+try {
+ConsumerRecords records = consumer.poll(ofMillis(200));
+if (!records.isEmpty()) {
+// begin a new transaction session
+producer.beginTransaction();
+
+for (ConsumerRecord record : records) {
+// process the record and send downstream
+ProducerRecord newRecord =
+new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+producer.send(newRecord);
+}
+
+// checkpoint the progress by sending offsets to group coordinator broker
+// note that this API is only available for broker >= 2.5
+producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+// commit the transaction including offsets
+producer.commitTransaction();
+processedRecords += records.count();
 }
-
-Map offsets = consumerOffsets();
-
-// Checkpoint the progress by sending offsets to group coordinator broker.
-// Note that this API is only available for broker >= 2.5.
-producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-// Finish the transaction. All sent records should be visible for consumption now.
-producer.commitTransaction();
-messageProcessed += records.count();
+} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+ | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+Utils.printErr(e.getMessage());
+// we can't recover from these exceptions
+shutdown();
+

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188387610


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-05-09 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1188386919


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertEquals(Collections.emptyMap(), groupAssignment.members());
+}
+
+@Test
+public void testOneConsumerSubscribedToNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap()
+)
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap()
+));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.put(consumerA, mkAssignment(mkTopicAssignment(topic1Uuid, 0, 1)));
+expectedAssignment.put(consumerB, mkAssignment(mkTopicAssignment(topic1Uuid, 2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.get(consumerA).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+expectedAssignment.get(consumerB).putAll(mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+
+
