[kafka] branch 3.1 updated: MINOR: reload4j build dependency fixes (#12144)

2022-05-10 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
 new bafa69edbb MINOR: reload4j build dependency fixes (#12144)
bafa69edbb is described below

commit bafa69edbbf772752009ac02f0f271230a9a2482
Author: Ismael Juma 
AuthorDate: Tue May 10 20:14:21 2022 -0700

MINOR: reload4j build dependency fixes (#12144)

* Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have
  some projects that have an explicit `reload4j` dependency, it
  was included in the final release tar - i.e. it was effectively
  a workaround for this bug.
* Exclude the `log4j` and `slf4j-log4j12` transitive dependencies for
  `streams:upgrade-system-tests`. Versions 0100 and 0101
  had a transitive dependency on `log4j` and `slf4j-log4j12` via
  `zkclient` and `zookeeper`. This avoids classpath conflicts that lead
  to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in
  system tests (see the diagnostic sketch after this list).
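
A quick way to see which jar is actually supplying the `org.apache.log4j`
classes - the mixed-classpath situation behind the linked `NoSuchFieldError` -
is a small probe like the one below. This is a hypothetical diagnostic sketch,
not part of the commit:

    // Hypothetical diagnostic: prints the jar that the JVM loaded
    // org.apache.log4j.Logger from. If a log4j 1.2.x jar shows up here while
    // reload4j is also on the classpath (or vice versa), the conflicting
    // setup described above is present.
    public class Log4jSourceProbe {
        public static void main(String[] args) {
            System.out.println(org.apache.log4j.Logger.class
                .getProtectionDomain().getCodeSource().getLocation());
        }
    }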

Reviewers: Jason Gustafson 
---
 build.gradle | 32 +++-
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/build.gradle b/build.gradle
index 308d7182fc..050c182e3e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -903,7 +903,7 @@ project(':core') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1628,7 +1628,7 @@ project(':tools') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1678,7 +1678,7 @@ project(':trogdor') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1985,7 +1985,10 @@ project(':streams:upgrade-system-tests-0100') {
   archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
 
   dependencies {
-testImplementation libs.kafkaStreams_0100
+testImplementation(libs.kafkaStreams_0100) {
+  exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+  exclude group: 'log4j', module: 'log4j'
+}
 testRuntimeOnly libs.junitJupiter
   }
 
@@ -1998,7 +2001,10 @@ project(':streams:upgrade-system-tests-0101') {
   archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
 
   dependencies {
-testImplementation libs.kafkaStreams_0101
+testImplementation(libs.kafkaStreams_0101) {
+  exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+  exclude group: 'log4j', module: 'log4j'
+}
 testRuntimeOnly libs.junitJupiter
   }
 
@@ -2278,7 +2284,7 @@ project(':connect:api') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2315,7 +2321,7 @@ project(':connect:transforms') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2355,7 +2361,7 @@ project(':connect:json') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2421,8 +2427,8 @@ project(':connect:runtime') {
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
+  // No need to copy log4j since the module has an explicit dependency on that
   include('slf4j-log4j12*')
-  include('log4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2502,7 +2508,7 @@ project(':connect:file') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2541,7 +2547,7 @@ project(':connect:basic-auth-extension') {
   

[kafka] branch 3.2 updated: MINOR: reload4j build dependency fixes (#12144)

2022-05-10 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
 new c761ee0348 MINOR: reload4j build dependency fixes (#12144)
c761ee0348 is described below

commit c761ee034844414016c27d4e8689cc4e6b105df2
Author: Ismael Juma 
AuthorDate: Tue May 10 20:14:21 2022 -0700

MINOR: reload4j build dependency fixes (#12144)

* Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have
  some projects that have an explicit `reload4j` dependency, it
  was included in the final release tar - i.e. it was effectively
  a workaround for this bug.
* Exclude the `log4j` and `slf4j-log4j12` transitive dependencies for
  `streams:upgrade-system-tests`. Versions 0100 and 0101
  had a transitive dependency on `log4j` and `slf4j-log4j12` via
  `zkclient` and `zookeeper`. This avoids classpath conflicts that lead
  to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in
  system tests.

Reviewers: Jason Gustafson 
---
 build.gradle | 32 +++-
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/build.gradle b/build.gradle
index f91c606a00..aa4f94d704 100644
--- a/build.gradle
+++ b/build.gradle
@@ -916,7 +916,7 @@ project(':core') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1641,7 +1641,7 @@ project(':tools') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1691,7 +1691,7 @@ project(':trogdor') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2000,7 +2000,10 @@ project(':streams:upgrade-system-tests-0100') {
   archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
 
   dependencies {
-testImplementation libs.kafkaStreams_0100
+testImplementation(libs.kafkaStreams_0100) {
+  exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+  exclude group: 'log4j', module: 'log4j'
+}
 testRuntimeOnly libs.junitJupiter
   }
 
@@ -2013,7 +2016,10 @@ project(':streams:upgrade-system-tests-0101') {
   archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
 
   dependencies {
-testImplementation libs.kafkaStreams_0101
+testImplementation(libs.kafkaStreams_0101) {
+  exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+  exclude group: 'log4j', module: 'log4j'
+}
 testRuntimeOnly libs.junitJupiter
   }
 
@@ -2319,7 +2325,7 @@ project(':connect:api') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2356,7 +2362,7 @@ project(':connect:transforms') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2396,7 +2402,7 @@ project(':connect:json') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2462,8 +2468,8 @@ project(':connect:runtime') {
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
+  // No need to copy log4j since the module has an explicit dependency on that
   include('slf4j-log4j12*')
-  include('log4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2542,7 +2548,7 @@ project(':connect:file') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2581,7 +2587,7 @@ project(':connect:basic-auth-extension') {
   

[kafka] branch trunk updated: MINOR: reload4j build dependency fixes (#12144)

2022-05-10 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 2b14a0bb6d MINOR: reload4j build dependency fixes (#12144)
2b14a0bb6d is described below

commit 2b14a0bb6dd11290d40c033c8c7ac203df72c988
Author: Ismael Juma 
AuthorDate: Tue May 10 20:14:21 2022 -0700

MINOR: reload4j build dependency fixes (#12144)

* Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have
  some projects that have an explicit `reload4j` dependency, it
  was included in the final release tar - i.e. it was effectively
  a workaround for this bug.
* Exclude the `log4j` and `slf4j-log4j12` transitive dependencies for
  `streams:upgrade-system-tests`. Versions 0100 and 0101
  had a transitive dependency on `log4j` and `slf4j-log4j12` via
  `zkclient` and `zookeeper`. This avoids classpath conflicts that lead
  to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in
  system tests.

Reviewers: Jason Gustafson 
---
 build.gradle | 32 +++-
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/build.gradle b/build.gradle
index 68dbbf513d..b80de61385 100644
--- a/build.gradle
+++ b/build.gradle
@@ -922,7 +922,7 @@ project(':core') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1701,7 +1701,7 @@ project(':tools') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -1751,7 +1751,7 @@ project(':trogdor') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2071,7 +2071,10 @@ project(':streams:upgrade-system-tests-0100') {
   archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
 
   dependencies {
-testImplementation libs.kafkaStreams_0100
+testImplementation(libs.kafkaStreams_0100) {
+  exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+  exclude group: 'log4j', module: 'log4j'
+}
 testRuntimeOnly libs.junitJupiter
   }
 
@@ -2084,7 +2087,10 @@ project(':streams:upgrade-system-tests-0101') {
   archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
 
   dependencies {
-testImplementation libs.kafkaStreams_0101
+testImplementation(libs.kafkaStreams_0101) {
+  exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+  exclude group: 'log4j', module: 'log4j'
+}
 testRuntimeOnly libs.junitJupiter
   }
 
@@ -2391,7 +2397,7 @@ project(':connect:api') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2428,7 +2434,7 @@ project(':connect:transforms') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2468,7 +2474,7 @@ project(':connect:json') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2534,8 +2540,8 @@ project(':connect:runtime') {
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
+  // No need to copy log4j since the module has an explicit dependency on that
   include('slf4j-log4j12*')
-  include('log4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2614,7 +2620,7 @@ project(':connect:file') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
 from (configurations.testRuntimeClasspath) {
   include('slf4j-log4j12*')
-  include('log4j*jar')
+  include('reload4j*jar')
 }
 from (configurations.runtimeClasspath) {
   exclude('kafka-clients*')
@@ -2653,7 +2659,7 @@ project(':connect:basic-auth-extension') {
   

[kafka] branch trunk updated (7730476603 -> 040b11d705)

2022-05-10 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 7730476603 MINOR: Create case class to encapsulate fetch parameters 
and simplify handling (#12082)
 add 040b11d705  KAFKA-13892: Fix bug where multiple remove records are 
generated for one ACL

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/controller/AclControlManager.java |  9 ---
 .../kafka/controller/AclControlManagerTest.java| 30 ++
 2 files changed, 36 insertions(+), 3 deletions(-)



[kafka] branch update-readme-regarding-authorizer created (now 16c4d59629)

2022-05-10 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch update-readme-regarding-authorizer
in repository https://gitbox.apache.org/repos/asf/kafka.git


  at 16c4d59629 MINOR: Remove kraft authorizer from list of missing features

No new revisions were added by this update.



[kafka] branch trunk updated: MINOR: Create case class to encapsulate fetch parameters and simplify handling (#12082)

2022-05-10 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 7730476603 MINOR: Create case class to encapsulate fetch parameters 
and simplify handling (#12082)
7730476603 is described below

commit 773047660359bf6b551d06763eeff80bc551b58a
Author: Jason Gustafson 
AuthorDate: Tue May 10 13:24:23 2022 -0700

MINOR: Create case class to encapsulate fetch parameters and simplify 
handling (#12082)

This patch adds a new case class `FetchParams` which encapsulates the 
parameters of the fetch request. It then uses this class in `DelayedFetch` 
directly instead of `FetchMetadata`. The intent is to reduce the number of 
things we need to change whenever we need to pass through new parameters. The 
patch also cleans up `ReplicaManagerTest` for more consistent usage.
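
The change is an instance of the "parameter object" pattern; a minimal Java
sketch of the idea follows (the names are illustrative, not the actual Kafka
classes):

    // Illustrative only: group values that always travel together into one
    // immutable carrier, so adding a new fetch parameter means changing one
    // type instead of every method signature along the call chain.
    record FetchParams(int minBytes, int maxBytes, long maxWaitMs, int replicaId) {}

    final class FetchService {
        // Before: handleFetch(int minBytes, int maxBytes, long maxWaitMs, int replicaId)
        void handleFetch(FetchParams params) {
            // ... read params.minBytes(), params.maxWaitMs(), etc.
        }
    }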

Reviewers: David Jacot 
---
 core/src/main/scala/kafka/api/Request.scala|   4 +
 .../src/main/scala/kafka/server/DelayedFetch.scala |  86 ++--
 .../main/scala/kafka/server/FetchDataInfo.scala|  60 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  74 ++--
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  25 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  56 +--
 .../kafka/server/DelayedFetchTest.scala|  55 +--
 .../scala/unit/kafka/server/KafkaApisTest.scala|  28 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala |  54 ++-
 .../server/ReplicaManagerConcurrencyTest.scala |  20 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala|  50 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 487 +++--
 12 files changed, 538 insertions(+), 461 deletions(-)

diff --git a/core/src/main/scala/kafka/api/Request.scala b/core/src/main/scala/kafka/api/Request.scala
index 653b5f653a..6c405a45b0 100644
--- a/core/src/main/scala/kafka/api/Request.scala
+++ b/core/src/main/scala/kafka/api/Request.scala
@@ -25,6 +25,10 @@ object Request {
   // Broker ids are non-negative int.
   def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
 
+  def isConsumer(replicaId: Int): Boolean = {
+replicaId < 0 && replicaId != FutureLocalReplicaId
+  }
+
   def describeReplicaId(replicaId: Int): String = {
 replicaId match {
   case OrdinaryConsumerId => "consumer"
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 8d38ef8b6d..3eb8eedf4c 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -23,7 +23,6 @@ import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 
@@ -38,36 +37,23 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf
   }
 }
 
-/**
- * The fetch metadata maintained by the delayed fetch operation
- */
-case class FetchMetadata(fetchMinBytes: Int,
- fetchMaxBytes: Int,
- hardMaxBytesLimit: Boolean,
- fetchOnlyLeader: Boolean,
- fetchIsolation: FetchIsolation,
- isFromFollower: Boolean,
- replicaId: Int,
- fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]) {
-
-  override def toString = "FetchMetadata(minBytes=" + fetchMinBytes + ", " +
-"maxBytes=" + fetchMaxBytes + ", " +
-"onlyLeader=" + fetchOnlyLeader + ", " +
-"fetchIsolation=" + fetchIsolation + ", " +
-"replicaId=" + replicaId + ", " +
-"partitionStatus=" + fetchPartitionStatus + ")"
-}
 /**
 * A delayed fetch operation that can be created by the replica manager and watched
  * in the fetch operation purgatory
  */
-class DelayedFetch(delayMs: Long,
-   fetchMetadata: FetchMetadata,
-   replicaManager: ReplicaManager,
-   quota: ReplicaQuota,
-   clientMetadata: Option[ClientMetadata],
-   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
-  extends DelayedOperation(delayMs) {
+class DelayedFetch(
+  params: FetchParams,
+  fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+  replicaManager: ReplicaManager,
+  quota: ReplicaQuota,
+  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
+) extends DelayedOperation(params.maxWaitMs) {
+
+  override def toString: String = {
+s"DelayedFetch(params=$params" +
+  s", 

[kafka] branch trunk updated (0c1cde1080 -> 1cfc7c25fd)

2022-05-10 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 0c1cde1080 KAFKA-13862; Support Append/Subtract multiple config values 
in KRaft mode (#12108)
 add 1cfc7c25fd MINOR: install Exit.exit handler in 
BrokerMetadataPublisherTest (#12142)

No new revisions were added by this update.

Summary of changes:
 .../metadata/BrokerMetadataPublisherTest.scala | 23 --
 1 file changed, 21 insertions(+), 2 deletions(-)



[kafka] branch trunk updated: KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)

2022-05-10 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 0c1cde1080 KAFKA-13862; Support Append/Subtract multiple config values 
in KRaft mode (#12108)
0c1cde1080 is described below

commit 0c1cde10802456b1bc3f5f12f5e4d3d6ae400edd
Author: dengziming 
AuthorDate: Wed May 11 03:41:17 2022 +0800

KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode 
(#12108)

We should be able to append/subtract multiple config values in KRaft mode
using the `IncrementalAlterConfig` RPC. For example, appending to the topic
config "cleanup.policy" with value="delete,compact" ended up treating
"delete,compact" as a single value rather than two values. This patch fixes
the problem. Additionally, it updates the ZK logic to correctly handle
duplicate additions.
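
The essence of the fix, as a hedged standalone Java sketch (the actual change
in the diff below is in Scala and Java): split the incoming value on commas so
that "delete,compact" becomes two values, and skip values that are already
present so duplicate additions are handled:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class AppendConfigSketch {
        public static void main(String[] args) {
            // Current value of "cleanup.policy" plus the value being appended.
            List<String> oldValues = new ArrayList<>(Arrays.asList("delete".split(",")));
            for (String v : "delete,compact".split(",")) {
                if (!oldValues.contains(v)) { // drop duplicates before appending
                    oldValues.add(v);
                }
            }
            System.out.println(String.join(",", oldValues)); // prints: delete,compact
        }
    }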

Reviewers: Akhilesh Chaganti , Jason 
Gustafson 
---
 .../scala/kafka/server/ConfigAdminManager.scala|  3 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  9 +++
 .../kafka/server/metadata/MetadataPublisher.scala  |  5 ++
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 71 +++---
 .../metadata/BrokerMetadataListenerTest.scala  |  4 ++
 .../test/scala/unit/kafka/utils/TestUtils.scala| 19 ++
 .../controller/ConfigurationControlManager.java| 16 +++--
 .../ConfigurationControlManagerTest.java   | 67 +---
 8 files changed, 170 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index a7f5c6bdef..e7d6c33ab2 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -499,7 +499,8 @@ object ConfigAdminManager {
 
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
 .getOrElse("")
 .split(",").toList
-  val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
+  val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value))
+  val newValueList = oldValueList ::: appendingValueList
   configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
 }
 case OpType.SUBTRACT => {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e653e6e5b2..fb6bb61544 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -18,6 +18,7 @@
 package kafka.server.metadata
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong
 
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
@@ -118,6 +119,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
*/
   var _firstPublish = true
 
+  /**
+   * This is updated after all components (e.g. LogManager) has finished publishing the new metadata delta
+   */
+  val publishedOffsetAtomic = new AtomicLong(-1)
+
   override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
 val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
 
@@ -249,6 +255,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   if (_firstPublish) {
 finishInitializingReplicaManager(newImage)
   }
+  publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
 } catch {
   case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
 throw t
@@ -257,6 +264,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 }
   }
 
+  override def publishedOffset: Long = publishedOffsetAtomic.get()
+
   def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
 conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
   }
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
index 104d164d9c..b63a2c056c 100644
--- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
@@ -30,4 +30,9 @@ trait MetadataPublisher {
*   delta to the previous image.
*/
   def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
+
+  /**
+   * The highest offset of metadata topic which has been published
+   */
+  def publishedOffset: Long
 }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

[kafka] branch trunk updated: MINOR: Update release versions for upgrade tests with 3.2.0 release (#12143)

2022-05-10 Thread cadonna
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 020ff2fe0e MINOR: Update release versions for upgrade tests with 3.2.0 
release (#12143)
020ff2fe0e is described below

commit 020ff2fe0ebb063bb40bca35286e1bd5e6ca42dd
Author: Bruno Cadonna 
AuthorDate: Tue May 10 14:47:46 2022 +0200

MINOR: Update release versions for upgrade tests with 3.2.0 release (#12143)

Updates the release versions in files that are used for upgrade tests with
the 3.2.0 release version.

Reviewer: David Jacot 
---
 gradle/dependencies.gradle | 1 +
 tests/docker/Dockerfile| 2 ++
 vagrant/base.sh| 3 +++
 3 files changed, 6 insertions(+)

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 066d62bc85..6ce71917b7 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -100,6 +100,7 @@ versions += [
   kafka_28: "2.8.1",
   kafka_30: "3.0.1",
   kafka_31: "3.1.0",
+  kafka_32: "3.2.0",
   lz4: "1.8.0",
   mavenArtifact: "3.8.4",
   metrics: "2.2.0",
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 3f312d254c..25d39b8b29 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -65,6 +65,7 @@ RUN mkdir -p "/opt/kafka-2.7.1" && chmod a+rw /opt/kafka-2.7.1 && curl -s "$KAFK
 RUN mkdir -p "/opt/kafka-2.8.1" && chmod a+rw /opt/kafka-2.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.1"
 RUN mkdir -p "/opt/kafka-3.0.1" && chmod a+rw /opt/kafka-3.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.0.1"
 RUN mkdir -p "/opt/kafka-3.1.0" && chmod a+rw /opt/kafka-3.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.0"
+RUN mkdir -p "/opt/kafka-3.2.0" && chmod a+rw /opt/kafka-3.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.0"
 
 # Streams test dependencies
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@@ -84,6 +85,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.7.1-test.jar" -o /opt/kafka-2.7.1/lib
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.8.1-test.jar" -o /opt/kafka-2.8.1/libs/kafka-streams-2.8.1-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.0.1-test.jar" -o /opt/kafka-3.0.1/libs/kafka-streams-3.0.1-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.1.0-test.jar" -o /opt/kafka-3.1.0/libs/kafka-streams-3.1.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.2.0-test.jar" -o /opt/kafka-3.1.0/libs/kafka-streams-3.2.0-test.jar
 
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sh
diff --git a/vagrant/base.sh b/vagrant/base.sh
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -152,6 +152,9 @@ get_kafka 3.0.1 2.12
 chmod a+rw /opt/kafka-3.0.1
 get_kafka 3.1.0 2.12
 chmod a+rw /opt/kafka-3.1.0
+get_kafka 3.2.0 2.12
+chmod a+rw /opt/kafka-3.2.0
+
 
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use



[kafka] branch trunk updated: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)

2022-05-10 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new eeb1e702eb KAFKA-13879: Reconnect exponential backoff is ineffective 
in some cases (#12131)
eeb1e702eb is described below

commit eeb1e702eb7a43d88f11458f739672e2b7aa4871
Author: chern 
AuthorDate: Tue May 10 03:36:42 2022 -0700

KAFKA-13879: Reconnect exponential backoff is ineffective in some cases 
(#12131)

When a client connects to an SSL listener using the PLAINTEXT security
protocol, the client considers the channel setup complete as soon as the TCP
connection is established. In reality, the channel setup is not complete yet.
The client then resets the reconnect exponential backoff and issues an API
version request. Since the broker expects an SSL handshake, the API version
request causes the connection to disconnect. The client then reconnects
without exponential backoff, since the backoff has been reset.

This commit removes the reset of the reconnect exponential backoff when
sending the API version request. In the good case, where the channel setup is
complete, the reconnect exponential backoff is reset when the node becomes
ready, i.e. after receiving the API version response. Inter-broker clients,
which do not send an API version request and go directly to the ready state,
continue to reset the backoff before any successful requests.
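
For reference, the exponential backoff preserved by this change follows the
formula verified in the test below; here is a standalone sketch with
illustrative constants (jitter omitted; the values are not quoted Kafka
defaults):

    public class ReconnectBackoffSketch {
        public static void main(String[] args) {
            long reconnectBackoffMs = 50;      // illustrative base delay
            long reconnectBackoffMaxMs = 1000; // illustrative cap
            double expBase = 2.0;
            // Cap the exponent so the delay never exceeds the configured max.
            double maxExp = Math.log(reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1))
                / Math.log(expBase);
            for (int failures = 0; failures < 10; failures++) {
                long delay = Math.round(Math.pow(expBase, Math.min(failures, maxExp)) * reconnectBackoffMs);
                System.out.println("attempt " + failures + ": " + delay + " ms");
            }
        }
    }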

Reviewers: Rajini Sivaram 
---
 .../kafka/clients/ClusterConnectionStates.java |  1 -
 .../kafka/clients/ClusterConnectionStatesTest.java | 38 ++
 .../apache/kafka/clients/NetworkClientTest.java| 31 ++
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 95efdbeae4..f4d9092258 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -246,7 +246,6 @@ final class ClusterConnectionStates {
 public void checkingApiVersions(String id) {
 NodeConnectionState nodeState = nodeState(id);
 nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
-resetReconnectBackoff(nodeState);
 resetConnectionSetupTimeout(nodeState);
 connectingNodes.remove(id);
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 72cc123921..96fe89ca11 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest {
 
 @Test
 public void testExponentialReconnectBackoff() {
-double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
-/ Math.log(reconnectBackoffExpBase);
-
-// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
-for (int i = 0; i < 10; i++) {
-connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
-connectionStates.disconnected(nodeId1, time.milliseconds());
-// Calculate expected backoff value without jitter
-long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
-* reconnectBackoffMs);
-long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
-assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
-time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
-}
-}
+verifyReconnectExponentialBackoff(false);
+verifyReconnectExponentialBackoff(true);
 }
 
 @Test
@@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest {
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver);
 }
+
+private void verifyReconnectExponentialBackoff(boolean enterCheckingApiVersionState) {
+double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
+/ Math.log(reconnectBackoffExpBase);
+
+connectionStates.remove(nodeId1);
+// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
+for (int i = 0; i < 10; i++) {
+

svn commit: r54409 - in /release/kafka: ./ 3.2.0/

2022-05-10 Thread dajac
Author: dajac
Date: Tue May 10 07:42:32 2022
New Revision: 54409

Log:
Release 3.2.0

Added:
release/kafka/3.2.0/
release/kafka/3.2.0/RELEASE_NOTES.html
release/kafka/3.2.0/RELEASE_NOTES.html.asc
release/kafka/3.2.0/RELEASE_NOTES.html.md5
release/kafka/3.2.0/RELEASE_NOTES.html.sha1
release/kafka/3.2.0/RELEASE_NOTES.html.sha512
release/kafka/3.2.0/kafka-3.2.0-src.tgz   (with props)
release/kafka/3.2.0/kafka-3.2.0-src.tgz.asc
release/kafka/3.2.0/kafka-3.2.0-src.tgz.md5
release/kafka/3.2.0/kafka-3.2.0-src.tgz.sha1
release/kafka/3.2.0/kafka-3.2.0-src.tgz.sha512
release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz   (with props)
release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.asc
release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.md5
release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.sha1
release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.sha512
release/kafka/3.2.0/kafka_2.12-3.2.0.tgz   (with props)
release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.asc
release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.md5
release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.sha1
release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.sha512
release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz   (with props)
release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.asc
release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.md5
release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.sha1
release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.sha512
release/kafka/3.2.0/kafka_2.13-3.2.0.tgz   (with props)
release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.asc
release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.md5
release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.sha1
release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.sha512
Modified:
release/kafka/KEYS

Added: release/kafka/3.2.0/RELEASE_NOTES.html
==
--- release/kafka/3.2.0/RELEASE_NOTES.html (added)
+++ release/kafka/3.2.0/RELEASE_NOTES.html Tue May 10 07:42:32 2022
@@ -0,0 +1,155 @@
+Release Notes - Kafka - Version 3.2.0
+Below is a summary of the JIRA issues addressed in the 3.2.0 release of Kafka. For full documentation of the release, a guide to get started, and information about the project, see the Kafka project site: https://kafka.apache.org/
+
+Note about upgrades: Please carefully review the upgrade documentation for this release (https://kafka.apache.org/32/documentation.html#upgrade) thoroughly before upgrading your cluster. The upgrade notes discuss any critical information about incompatibilities and breaking changes, performance changes, and any other changes that might impact your production deployment of Kafka.
+
+The documentation for the most recent release can be found at https://kafka.apache.org/documentation.html.
+
+New Feature
+
+[KAFKA-6718] - Rack Aware Stand-by Task Assignment for Kafka Streams (https://issues.apache.org/jira/browse/KAFKA-6718)
+[KAFKA-13479] - Interactive Query v2 (https://issues.apache.org/jira/browse/KAFKA-13479)
+
+Improvement
+
+[KAFKA-7077] - KIP-318: Make Kafka Connect Source idempotent (https://issues.apache.org/jira/browse/KAFKA-7077)
+[KAFKA-7589] - Allow configuring num.network.threads per listener (https://issues.apache.org/jira/browse/KAFKA-7589)
+[KAFKA-9648] - Add configuration to adjust listen backlog size for Acceptor (https://issues.apache.org/jira/browse/KAFKA-9648)
+[KAFKA-9847] - Add config to set default store type (https://issues.apache.org/jira/browse/KAFKA-9847)
+[KAFKA-12959] - Prioritize assigning standby tasks to threads without any active tasks (https://issues.apache.org/jira/browse/KAFKA-12959)
+[KAFKA-12980] - Allow consumers to return from poll when position advances due to aborted transactions (https://issues.apache.org/jira/browse/KAFKA-12980)
+[KAFKA-13200] - Fix version of MirrorMaker2 connectors (https://issues.apache.org/jira/browse/KAFKA-13200)
+[KAFKA-13323] - The words are ambiguous (https://issues.apache.org/jira/browse/KAFKA-13323)
+[KAFKA-13348] - Allow Source Tasks to Handle Producer Exceptions (https://issues.apache.org/jira/browse/KAFKA-13348)
+[KAFKA-13426] - Add recordMetadata to StateStoreContext (https://issues.apache.org/jira/browse/KAFKA-13426)
+[KAFKA-13441] - improve upgrade doc (https://issues.apache.org/jira/browse/KAFKA-13441)
+[KAFKA-13445] - Add ECDSA test for JWT validation (https://issues.apache.org/jira/browse/KAFKA-13445)
+[KAFKA-13449] - Comment optimization for parameter log.cleaner.delete.retention.ms (https://issues.apache.org/jira/browse/KAFKA-13449)
+[KAFKA-13451] - Add reason to JoinGroupRequest and LeaveGroupRequest (https://issues.apache.org/jira/browse/KAFKA-13451)
+[KAFKA-13455] - The Apache Kafka quickstart guide does not contain any steps for running Kafka Connect (https://issues.apache.org/jira/browse/KAFKA-13455)

[kafka] branch trunk updated: MINOR: Small cleanups in connect/mirror (#12113)

2022-05-10 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 989d3ce07f MINOR: Small cleanups in connect/mirror (#12113)
989d3ce07f is described below

commit 989d3ce07f1848d4c0b9fbb116ff0cf9b3b382d7
Author: Mickael Maison 
AuthorDate: Tue May 10 08:49:56 2022 +0200

MINOR: Small cleanups in connect/mirror (#12113)

Reviewers: Luke Chen , Divij Vaidya 

---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  2 +-
 .../apache/kafka/connect/mirror/OffsetSync.java|  6 +++---
 .../kafka/connect/mirror/OffsetSyncStore.java  |  6 +++---
 .../org/apache/kafka/connect/mirror/Scheduler.java | 22 +++---
 .../MirrorConnectorsIntegrationBaseTest.java   |  2 +-
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 47631998fb..30fb695d92 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -105,7 +105,7 @@ public class MirrorCheckpointTask extends SourceTask {
 }
 
 @Override
-public void commit() throws InterruptedException {
+public void commit() {
 // nop
 }
 
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
index 68e6441f18..e1ecb1e1db 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -39,9 +39,9 @@ public class OffsetSync {
 new Field(TOPIC_KEY, Type.STRING),
 new Field(PARTITION_KEY, Type.INT32));
 
-private TopicPartition topicPartition;
-private long upstreamOffset;
-private long downstreamOffset;
+private final TopicPartition topicPartition;
+private final long upstreamOffset;
+private final long downstreamOffset;
 
 public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
 this.topicPartition = topicPartition;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 600dda46f3..9152cd5aa0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -30,9 +30,9 @@ import java.time.Duration;
 
/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-private KafkaConsumer consumer;
-private Map offsetSyncs = new HashMap<>();
-private TopicPartition offsetSyncTopicPartition;
+private final KafkaConsumer consumer;
+private final Map offsetSyncs = new HashMap<>();
+private final TopicPartition offsetSyncTopicPartition;
 
 OffsetSyncStore(MirrorConnectorConfig config) {
 consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 20f2ca7e2c..0644d6a6c6 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class Scheduler implements AutoCloseable {
-private static Logger log = LoggerFactory.getLogger(Scheduler.class);
+private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
 
 private final String name;
 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@@ -62,11 +62,11 @@ class Scheduler implements AutoCloseable {
 try {
 executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
-log.warn("{} was interrupted running task: {}", name, description);
+LOG.warn("{} was interrupted running task: {}", name, description);
 } catch (TimeoutException e) {
-log.error("{} timed out running task: {}", name, description);
+LOG.error("{} timed out running task: {}", name, description);
 } catch (Throwable e) {
-log.error("{} caught exception in task: {}", name, description, e);
+