This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3c25b311cbe KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426) 3c25b311cbe is described below commit 3c25b311cbe5c6c77b764bd9dbac28ee2c0b4f94 Author: Chaitanya Mukka <chaitanya.mvs2...@gmail.com> AuthorDate: Thu Mar 23 20:53:38 2023 +0530 KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426) Reviewers: Yash Mayya <yash.ma...@gmail.com>, Chris Egerton <chr...@aiven.io> --- .../connect/storage/KafkaConfigBackingStore.java | 6 ++- .../connect/storage/MemoryConfigBackingStore.java | 3 +- .../storage/KafkaConfigBackingStoreTest.java | 49 ++++++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 5d44953ec16..f33b1aef7b3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -908,6 +908,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { private void processTargetStateRecord(String connectorName, SchemaAndValue value) { boolean removed = false; + boolean stateChanged = true; synchronized (lock) { if (value.value() == null) { // When connector configs are removed, we also write tombstones for the target state. 
@@ -935,7 +936,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { try { TargetState state = TargetState.valueOf((String) targetState); log.debug("Setting target state for connector '{}' to {}", connectorName, targetState); - connectorTargetStates.put(connectorName, state); + TargetState prevState = connectorTargetStates.put(connectorName, state); + stateChanged = !state.equals(prevState); } catch (IllegalArgumentException e) { log.error("Invalid target state for connector '{}': {}", connectorName, targetState); return; @@ -945,7 +947,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { // Note that we do not notify the update listener if the target state has been removed. // Instead we depend on the removal callback of the connector config itself to notify the worker. - if (started && !removed) + if (started && !removed && stateChanged) updateListener.onConnectorTargetStateChange(connectorName); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java index eadb873b45a..dcdfd71296b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java @@ -147,9 +147,10 @@ public class MemoryConfigBackingStore implements ConfigBackingStore { if (connectorState == null) throw new IllegalArgumentException("No connector `" + connector + "` configured"); + TargetState prevState = connectorState.targetState; connectorState.targetState = state; - if (updateListener != null) + if (updateListener != null && !state.equals(prevState)) updateListener.onConnectorTargetStateChange(connector); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index bda35bd3ed1..63fe460685f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -146,6 +146,7 @@ public class KafkaConfigBackingStoreTest { new Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9) ); private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED"); + private static final Struct TARGET_STATE_STARTED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED"); private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); @@ -889,6 +890,54 @@ public class KafkaConfigBackingStoreTest { PowerMock.verifyAll(); } + @Test + public void testSameTargetState() throws Exception { + // verify that we handle target state changes correctly when they come up through the log + + expectConfigure(); + List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1), + CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), + CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty())); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); + 
deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + + // on resume update listener shouldn't be called + configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString()); + EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected call to onConnectorTargetStateChange")); + + expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_STARTED); + + expectPartitionCount(1); + expectStop(); + + PowerMock.replayAll(); + + configStorage.setupAndCreateKafkaBasedLog(TOPIC, config); + configStorage.start(); + + // Should see a single connector with initial state started + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + configStorage.refresh(0, TimeUnit.SECONDS); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + @Test public void testBackgroundConnectorDeletion() throws Exception { // verify that we handle connector deletions correctly when they come up through the log