This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c25b311cbe KAFKA-14814: Skip Connect target state updates when the 
configs store has same state (#13426)
3c25b311cbe is described below

commit 3c25b311cbe5c6c77b764bd9dbac28ee2c0b4f94
Author: Chaitanya Mukka <chaitanya.mvs2...@gmail.com>
AuthorDate: Thu Mar 23 20:53:38 2023 +0530

    KAFKA-14814: Skip Connect target state updates when the configs store has 
same state (#13426)
    
    Reviewers: Yash Mayya <yash.ma...@gmail.com>, Chris Egerton 
<chr...@aiven.io>
---
 .../connect/storage/KafkaConfigBackingStore.java   |  6 ++-
 .../connect/storage/MemoryConfigBackingStore.java  |  3 +-
 .../storage/KafkaConfigBackingStoreTest.java       | 49 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 5d44953ec16..f33b1aef7b3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -908,6 +908,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
     private void processTargetStateRecord(String connectorName, SchemaAndValue 
value) {
         boolean removed = false;
+        boolean stateChanged = true;
         synchronized (lock) {
             if (value.value() == null) {
                 // When connector configs are removed, we also write 
tombstones for the target state.
@@ -935,7 +936,8 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
                 try {
                     TargetState state = TargetState.valueOf((String) 
targetState);
                     log.debug("Setting target state for connector '{}' to {}", 
connectorName, targetState);
-                    connectorTargetStates.put(connectorName, state);
+                    TargetState prevState = 
connectorTargetStates.put(connectorName, state);
+                    stateChanged = !state.equals(prevState);
                 } catch (IllegalArgumentException e) {
                     log.error("Invalid target state for connector '{}': {}", 
connectorName, targetState);
                     return;
@@ -945,7 +947,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
         // Note that we do not notify the update listener if the target state 
has been removed.
         // Instead we depend on the removal callback of the connector config 
itself to notify the worker.
-        if (started && !removed)
+        if (started && !removed && stateChanged)
             updateListener.onConnectorTargetStateChange(connectorName);
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index eadb873b45a..dcdfd71296b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -147,9 +147,10 @@ public class MemoryConfigBackingStore implements 
ConfigBackingStore {
         if (connectorState == null)
             throw new IllegalArgumentException("No connector `" + connector + 
"` configured");
 
+        TargetState prevState = connectorState.targetState;
         connectorState.targetState = state;
 
-        if (updateListener != null)
+        if (updateListener != null && !state.equals(prevState))
             updateListener.onConnectorTargetStateChange(connector);
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index bda35bd3ed1..63fe460685f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -146,6 +146,7 @@ public class KafkaConfigBackingStoreTest {
             new 
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
     );
     private static final Struct TARGET_STATE_PAUSED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
+    private static final Struct TARGET_STATE_STARTED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
 
     private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
             = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -889,6 +890,54 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSameTargetState() throws Exception {
+        // verify that the update listener is not notified when a target state 
record matching the connector's current state comes up through the log
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 
0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 
0, 0, TASK_CONFIG_KEYS.get(0),
+                CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 
0, 0, TASK_CONFIG_KEYS.get(1),
+                CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+            new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 
0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        // the connector is already STARTED, so reading a STARTED (resume) record must not invoke the update listener
+        
configUpdateListener.onConnectorTargetStateChange(EasyMock.anyString());
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("unexpected 
call to onConnectorTargetStateChange"));
+
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), 
TARGET_STATE_STARTED);
+
+        expectPartitionCount(1);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        configStorage.start();
+
+        // Should see a single connector with initial state started
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testBackgroundConnectorDeletion() throws Exception {
         // verify that we handle connector deletions correctly when they come 
up through the log

Reply via email to