kirktrue commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1411218465


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -800,6 +811,11 @@ public void resetTimer() {
             this.timer.reset(autoCommitInterval);
         }
 
+        public long remainingMs(final long currentTimeMs) {
+            this.timer.update(currentTimeMs);
+            return this.timer.remainingMs();
+        }
+

Review Comment:
   Example, this is updating the Timer from both threads.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1074,6 +1074,7 @@ boolean reconciliationInProgress() {
     public void onUpdate(ClusterResource clusterResource) {
         resolveMetadataForUnresolvedAssignment();
         if (!assignmentReadyToReconcile.isEmpty()) {
+            transitionTo(MemberState.RECONCILING);

Review Comment:
   OK, I think I'm running into this too. In my tests, the state machine was 
preemptively `RECONCILING` only to later find out the assignments were the 
same, after which it returned from `reconcile()` without resetting the state.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -205,6 +205,26 @@ public void wakeup() {
             networkClientDelegate.wakeup();
     }
 
+    /**
+     * Returns the delay for which the application thread can safely wait 
before it should be responsive
+     * to results from the request managers. For example, the subscription 
state can change when heartbeats
+     * are sent, so blocking for longer than the heartbeat interval might mean 
the application thread is not
+     * responsive to changes.
+     *
+     * @return The maximum delay in milliseconds
+     */
+    public long maximumTimeToWait() {
+        final long currentTimeMs = time.milliseconds();
+        if (requestManagers == null) {
+            return MAX_POLL_TIMEOUT_MS;
+        }
+        return requestManagers.entries().stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(rm -> rm.maximumTimeToWait(currentTimeMs))
+                .reduce(Long.MAX_VALUE, Math::min);
+    }
+

Review Comment:
   This will invoke the request managers directly from the application thread, 
right?
   
   If so, I'm a little concerned by this approach because the implementation of 
`maximumTimeToWait` in each `RequestManager` will be executed by the 
application thread. In the implementations elsewhere in this PR we're reading 
and writing state in the request managers that is only intended to be read or 
written by the network I/O thread.
   
   We need to be careful here 🤔 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+     * the protocol, but subsequent requests do not send information which has 
not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {

Review Comment:
   Fair enough 😄 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -205,6 +205,26 @@ public void wakeup() {
             networkClientDelegate.wakeup();
     }
 
+    /**
+     * Returns the delay for which the application thread can safely wait 
before it should be responsive
+     * to results from the request managers. For example, the subscription 
state can change when heartbeats
+     * are sent, so blocking for longer than the heartbeat interval might mean 
the application thread is not
+     * responsive to changes.
+     *
+     * @return The maximum delay in milliseconds
+     */
+    public long maximumTimeToWait() {
+        final long currentTimeMs = time.milliseconds();
+        if (requestManagers == null) {
+            return MAX_POLL_TIMEOUT_MS;
+        }
+        return requestManagers.entries().stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(rm -> rm.maximumTimeToWait(currentTimeMs))
+                .reduce(Long.MAX_VALUE, Math::min);
+    }
+

Review Comment:
   This will invoke the request managers directly from the application thread, 
right?
   
   If so, I'm a little concerned by this approach because the implementation of 
`maximumTimeToWait` in each `RequestManager` will be executed by the 
application thread. In the implementations elsewhere in this PR we're reading 
and writing state in the request managers that is only intended to be read or 
written by the network I/O thread.
   
   We need to be careful here 🤔 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long 
heartbeatIntervalMs) {
             this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
         }
     }
+
+    /**
+     * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
+     * the protocol, but subsequent requests do not send information which has 
not changed. This
+     * is important to ensure that reconciliation completes successfully.
+     */
+    static class HeartbeatState {

Review Comment:
   Fair enough 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to