lucasbru commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1407452521


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,128 @@ private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information is sent according to
+     * the protocol, but subsequent requests do not send information which has not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {
+        private final SubscriptionState subscriptions;
+        private final MembershipManager membershipManager;
+        private final int rebalanceTimeoutMs;
+        private final SentFields sentFields;
+
+        public HeartbeatState(
+            final SubscriptionState subscriptions,
+            final MembershipManager membershipManager,
+            final int rebalanceTimeoutMs) {
+            this.subscriptions = subscriptions;
+            this.membershipManager = membershipManager;
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            this.sentFields = new SentFields();
+        }
+
+
+        public void reset() {
+            sentFields.reset();
+        }
+
+        public ConsumerGroupHeartbeatRequestData buildRequestData() {
+            ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData();
+
+            // GroupId - always sent
+            data.setGroupId(membershipManager.groupId());
+
+            // MemberId - always sent, empty until it has been received from the coordinator
+            data.setMemberId(membershipManager.memberId());
+
+            // MemberEpoch - always sent
+            data.setMemberEpoch(membershipManager.memberEpoch());
+
+            // InstanceId - sent if hasn't changed since the last heartbeat
+            membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
+                if (!groupInstanceId.equals(sentFields.instanceId)) {
+                    data.setInstanceId(groupInstanceId);
+                    sentFields.instanceId = groupInstanceId;
+                }
+            });
+
+            // RebalanceTimeoutMs - sent if hasn't changed since the last heartbeat
+            if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+                data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+                sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            }
+
+            if (!this.subscriptions.hasPatternSubscription()) {
+                // SubscribedTopicNames - sent if hasn't changed since the last heartbeat
+                TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
+                if (!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
+                    data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
+                    sentFields.subscribedTopicNames = subscribedTopicNames;
+                }
+            } else {
+                // SubscribedTopicRegex - sent if hasn't changed since the last heartbeat
+                //                      - not supported yet
+            }
+
+            // ServerAssignor - sent if hasn't changed since the last heartbeat
+            this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
+                if (!serverAssignor.equals(sentFields.serverAssignor)) {
+                    data.setServerAssignor(serverAssignor);
+                    sentFields.serverAssignor = serverAssignor;
+                }
+            });
+
+            // ClientAssignors - not supported yet
+
+            // TopicPartitions - sent if hasn't changed since the last heartbeat
+            TreeSet<String> assignedPartitions = new TreeSet<>(membershipManager.currentAssignment().stream()
+                    .map(tp -> tp.topicId() + "-" + tp.partition()).collect(Collectors.toList()));

Review Comment:
   You could use `.collect(Collectors.toCollection(TreeSet::new))` here and skip 
the intermediate list. 
   
   Also, you seem to avoid using `TopicIdPartition.toString` or just 
`TopicIdPartition.equals` here - is that to support topic renaming? If so, I 
think it's quite subtle and deserves a short comment.
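
   To illustrate the first point, here is a minimal sketch of collecting 
directly into a `TreeSet`. `AssignmentKeysSketch` and `TopicKey` are 
hypothetical stand-ins I made up for this comment, not Kafka's 
`TopicIdPartition` or anything in this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class AssignmentKeysSketch {

    // Hypothetical stand-in for a (topic id, partition) pair; not Kafka's TopicIdPartition.
    static final class TopicKey {
        final String topicId;
        final int partition;

        TopicKey(String topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }
    }

    // Collects the "topicId-partition" strings straight into a TreeSet,
    // without materializing an intermediate List first.
    static TreeSet<String> sortedKeys(List<TopicKey> assignment) {
        return assignment.stream()
            .map(tp -> tp.topicId + "-" + tp.partition)
            .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        List<TopicKey> assignment = Arrays.asList(
            new TopicKey("b", 0), new TopicKey("a", 1), new TopicKey("a", 0));
        System.out.println(sortedKeys(assignment));
    }
}
```

   The resulting set is equal to the one built from an intermediate list, so 
the later `equals` comparison against the previously sent value should behave 
the same.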



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -128,13 +136,15 @@ public HeartbeatRequestManager(
         final CoordinatorRequestManager coordinatorRequestManager,
         final SubscriptionState subscriptions,
         final MembershipManager membershipManager,
+        final HeartbeatState heartbeatState,
         final HeartbeatRequestState heartbeatRequestState,
         final BackgroundEventHandler backgroundEventHandler) {
         this.logger = logContext.logger(this.getClass());
         this.subscriptions = subscriptions;

Review Comment:
   Looks like the `subscriptions` field in `HeartbeatRequestManager` is now unused.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,128 @@ private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information is sent according to
+     * the protocol, but subsequent requests do not send information which has not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {
+        private final SubscriptionState subscriptions;
+        private final MembershipManager membershipManager;
+        private final int rebalanceTimeoutMs;
+        private final SentFields sentFields;
+
+        public HeartbeatState(
+            final SubscriptionState subscriptions,
+            final MembershipManager membershipManager,
+            final int rebalanceTimeoutMs) {
+            this.subscriptions = subscriptions;
+            this.membershipManager = membershipManager;
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            this.sentFields = new SentFields();
+        }
+
+
+        public void reset() {
+            sentFields.reset();
+        }
+
+        public ConsumerGroupHeartbeatRequestData buildRequestData() {
+            ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData();
+
+            // GroupId - always sent
+            data.setGroupId(membershipManager.groupId());
+
+            // MemberId - always sent, empty until it has been received from the coordinator
+            data.setMemberId(membershipManager.memberId());
+
+            // MemberEpoch - always sent
+            data.setMemberEpoch(membershipManager.memberEpoch());
+
+            // InstanceId - sent if hasn't changed since the last heartbeat

Review Comment:
   I think in this comment and the similar ones below, you meant to say `sent 
if it _has_ changed`: the code only sets a field when its value differs from 
the one last sent.
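
   For reference, this is the behaviour I understand the code to implement, as 
a small standalone sketch. `ChangedFieldsSketch`, its field names, and the 
`Map` return type are all hypothetical and only model the pattern; they are 
not the `HeartbeatState` or `SentFields` classes from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class ChangedFieldsSketch {
    // Values included in the most recent request; null / -1 mean "nothing sent yet".
    private String sentInstanceId = null;
    private int sentRebalanceTimeoutMs = -1;

    // Returns only the fields whose value has changed since the last call,
    // and remembers the new values as "sent".
    Map<String, Object> build(String instanceId, int rebalanceTimeoutMs) {
        Map<String, Object> fields = new LinkedHashMap<>();
        if (!Objects.equals(instanceId, sentInstanceId)) {
            fields.put("instanceId", instanceId);
            sentInstanceId = instanceId;
        }
        if (rebalanceTimeoutMs != sentRebalanceTimeoutMs) {
            fields.put("rebalanceTimeoutMs", rebalanceTimeoutMs);
            sentRebalanceTimeoutMs = rebalanceTimeoutMs;
        }
        return fields;
    }

    // Forgets what was sent, so the next request carries every field again.
    void reset() {
        sentInstanceId = null;
        sentRebalanceTimeoutMs = -1;
    }

    public static void main(String[] args) {
        ChangedFieldsSketch state = new ChangedFieldsSketch();
        System.out.println(state.build("instance-1", 30000)); // first request: both fields
        System.out.println(state.build("instance-1", 30000)); // nothing changed: no fields
        System.out.println(state.build("instance-1", 45000)); // only the changed field
    }
}
```

   In other words, a field is sent when it has changed and omitted when it 
hasn't, which is the opposite of what the current comments say.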



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
