lucasbru commented on code in PR #20735:
URL: https://github.com/apache/kafka/pull/20735#discussion_r2445969927


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1587,11 +1587,21 @@ public void handleStreamsRebalanceData() {
     }
 
     private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
+        // Determine the timeout: use 2 * heartbeatIntervalMs if available, otherwise fall back to maxPollTimeMs

Review Comment:
   I think `heartbeatIntervalMs` must be defined at this stage. I would simplify the code; there is no need for a default.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1587,11 +1587,21 @@ public void handleStreamsRebalanceData() {
     }
 
     private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
+        // Determine the timeout: use 2 * heartbeatIntervalMs if available, otherwise fall back to maxPollTimeMs
+        long timeoutMs = maxPollTimeMs;
+        if (streamsRebalanceData.isPresent()) {

Review Comment:
   Isn't this only called when `streamsRebalanceData` is present?
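
   If that holds, the fallback and the presence check could both go away. A minimal sketch of the simplified computation, not the actual `StreamThread` code: `TimeoutSketch`, `RebalanceData`, and `missingSourceTopicsTimeoutMs` are hypothetical stand-ins, and the accessor is assumed to be named `heartbeatIntervalMs()`.

```java
import java.util.Optional;

final class TimeoutSketch {

    // Hypothetical stand-in for StreamsRebalanceData; only the field used here.
    static final class RebalanceData {
        private final int heartbeatIntervalMs;

        RebalanceData(final int heartbeatIntervalMs) {
            this.heartbeatIntervalMs = heartbeatIntervalMs;
        }

        int heartbeatIntervalMs() {
            return heartbeatIntervalMs;
        }
    }

    // Assumes the caller only reaches this point when rebalance data is present,
    // so a missing value is treated as a bug instead of falling back to a default.
    static long missingSourceTopicsTimeoutMs(final Optional<RebalanceData> data) {
        final RebalanceData present = data.orElseThrow(
            () -> new IllegalStateException("rebalance data expected to be present"));
        // Widen to long before multiplying to avoid int overflow.
        return 2L * present.heartbeatIntervalMs();
    }
}
```

   Failing fast on a missing value makes the "always present" assumption explicit instead of hiding it behind `maxPollTimeMs`.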



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -395,4 +398,14 @@ public List<StreamsGroupHeartbeatResponseData.Status> statuses() {
         return statuses.get();
     }
 
+    /** Updated whenever a heartbeat response is received from the broker. */
+    public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) {
+        this.heartbeatIntervalMs.set(heartbeatIntervalMs);
+    }
+
+    /** Returns the heartbeat interval in milliseconds, or -1 if not yet set. */
+    public int getHeartbeatIntervalMs() {

Review Comment:
   We don't use the `get` prefix here.
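
   For illustration, the accessor without the prefix might look like the sketch below. `HeartbeatIntervalHolder` is a hypothetical stand-in for the relevant part of `StreamsRebalanceData`, and the `AtomicInteger` field with an initial value of -1 is an assumption based on the `.set(...)` call and the Javadoc in the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the relevant part of StreamsRebalanceData.
final class HeartbeatIntervalHolder {

    // Assumed to start at -1, matching the "-1 if not yet set" Javadoc.
    private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);

    /** Updated whenever a heartbeat response is received from the broker. */
    public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) {
        this.heartbeatIntervalMs.set(heartbeatIntervalMs);
    }

    /** Returns the heartbeat interval in milliseconds, or -1 if not yet set. */
    public int heartbeatIntervalMs() {
        return heartbeatIntervalMs.get();
    }
}
```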



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.