rhauch commented on a change in pull request #10014:
URL: https://github.com/apache/kafka/pull/10014#discussion_r590534609
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -402,7 +402,7 @@ public void tick() {
log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
scheduledRebalance, now, nextRequestTimeoutMs);
}
- if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
+ if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
Review comment:
IIUC, `keyExpiration` is set to `Long.MAX_VALUE` above (line 365) when the key is to be rotated; after that, this herder is the leader, and its `keyExpiration` field is modified only through its own `ConfigUpdateListener`. But isn't it also possible that all of the requests processed on lines 380-397 take less time than the config store needs to consume its own session key record, so that this block is not entered until the next `tick()` call? If so, couldn't the tick block for longer than the time remaining until the next key expiration (assuming there are no further config updates or rebalance-inducing conditions)?
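A minimal sketch of the concern, assuming a simplified model of the guarded block in `tick()` after this change. The class `KeyExpirationTimeoutSketch` and its `pollTimeoutMs` method are hypothetical and are not `DistributedHerder`'s actual code. While `keyExpiration` still holds the `Long.MAX_VALUE` sentinel, the block is skipped and contributes no bound, so the timeout stays whatever `nextRequestTimeoutMs` was (here, unbounded).

```java
/**
 * Hypothetical, simplified model of the guarded block in tick() after this
 * change; it is NOT DistributedHerder's actual code.
 */
public class KeyExpirationTimeoutSketch {
    static long pollTimeoutMs(boolean isLeader, boolean validationEnabled,
                              long keyExpiration, long now, long nextRequestTimeoutMs) {
        long timeout = nextRequestTimeoutMs;
        // Mirrors: isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE
        if (isLeader && validationEnabled && keyExpiration < Long.MAX_VALUE) {
            // Wake up no later than the key's expiration (never a negative timeout)
            timeout = Math.min(timeout, Math.max(keyExpiration - now, 0L));
        }
        return timeout;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        long noPendingRequests = Long.MAX_VALUE; // assumption: no other wake-up is scheduled
        // Leader after requesting rotation, before its ConfigUpdateListener has seen the new key
        System.out.println(pollTimeoutMs(true, true, Long.MAX_VALUE, now, noPendingRequests));
        // Leader once the listener has recorded an expiration 60 s from now
        System.out.println(pollTimeoutMs(true, true, now + 60_000L, now, noPendingRequests));
        // Follower: with the isLeader() check, the key expiration never bounds its poll
        System.out.println(pollTimeoutMs(false, true, now + 60_000L, now, noPendingRequests));
    }
}
```

In this model only the second call yields a bounded timeout (60000 ms); the first shows the window asked about above, where the leader has rotated the key but would still poll without a deadline.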
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -2071,6 +2073,84 @@ public void testPutConnectorConfig() throws Exception {
PowerMock.verifyAll();
}
+ @Test
+ public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+ EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ // First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList());
+ SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0);
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL);
+ Capture<SessionKey> updatedKey = EasyMock.newCapture();
+ configBackingStore.putSessionKey(EasyMock.capture(updatedKey));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ configUpdateListener.onSessionKeyUpdate(updatedKey.getValue());
+ return null;
+ });
Review comment:
This is the step that, in the test (and probably in the majority of real-world cases), always happens quickly and immediately, but which could at least theoretically take a while if the config backing store's consumer is somehow blocked. If that were the case, the leader's `member.poll(...)` invocation could block indefinitely. If there are no rebalance-inducing events, config changes, or even herder requests, then isn't it possible that the config backing store calls the listener with the session key update only _after_ `member.poll(...)` is called, leading to the session key expiring before the herder's `tick()` method returns?
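A minimal sketch of why the test's `andAnswer(...)` may hide this, assuming a deterministic toy model rather than the real classes. `ListenerTimingSketch`, `Herder`, `ConfigStore`, `consumerCatchUp()`, and the 60 s `KEY_TTL_MS` are all hypothetical. The model contrasts a store that invokes the listener synchronously inside `putSessionKey` (as the mocked expectation does) with one whose consumer delivers the record only after the leader has computed its poll timeout.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model (not Kafka code) of when the session-key listener fires. */
public class ListenerTimingSketch {
    static final long KEY_TTL_MS = 60_000L; // arbitrary illustrative TTL

    static final class Herder {
        long keyExpiration = Long.MAX_VALUE; // sentinel while a rotation is pending

        // Plays the role of ConfigUpdateListener.onSessionKeyUpdate
        void onSessionKeyUpdate(long creationTimestamp) {
            keyExpiration = creationTimestamp + KEY_TTL_MS;
        }

        // Timeout the leader would pass to member.poll(...), ignoring all other work
        long pollTimeoutMs(long now) {
            return keyExpiration < Long.MAX_VALUE ? Math.max(keyExpiration - now, 0L) : Long.MAX_VALUE;
        }
    }

    static final class ConfigStore {
        final Herder listener;
        final boolean synchronousCallback;
        final List<Long> unconsumed = new ArrayList<>();

        ConfigStore(Herder listener, boolean synchronousCallback) {
            this.listener = listener;
            this.synchronousCallback = synchronousCallback;
        }

        void putSessionKey(long creationTimestamp) {
            if (synchronousCallback) {
                listener.onSessionKeyUpdate(creationTimestamp); // like the test's andAnswer(...)
            } else {
                unconsumed.add(creationTimestamp); // waits for the store's consumer to read it back
            }
        }

        // The store's consumer catching up; may only happen after poll has started
        void consumerCatchUp() {
            for (long ts : unconsumed) {
                listener.onSessionKeyUpdate(ts);
            }
            unconsumed.clear();
        }
    }

    static long timeoutAfterRotation(boolean synchronousCallback, long now) {
        Herder herder = new Herder();
        ConfigStore store = new ConfigStore(herder, synchronousCallback);
        store.putSessionKey(now);                 // leader writes the new key
        long timeout = herder.pollTimeoutMs(now); // computed before poll blocks
        store.consumerCatchUp();                  // too late to shorten a poll already in progress
        return timeout;
    }

    public static void main(String[] args) {
        System.out.println("synchronous callback: " + timeoutAfterRotation(true, 0L));
        System.out.println("delayed callback:     " + timeoutAfterRotation(false, 0L));
    }
}
```

In this model the synchronous case yields a 60000 ms timeout, while the delayed case still yields `Long.MAX_VALUE` even though the listener eventually fires, which is the interleaving this comment asks about.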
----------------------------------------------------------------
This is an automated message from the Apache Git Service.