Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-13 Thread via GitHub


dajac merged PR #15785:
URL: https://github.com/apache/kafka/pull/15785


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1589199642


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,17 +1305,25 @@ private 
CoordinatorResult consumerGr
 }
 }
 
-// The subscription metadata is updated in two cases:
-// 1) The member has updated its subscriptions;
-// 2) The refresh deadline has been reached.
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
 Map subscribedTopicNamesMap = 
group.subscribedTopicNames();
+SubscriptionType subscriptionType = group.subscriptionType();
+
 if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
 subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
 subscriptionMetadata = group.computeSubscriptionMetadata(
 subscribedTopicNamesMap,
 metadataImage.topics(),
 metadataImage.cluster()
 );
+subscriptionType = ConsumerGroup.subscriptionType(
+subscribedTopicNamesMap,
+group.numMembers()

Review Comment:
   I just merged it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-03 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1589109858


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,17 +1305,25 @@ private 
CoordinatorResult consumerGr
 }
 }
 
-// The subscription metadata is updated in two cases:
-// 1) The member has updated its subscriptions;
-// 2) The refresh deadline has been reached.
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
 Map subscribedTopicNamesMap = 
group.subscribedTopicNames();
+SubscriptionType subscriptionType = group.subscriptionType();
+
 if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
 subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
 subscriptionMetadata = group.computeSubscriptionMetadata(
 subscribedTopicNamesMap,
 metadataImage.topics(),
 metadataImage.cluster()
 );
+subscriptionType = ConsumerGroup.subscriptionType(
+subscribedTopicNamesMap,
+group.numMembers()

Review Comment:
   Once https://github.com/apache/kafka/pull/15847 is merged, I think that we 
can easily compute the numMembers as follow:
   
   ```
   int numMembers = group.numMembers();
   if (!group.hasMember(updatedMember.memberId() && !staticMemberReplaced) {
  numMembers++;
   }
   ```
   
   If the member did not exist and we are not replacing a static member, we 
increment by one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587922375


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   > we need to somehow get the accurate total member count and use this only
   
   We can get the current count and update it locally when a new member is 
created and we don't replace a static member, I suppose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587912985


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   > For checking the referenceCount, is it possible to have something like
   > 
   > ```
   > member A - topic-1, topic-2
   > member B - topic-2, topic-3
   > member C - topic-1, topic-3
   > ```
   
   Good point. So we need to use the group size.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587912168


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   > > thinking about #15798, it may not be true
   > 
   > Yes this isn't true. I can add a check to prevent empty subscribed topic 
list form joining.
   
   This is not possible as we must support the current protocol as it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587887974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   ah yes! I remember now that this is why I chose to do total number of 
members in the group



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587889094


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   we need to somehow get the accurate total member count and use this only



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587887974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   ah yes! I remember now that this is why I chose to do total number of 
members 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587887974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   ah yes! I remember now that this is why I chose to do number of members 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dongnuo123 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587655470


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   > thinking about https://github.com/apache/kafka/pull/15798, it may not be 
true
   
   Yes this isn't true. I can add a check to prevent empty subscribed topic 
list form joining.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dongnuo123 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587657454


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   For checking the referenceCount, is it possible to have
   ```
   member A - topic-1, topic-2
   member B - topic-2, topic-3
   member C - topic-1, topic-3
   ```



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   For checking the referenceCount, is it possible to have something like
   ```
   member A - topic-1, topic-2
   member B - topic-2, topic-3
   member C - topic-1, topic-3
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dongnuo123 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587655470


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   > thinking about https://github.com/apache/kafka/pull/15798, it may not be 
true
   
   Yes this is correct. I can add a check to prevent empty subscribed topic 
list form joining.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587228256


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
+Map subscribedTopicNamesMap = 
group.subscribedTopicNames();

Review Comment:
   While here, could we move those two 
[lines](https://github.com/apache/kafka/pull/15785/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R1275)
 to here. It makes sense to keep all those together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587225960


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,7 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+
.withSubscriptionType(ConsumerGroup.subscriptionType(subscribedTopicNamesMap))

Review Comment:
   I was thinking a little more about this one and I still believe that we 
should rather do it right after computing `subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);` because there are 
cases where we would end up here but without needing to re-compute it (e.g. 
staticMemberReplaced is true).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587222161


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   We don't allow subscribing with no topic names with the consumer protocol so 
this logic works in this case. However, thinking about 
https://github.com/apache/kafka/pull/15798, it may not be true when the classic 
protocol is used to join a consumer group. @dongnuo123 Do you confirm?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-02 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1587219947


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.

Review Comment:
   Sorry. I was not clear in my previous comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-05-01 Thread via GitHub


dongnuo123 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1586420616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,53 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ *
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames
+) {
+if (subscribedTopicNames.isEmpty()) {
+return HOMOGENEOUS;
+}
+
+// Take the subscriber count of the first topic as the reference.
+int referenceCount = subscribedTopicNames.values().iterator().next();

Review Comment:
   Do we need to check if referenceCount is equal to member count?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-30 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585160959


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.

Review Comment:
   I thought you wanted it above the if statement in the last comment haha, I 
think it was there originally
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-30 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585158392


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
+Map subscribedTopicNamesMap = 
group.subscribedTopicNames();

Review Comment:
   we can't cause the list is also named subscribedTopicNames right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-30 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1585155736


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all the members are subscribed to the same set of topics, the type 
is homogeneous.
+ * Otherwise, it is heterogeneous.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;

Review Comment:
   nit: Let's add an empty line before this one in order to match the style in 
the file.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.
+Map subscribedTopicNamesMap = 
group.subscribedTopicNames();

Review Comment:
   nit: Should we use `subscribedTopicNames` too?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+// The subscription metadata is updated in two cases:
+// 1) The member has updated its subscriptions;
+// 2) The refresh deadline has been reached.

Review Comment:
   nit: Could we move it to right before `subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +979,55 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map computeSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
