This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c25b311cbe KAFKA-14814: Skip Connect target state updates when the
configs store has same state (#13426)
3c25b311cbe is described below
commit 3c25b311cbe5c6c77b764bd9dbac28ee2c0b4f94
Author: Chaitanya Mukka
AuthorDate: Thu Mar 23 20:53:38 2023 +0530
KAFKA-14814: Skip Connect target state updates when the configs store has
same state (#13426)
Reviewers: Yash Mayya, Chris Egerton
---
.../connect/storage/KafkaConfigBackingStore.java | 6 ++-
.../connect/storage/MemoryConfigBackingStore.java | 3 +-
.../storage/KafkaConfigBackingStoreTest.java | 49 ++
3 files changed, 55 insertions(+), 3 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 5d44953ec16..f33b1aef7b3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -908,6 +908,7 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
private void processTargetStateRecord(String connectorName, SchemaAndValue
value) {
boolean removed = false;
+boolean stateChanged = true;
synchronized (lock) {
if (value.value() == null) {
// When connector configs are removed, we also write
tombstones for the target state.
@@ -935,7 +936,8 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
try {
TargetState state = TargetState.valueOf((String)
targetState);
log.debug("Setting target state for connector '{}' to {}",
connectorName, targetState);
-connectorTargetStates.put(connectorName, state);
+TargetState prevState =
connectorTargetStates.put(connectorName, state);
+stateChanged = !state.equals(prevState);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector '{}': {}",
connectorName, targetState);
return;
@@ -945,7 +947,7 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
// Note that we do not notify the update listener if the target state
has been removed.
// Instead we depend on the removal callback of the connector config
itself to notify the worker.
-if (started && !removed)
+if (started && !removed && stateChanged)
updateListener.onConnectorTargetStateChange(connectorName);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index eadb873b45a..dcdfd71296b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -147,9 +147,10 @@ public class MemoryConfigBackingStore implements
ConfigBackingStore {
if (connectorState == null)
throw new IllegalArgumentException("No connector `" + connector +
"` configured");
+TargetState prevState = connectorState.targetState;
connectorState.targetState = state;
-if (updateListener != null)
+if (updateListener != null && !state.equals(prevState))
updateListener.onConnectorTargetStateChange(connector);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index bda35bd3ed1..63fe460685f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -146,6 +146,7 @@ public class KafkaConfigBackingStoreTest {
new
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
private static final Struct TARGET_STATE_PAUSED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
+private static final Struct TARGET_STATE_STARTED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -889,6