rhauch commented on a change in pull request #10014:
URL: https://github.com/apache/kafka/pull/10014#discussion_r590633040



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -402,7 +402,7 @@ public void tick() {
             log.debug("Scheduled rebalance at: {} (now: {} 
nextRequestTimeoutMs: {}) ",
                     scheduledRebalance, now, nextRequestTimeoutMs);
         }
-        if (internalRequestValidationEnabled() && keyExpiration < 
Long.MAX_VALUE) {
+        if (isLeader() && internalRequestValidationEnabled() && keyExpiration 
< Long.MAX_VALUE) {

Review comment:
       Actually, the `configLog.readToEnd().get(30000, TimeUnit.MILLISECONDS)` 
effective call only blocks up to 30 seconds for the consumer to read to the 
end. So it is *possible* for the `putSessionKey(...)` to return without the 
consumer having caught up and the session key notification method called. One 
example scenario is a network event after the producer successfully sends the 
record, but before the consumer is able to read to the end of the log, at which 
point the consumer continues retrying with effectively no timeout.
   
   It still seems like it's possible in worker with no effective changes for it 
to block much longer than the key expiration. Obviously, if anything does cause 
the leader to wake up, it should be recover. But my question is what happens to 
one of the followers (which maybe isn't partitioned from the Kafka cluster) 
when they discover the session key is expired? Do they do nothing until they 
are called (again, assuming no change in membership and no config changes)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to