+maybeUpdateSubscribedTopicNames(
+subscribedTopicNames,
+oldMember,
+newMember
+);
+return subscribedTopicNames;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all the members are subscribed to the same set of topics, the type 
is homogeneous.
+ * Otherwise, it is heterogeneous.

Review Comment:
   nit: We could remove this as it is already in the `@return`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583450532


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map updateSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicCount = new 
HashMap<>(this.subscribedTopicNames);
+if (oldMember != null) {
+oldMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::decValue)
+);
+}
+
+if (newMember != null) {
+newMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::incValue)
+);
+}
+
+return subscribedTopicCount;
+}
+
+/**
+ * Compute the subscription type of the consumer group.
+ *
+ * If all members are subscribed to the same set of topics, the type is 
homogeneous.
+ * Otherwise, it is heterogeneous.
+ *
+ * @param subscribedTopicNames  A map of topic names to the count of 
members subscribed to each topic.
+ * @param numOfMembers  The total number of members in the 
group.
+ * @param subscriptionType  The current subscription type of the 
group.
+ * @return {@link SubscriptionType#HOMOGENEOUS} if all members are 
subscribed to exactly the same topics;
+ * otherwise, {@link SubscriptionType#HETEROGENEOUS}.
+ */
+public static SubscriptionType subscriptionType(
+Map subscribedTopicNames,
+int numOfMembers,
+SubscriptionType subscriptionType
+) {
+if (subscribedTopicNames.isEmpty()) return subscriptionType;

Review Comment:
   discussed offline



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I initialized it in the method to update, and we return the map. Currently 
there is no getter for the map, I can create another getter to return the map 
and not just the list of subscribed topic names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I initialized it in the method to update, and we return the map. Currently 
there is no getter for the map, I can create another getter to return the map 
and not just the subscribed topic names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I initialized it in the method to update, and we return the map. THe map 
itself is private so I can't initialize it directly, unless I create another 
getter to return the map and not just the subscribed topic names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r158025


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),

Review Comment:
   An alternative may be to compare the counts within the Map without 
