rhauch commented on a change in pull request #10014:
URL: https://github.com/apache/kafka/pull/10014#discussion_r590633040
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -402,7 +402,7 @@ public void tick() {
log.debug("Scheduled rebalance at: {} (now: {}
nextRequestTimeoutMs: {}) ",
scheduledRebalance, now, nextRequestTimeoutMs);
}
- if (internalRequestValidationEnabled() && keyExpiration <
Long.MAX_VALUE) {
+ if (isLeader() && internalRequestValidationEnabled() && keyExpiration
< Long.MAX_VALUE) {
Review comment:
Actually, the `configLog.readToEnd().get(30000, TimeUnit.MILLISECONDS)`
effective call only blocks up to 30 seconds for the consumer to read to the
end. So it is *possible* for the `putSessionKey(...)` to return without the
consumer having caught up and the session key notification method called. One
example scenario is a network event after the producer successfully sends the
record, but before the consumer is able to read to the end of the log, at which
point the consumer continues retrying with effectively no timeout.
It still seems like it's possible in worker with no effective changes for it
to block much longer than the key expiration. Obviously, if anything does cause
the leader to wake up, it should be recover. But my question is what happens to
one of the followers (which maybe isn't partitioned from the Kafka cluster)
when they discover the session key is expired? Do they do nothing until they
are called (again, assuming no change in membership and no config changes)?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]