kirktrue commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1406656043


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -159,11 +159,19 @@ public NetworkClientDelegate.PollResult poll(final long 
currentTimeMs) {
             return EMPTY;
 
         List<NetworkClientDelegate.UnsentRequest> requests = 
pendingRequests.drain(currentTimeMs);
+        return new 
NetworkClientDelegate.PollResult(timeUntilNextPoll(currentTimeMs), requests);
+    }
+
+    /**
+     * Returns the delay before the next network request for this request 
manager. Used to ensure that
+     * waiting in the application thread does not delay beyond the point that 
a result can be returned.
+     */
+    @Override
+    public long timeUntilNextPoll(long currentTimeMs) {
         // min of the remainingBackoffMs of all the request that are still 
backing off
-        final long timeUntilNextPoll = Math.min(
-            findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
-            findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
-        return new NetworkClientDelegate.PollResult(timeUntilNextPoll, 
requests);
+        return Math.min(
+                findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+                findMinTime(unsentOffsetFetchRequests(), currentTimeMs));

Review Comment:
   Nit: leaving the indentation as is conforms to the existing 'four-spaces per 
tab' style.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
         return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(request));
     }
 
-    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
-        // TODO: extract this logic for building the 
ConsumerGroupHeartbeatRequestData to a
-        //  stateful builder (HeartbeatState), that will keep the last data 
sent, and determine
-        //  the fields that changed and need to be included in the next HB 
(ex. check
-        //  subscriptionState changed from last sent to include assignment). 
It should also
-        //  ensure that all fields are sent on failure.
-        ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData()
-            .setGroupId(membershipManager.groupId())
-            .setMemberEpoch(membershipManager.memberEpoch())
-            .setRebalanceTimeoutMs(rebalanceTimeoutMs);
-
-        if (membershipManager.memberId() != null) {
-            data.setMemberId(membershipManager.memberId());
-        }
-
-        membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-
-        if (this.subscriptions.hasPatternSubscription()) {
-            // TODO: Pass the string to the GC if server side regex is used.
-        } else {
-            data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
-            List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
topicPartitions =
-                    
buildTopicPartitionsList(membershipManager.currentAssignment());
-            data.setTopicPartitions(topicPartitions);
-        }
-
-        
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
+    /**
+     * Returns the delay before the next network request for this request 
manager. Used to ensure that
+     * waiting in the application thread does not delay beyond the point that 
a result can be returned.
+     */
+    @Override
+    public long timeUntilNextPoll(long currentTimeMs) {
+        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
+        return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+    }
 
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
         NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new ConsumerGroupHeartbeatRequest.Builder(data),
+            new 
ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),

Review Comment:
   Nit: we got knocked in other reviews for unnecessarily qualifying with 