considering the group size. This could work because we do not accept empty 
subscriptions from a member. -> this works as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583332135


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),

Review Comment:
   yeah I was actually thinking the same, I saw that the member map is updated 
in the getOrCreateMember step so I figured the memberCount is updated, unless a 
member is removed, I'm not sure if that updates elsewhere, or I might be 
overlooking something



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583314465


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I think it was originally named like that since we pass a list of 
subscribedTopic names in the heartbeat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583305142


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),
+group.subscriptionType()
+))

Review Comment:
   I figured we don't need to compute it if it's not necessary since we aren't 
updating the actual value anyways. It is only required in this if block when 
the target assignment builder is initialized right right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583305142


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),
+group.subscriptionType()
+))

Review Comment:
   I figured we don't need to compute it if it's not necessary since we aren't 
updating the actual value anyways. It is only required in this if block right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I initialized it in the method to update, and we return the map



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   I initialized it in the method to update



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-29 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1582678791


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();

Review Comment:
   This is incorrect, I think. We should initialize it with the current 
subscribed topic names. Otherwise, we will use an empty map later one. I wonder 
if we should also call it `subscribedTopicNames` to be consistent.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),

Review Comment:
   Note that the number of members may be incorrect here. For instance, when a 
new member joins, it is not accounted into it yet. I wonder if we could do the 
same without it. Or, we need to adjust it base on whether the member is new or 
not.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map updateSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicCount = new 
HashMap<>(this.subscribedTopicNames);
+if (oldMember != null) {
+oldMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::decValue)
+);
+}
+
+if (newMember != null) {
+newMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::incValue)
+);
+}
+

Review Comment:
   Could we reuse `maybeUpdateSubscribedTopicNames`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1350,6 +1351,11 @@ private 
CoordinatorResult consumerGr
 .withMembers(group.members())
 .withStaticMembers(group.staticMembers())
 .withSubscriptionMetadata(subscriptionMetadata)
+.withSubscriptionType(ConsumerGroup.subscriptionType(
+subscribedTopicsMemberCount,
+group.numMembers(),
+group.subscriptionType()
+))

Review Comment:
   I would prefer to have this right after calling `updateSubscribedTopicNames` 
in order to keep all the updates together. I would also call it 
`computeSubscriptionType`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1307,13 +1307,14 @@ private 
CoordinatorResult consumerGr
 }
 }
 
+Map subscribedTopicsMemberCount = new HashMap<>();
 if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+subscribedTopicsMemberCount = 
group.updateSubscribedTopicNames(member, updatedMember);

Review Comment:
   nit: Should we call it `computeSubscribedTopicNames` to follow 
`computeSubscriptionMetadata`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription count.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ *
+ * @return Copy of the map of topics to the count of number of subscribers.
+ */
+public Map updateSubscribedTopicNames(
+ConsumerGroupMember oldMember,
+ConsumerGroupMember newMember
+) {
+Map subscribedTopicCount = new 
HashMap<>(this.subscribedTopicNames);
+if (oldMember != null) {
+oldMember.subscribedTopicNames().forEach(topicName ->
+subscribedTopicCount.compute(topicName, 
ConsumerGroup::decValue)
+);
+}
+
+if (newMember != null) {
+

Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-26 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1581140616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +987,23 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription type, iff necessary.
+ *
+ * If all members are subscribed to the same set of topics, the type is 
homogeneous.
+ * Otherwise, it is heterogeneous.
+ */
+private void maybeUpdateGroupSubscriptionType() {

Review Comment:
   hmm theoretically yes, but what if it stays homo/hetero even after the 
update? maybe it doesn't matter cause the variable is reassigned



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-26 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1581134610


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -620,9 +637,10 @@ public Map 
computeSubscriptionMetadata(
 TopicsImage topicsImage,
 ClusterImage clusterImage
 ) {
-// Copy and update the current subscriptions.
+// Copy and update the current subscription information.
 Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
 maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+maybeUpdateGroupSubscriptionType();

Review Comment:
   o that makes sense, I wasn't sure what the reasoning was, okie I'll 
remove it, thanks for explaining



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-26 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1581132398


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -204,7 +198,7 @@ private void createAssignmentSpec() {
 }
 }
 
-this.assignmentSpec = new AssignmentSpec(members);
+this.assignmentSpec = new AssignmentSpec(members, subscriptionType);

Review Comment:
   hmm this is weird lol, I could've sworn I removed em all



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-25 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1580133686


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -687,7 +688,7 @@ public void 
testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme
 currentAssignmentForC
 ));
 
-AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, 
HETEROGENEOUS);

