This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push: new fe18043 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) fe18043 is described below commit fe1804370680b965a68fdd2978e2afa450daafe4 Author: Chris Egerton <chr...@confluent.io> AuthorDate: Thu Apr 1 13:26:01 2021 -0400 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) Previously, if a distributed worker failed to write (or read back) a new session key to/from the config topic, the worker died. This fix handles that failure more gracefully: instead of dying, the worker restarts its herder tick loop and forces a read to the end of the config topic, repeating until it can successfully read to the end. Once that read succeeds, if the worker's first attempt to write the new session key actually succeeded, it will have read that key back from the config topic and will not write another key during the next tick iteration. If the write did not succeed at all, the worker will try again to write a new key (provided it is still the leader). Verified with new unit tests for both cases (failure to write, and failure to read back after a successful write). 
Author: Chris Egerton <chr...@confluent.io> Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rha...@gmail.com> --- .../runtime/distributed/DistributedHerder.java | 20 ++++-- .../runtime/distributed/DistributedHerderTest.java | 82 ++++++++++++++++++++++ 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b6dab25..93f6141 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -364,10 +364,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (checkForKeyRotation(now)) { log.debug("Distributing new session key"); keyExpiration = Long.MAX_VALUE; - configBackingStore.putSessionKey(new SessionKey( - keyGenerator.generateKey(), - now - )); + try { + configBackingStore.putSessionKey(new SessionKey( + keyGenerator.generateKey(), + now + )); + } catch (Exception e) { + log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); + canReadConfigs = false; + return; + } } // Process any external requests @@ -1186,7 +1192,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @return true if successful, false if timed out */ private boolean readConfigToEnd(long timeoutMs) { - log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); + if (configState.offset() < assignment.offset()) { + log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); + } else { + log.info("Reading to end of config 
log; current config state offset: {}", configState.offset()); + } try { configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS); configState = configBackingStore.snapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index bed46a3..7ededef 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -23,12 +23,14 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.CloseableConnectorContext; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apache.kafka.connect.runtime.SessionKey; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; @@ -69,6 +71,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import javax.crypto.SecretKey; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -86,8 +89,10 @@ import java.util.concurrent.TimeoutException; import static java.util.Collections.singletonList; 
import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.newCapture; import static org.junit.Assert.assertEquals; @@ -2271,6 +2276,83 @@ public class DistributedHerderTest { } @Test + public void testFailedToWriteSessionKey() throws Exception { + // First tick -- after joining the group, we try to write a new + // session key to the config topic, and fail + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + expectRebalance(1, Collections.emptyList(), Collections.emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + configBackingStore.putSessionKey(anyObject(SessionKey.class)); + EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!")); + + // Second tick -- we read to the end of the config topic first, + // then ensure we're still active in the group + // then try a second time to write a new session key, + // then finally begin polling for group activity + expectPostRebalanceCatchup(SNAPSHOT); + member.ensureActive(); + PowerMock.expectLastCall(); + configBackingStore.putSessionKey(anyObject(SessionKey.class)); + EasyMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + herder.tick(); + + PowerMock.verifyAll(); + } + + @Test + public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception { 
+ SecretKey secretKey = EasyMock.niceMock(SecretKey.class); + EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT); + EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]); + SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds()); + ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), + TASK_CONFIGS_MAP, Collections.emptySet()); + + // First tick -- after joining the group, we try to write a new session key to + // the config topic, and fail (in this case, we're trying to simulate that we've + // actually written the key successfully, but haven't been able to read it back + // from the config topic, so to the herder it looks the same as if it'd just failed + // to write the key) + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + expectRebalance(1, Collections.emptyList(), Collections.emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + configBackingStore.putSessionKey(anyObject(SessionKey.class)); + EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!")); + + // Second tick -- we read to the end of the config topic first, and pick up + // the session key that we were able to write the last time, + // then ensure we're still active in the group + // then finally begin polling for group activity + // Importantly, we do not try to write a new session key this time around + configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + EasyMock.expectLastCall().andAnswer(() -> { + configUpdateListener.onSessionKeyUpdate(sessionKey); + return null; + }); + EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey); + member.ensureActive(); + PowerMock.expectLastCall(); + 
member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(secretKey); + + herder.tick(); + herder.tick(); + + PowerMock.verifyAll(); + } + + @Test public void testKeyExceptionDetection() { assertFalse(herder.isPossibleExpiredKeyException( time.milliseconds(),