[kafka] branch trunk updated (e07cc127e12 -> f79c2a6e041)

2023-03-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from e07cc127e12 MINOR: Fix remote_kraft -> isolated_kraft in kafkatest (#13439)
 add f79c2a6e041 MINOR:Incorrect/canonical use of constants in AdminClientConfig and StreamsConfigTest (#13427)

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/kafka/clients/admin/AdminClientConfig.java   | 4 ++--
 streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)



[kafka] branch trunk updated (3c25b311cbe -> e07cc127e12)

2023-03-23 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 3c25b311cbe KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426)
 add e07cc127e12 MINOR: Fix remote_kraft -> isolated_kraft in kafkatest (#13439)

No new revisions were added by this update.

Summary of changes:
 tests/kafkatest/sanity_checks/test_verifiable_producer.py | 8 ++++----
 tests/kafkatest/tests/connect/connect_test.py | 4 ++--
 tests/kafkatest/tests/core/authorizer_test.py | 6 +++---
 tests/kafkatest/tests/core/round_trip_fault_test.py   | 4 ++--
 tests/kafkatest/tests/core/security_test.py   | 2 +-
 tests/kafkatest/tests/streams/streams_broker_bounce_test.py   | 2 +-
 .../tests/streams/streams_broker_down_resilience_test.py  | 8 ++++----
 tests/kafkatest/tests/streams/streams_eos_test.py | 8 ++++----
 .../tests/streams/streams_named_repartition_topic_test.py | 2 +-
 tests/kafkatest/tests/streams/streams_optimized_test.py   | 2 +-
 tests/kafkatest/tests/streams/streams_relational_smoke_test.py| 2 +-
 tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py   | 2 +-
 tests/kafkatest/tests/streams/streams_standby_replica_test.py | 2 +-
 tests/kafkatest/tests/streams/streams_static_membership_test.py   | 2 +-
 14 files changed, 27 insertions(+), 27 deletions(-)



[kafka] branch trunk updated: KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426)

2023-03-23 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3c25b311cbe KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426)
3c25b311cbe is described below

commit 3c25b311cbe5c6c77b764bd9dbac28ee2c0b4f94
Author: Chaitanya Mukka 
AuthorDate: Thu Mar 23 20:53:38 2023 +0530

KAFKA-14814: Skip Connect target state updates when the configs store has same state (#13426)

Reviewers: Yash Mayya, Chris Egerton

---
 .../connect/storage/KafkaConfigBackingStore.java   |  6 ++-
 .../connect/storage/MemoryConfigBackingStore.java  |  3 +-
 .../storage/KafkaConfigBackingStoreTest.java   | 49 ++
 3 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 5d44953ec16..f33b1aef7b3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -908,6 +908,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
 private void processTargetStateRecord(String connectorName, SchemaAndValue 
value) {
 boolean removed = false;
+boolean stateChanged = true;
 synchronized (lock) {
 if (value.value() == null) {
 // When connector configs are removed, we also write 
tombstones for the target state.
@@ -935,7 +936,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 try {
 TargetState state = TargetState.valueOf((String) 
targetState);
 log.debug("Setting target state for connector '{}' to {}", 
connectorName, targetState);
-connectorTargetStates.put(connectorName, state);
+TargetState prevState = 
connectorTargetStates.put(connectorName, state);
+stateChanged = !state.equals(prevState);
 } catch (IllegalArgumentException e) {
 log.error("Invalid target state for connector '{}': {}", 
connectorName, targetState);
 return;
@@ -945,7 +947,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
 // Note that we do not notify the update listener if the target state 
has been removed.
 // Instead we depend on the removal callback of the connector config 
itself to notify the worker.
-if (started && !removed)
+if (started && !removed && stateChanged)
 updateListener.onConnectorTargetStateChange(connectorName);
 }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index eadb873b45a..dcdfd71296b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -147,9 +147,10 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
 if (connectorState == null)
 throw new IllegalArgumentException("No connector `" + connector + 
"` configured");
 
+TargetState prevState = connectorState.targetState;
 connectorState.targetState = state;
 
-if (updateListener != null)
+if (updateListener != null && !state.equals(prevState))
 updateListener.onConnectorTargetStateChange(connector);
 }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index bda35bd3ed1..63fe460685f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -146,6 +146,7 @@ public class KafkaConfigBackingStoreTest {
 new 
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
 );
 private static final Struct TARGET_STATE_PAUSED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
+private static final Struct TARGET_STATE_STARTED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
 
 private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
 = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -889,6