Review Comment:
   Yeah it covers it. Nothing changed in the hetero path anyways



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-25 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1580115582


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -90,17 +91,29 @@ private Map> membersPerTopic(final 
AssignmentSpec assignmentS
 Map> membersPerTopic = new HashMap<>();
 Map membersData = 
assignmentSpec.members();
 
-membersData.forEach((memberId, memberMetadata) -> {
-Collection topics = memberMetadata.subscribedTopicIds();
+if (assignmentSpec.groupSubscriptionModel().equals(HOMOGENEOUS)) {
+List allMembers = new ArrayList<>(membersData.keySet());

Review Comment:
   Thanks! Made the change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-25 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1579102085


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {

Review Comment:
   nit: Should we call it `SubscriptionType`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -90,17 +91,29 @@ private Map> membersPerTopic(final 
AssignmentSpec assignmentS
 Map> membersPerTopic = new HashMap<>();
 Map membersData = 
assignmentSpec.members();
 
-membersData.forEach((memberId, memberMetadata) -> {
-Collection topics = memberMetadata.subscribedTopicIds();
+if (assignmentSpec.groupSubscriptionModel().equals(HOMOGENEOUS)) {
+List allMembers = new ArrayList<>(membersData.keySet());

Review Comment:
   I wonder if we could change the return type of the method from `Map>` to `Map>` and avoid this copy here. It 
seems possible because we only iterate over the member ids later on. This could 
be a nice performance improvement too while we are here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+/**
+ * The subscription model followed by the consumer group.
+ *
+ * A homogeneous subscription model means that all the members
+ * of the group are subscribed to the same set of topics.
+ *
+ * The model is heterogeneous otherwise.
+ */
+public enum ConsumerGroupSubscriptionModel {
+HOMOGENEOUS("Homogeneous"),
+HETEROGENEOUS("Heterogeneous");
+private final String name;

Review Comment:
   nit: Let's add an empty line before this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -620,9 +637,9 @@ public Map 
computeSubscriptionMetadata(
 TopicsImage topicsImage,
 ClusterImage clusterImage
 ) {
-// Copy and update the current subscriptions.
+// Copy and update the current subscription information.
 Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
-maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(subscribedTopicNames, 
oldMember, newMember);

Review Comment:
   Hum... We need to be careful here because  we are not suppose to update the 
internal state of the group on this code path. It may be better to keep it as 
it was before and to have another method to update the subscription type.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupSubscriptionModel.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 

Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-23 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1576317772


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -918,40 +935,53 @@ private void 
maybeUpdateClassicProtocolMembersSupportedProtocols(
 
 /**
  * Updates the subscribed topic names count.
+ * Changes to the subscription model as a consequence
+ * of this update is reflected as well.
  *
  * @param oldMember The old member.
  * @param newMember The new member.
  */
-private void maybeUpdateSubscribedTopicNames(
+private void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(
 ConsumerGroupMember oldMember,
 ConsumerGroupMember newMember
 ) {
-maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(subscribedTopicNames, 
oldMember, newMember);
 }
 
 /**
- * Updates the subscription count.
+ * Updates the subscription count and the subscription model, if required.
  *
  * @param subscribedTopicCount  The map to update.
  * @param oldMember The old member.
  * @param newMember The new member.
  */
-private static void maybeUpdateSubscribedTopicNames(
+private void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(

Review Comment:
   Should we add unit test to validate this logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org