`this`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+     * the protocol, but subsequent requests do not send information which has 
not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {
+        private final SubscriptionState subscriptions;
+        private final MembershipManager membershipManager;
+        private final int rebalanceTimeoutMs;
+
+        // Fields of ConsumerHeartbeatRequest sent in the most recent request
+        private String sentInstanceId;
+        private int sentRebalanceTimeoutMs;
+        private TreeSet<String> sentSubscribedTopicNames;
+        // private String sentSubscribedTopicRegex;
+        private String sentServerAssignor;
+        private TreeSet<String> sentTopicPartitions;
+
+        public HeartbeatState(
+            final SubscriptionState subscriptions,
+            final MembershipManager membershipManager,
+            final int rebalanceTimeoutMs) {
+            this.subscriptions = subscriptions;
+            this.membershipManager = membershipManager;
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            this.sentInstanceId = null;
+            this.sentRebalanceTimeoutMs = -1;
+            this.sentSubscribedTopicNames = null;
+            this.sentServerAssignor = null;
+            this.sentTopicPartitions = null;
+        }
+
+
+        public void reset() {
+            sentInstanceId = null;
+            sentRebalanceTimeoutMs = -1;
+            sentSubscribedTopicNames = null;
+            sentServerAssignor = null;
+            sentTopicPartitions = null;
+        }
+
+        public ConsumerGroupHeartbeatRequestData buildRequestData() {
+            ConsumerGroupHeartbeatRequestData data = new 
ConsumerGroupHeartbeatRequestData();
+
+            // GroupId - always sent
+            data.setGroupId(membershipManager.groupId());
+
+            // MemberId - always sent, empty until it has been received from 
the coordinator
+            data.setMemberId(membershipManager.memberId());
+
+            // MemberEpoch - always sent
+            data.setMemberEpoch(membershipManager.memberEpoch());
+
+            // InstanceId - sent if hasn't changed since the last heartbeat
+            membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
+                if (!groupInstanceId.equals(sentInstanceId)) {
+                    data.setInstanceId(groupInstanceId);
+                    sentInstanceId = groupInstanceId;
+                }
+            });
+
+            // RebalanceTimeoutMs - sent if hasn't changed since the last 
heartbeat
+            if (sentRebalanceTimeoutMs != rebalanceTimeoutMs) {
+                data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+                sentRebalanceTimeoutMs = rebalanceTimeoutMs;
+            }
+
+            if (!this.subscriptions.hasPatternSubscription()) {
+                // SubscribedTopicNames - sent if hasn't changed since the 
last heartbeat
+                TreeSet<String> subscribedTopicNames = new 
TreeSet<>(this.subscriptions.subscription());
+                if (!subscribedTopicNames.equals(sentSubscribedTopicNames)) {
+                    data.setSubscribedTopicNames(new 
ArrayList<>(this.subscriptions.subscription()));
+                    sentSubscribedTopicNames = subscribedTopicNames;
+                }
+            } else {
+                // SubscribedTopicRegex - sent if hasn't changed since the 
last heartbeat
+                //                      - not supported yet
+            }
+
+            // ServerAssignor - sent if hasn't changed since the last heartbeat
+            this.membershipManager.serverAssignor().ifPresent(serverAssignor 
-> {
+                if (!serverAssignor.equals(sentServerAssignor)) {
+                    data.setServerAssignor(serverAssignor);
+                    sentServerAssignor = serverAssignor;
+                }
+            });
+
+            // ClientAssignors - not supported yet
+
+            // TopicPartitions - sent if hasn't changed since the last 
heartbeat
+            TreeSet<String> assignedPartitions = new 
TreeSet<>(membershipManager.currentAssignment().stream()
+                    .map(tp -> tp.topicId() + "-" + 
tp.partition()).collect(Collectors.toList()));
+            if (!assignedPartitions.equals(sentTopicPartitions)) {
+                List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
topicPartitions =
+                        
buildTopicPartitionsList(membershipManager.currentAssignment());
+                data.setTopicPartitions(topicPartitions);
+                sentTopicPartitions = assignedPartitions;
+            }
+
+            return data;
+        }
+
+        private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) {
+            List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = 
new ArrayList<>();
+            Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>();
+            for (TopicIdPartition topicIdPartition : topicIdPartitions) {
+                Uuid topicId = topicIdPartition.topicId();
+                if (!partitionsPerTopicId.containsKey(topicId)) {
+                    partitionsPerTopicId.put(topicId, new ArrayList<>());
+                }
+                
partitionsPerTopicId.get(topicId).add(topicIdPartition.partition());

Review Comment:
   ```suggestion
                   partitionsPerTopicId.computeIfAbsent(topicId, __ -> new 
ArrayList<>()).add(partitionId);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+     * the protocol, but subsequent requests do not send information which has 
not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {
+        private final SubscriptionState subscriptions;
+        private final MembershipManager membershipManager;
+        private final int rebalanceTimeoutMs;
+
+        // Fields of ConsumerHeartbeatRequest sent in the most recent request
+        private String sentInstanceId;
+        private int sentRebalanceTimeoutMs;
+        private TreeSet<String> sentSubscribedTopicNames;
+        // private String sentSubscribedTopicRegex;
+        private String sentServerAssignor;
+        private TreeSet<String> sentTopicPartitions;

Review Comment:
   IMO, we should group all of the 'last sent' attributes in a separate little 
struct/class, just to keep their relation and their (re-)initializing logic 
together.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+     * the protocol, but subsequent requests do not send information which has 
not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {

Review Comment:
   Any reason `HeartbeatState` needs to be `static`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to