[kafka] branch 3.1 updated: MINOR: reload4j build dependency fixes (#12144)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new bafa69edbb MINOR: reload4j build dependency fixes (#12144) bafa69edbb is described below commit bafa69edbbf772752009ac02f0f271230a9a2482 Author: Ismael Juma AuthorDate: Tue May 10 20:14:21 2022 -0700 MINOR: reload4j build dependency fixes (#12144) * Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have some projects that have an explicit `reload4j` dependency, it was included in the final release release tar - i.e. it was effectively a workaround for this bug. * Exclude `log4j` and `slf4j-log4j12` transitive dependencies for `streams:upgrade-system-tests`. Versions 0100 and 0101 had a transitive dependency to `log4j` and `slf4j-log4j12` via `zkclient` and `zookeeper`. This avoids classpath conflicts that lead to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in system tests. Reviewers: Jason Gustafson --- build.gradle | 32 +++- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index 308d7182fc..050c182e3e 100644 --- a/build.gradle +++ b/build.gradle @@ -903,7 +903,7 @@ project(':core') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1628,7 +1628,7 @@ project(':tools') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1678,7 +1678,7 @@ project(':trogdor') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1985,7 +1985,10 @@ project(':streams:upgrade-system-tests-0100') { archivesBaseName = "kafka-streams-upgrade-system-tests-0100" dependencies { -testImplementation libs.kafkaStreams_0100 +testImplementation(libs.kafkaStreams_0100) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' +} testRuntimeOnly libs.junitJupiter } @@ -1998,7 +2001,10 @@ project(':streams:upgrade-system-tests-0101') { archivesBaseName = "kafka-streams-upgrade-system-tests-0101" dependencies { -testImplementation libs.kafkaStreams_0101 +testImplementation(libs.kafkaStreams_0101) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' +} testRuntimeOnly libs.junitJupiter } @@ -2278,7 +2284,7 @@ project(':connect:api') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2315,7 +2321,7 @@ project(':connect:transforms') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2355,7 +2361,7 @@ project(':connect:json') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2421,8 +2427,8 @@ project(':connect:runtime') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { + // No need to copy log4j since the module has an explicit dependency on that include('slf4j-log4j12*') - include('log4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2502,7 +2508,7 @@ project(':connect:file') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2541,7 +2547,7 @@ project(':connect:basic-auth-extension') {
[kafka] branch 3.2 updated: MINOR: reload4j build dependency fixes (#12144)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.2 by this push: new c761ee0348 MINOR: reload4j build dependency fixes (#12144) c761ee0348 is described below commit c761ee034844414016c27d4e8689cc4e6b105df2 Author: Ismael Juma AuthorDate: Tue May 10 20:14:21 2022 -0700 MINOR: reload4j build dependency fixes (#12144) * Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have some projects that have an explicit `reload4j` dependency, it was included in the final release release tar - i.e. it was effectively a workaround for this bug. * Exclude `log4j` and `slf4j-log4j12` transitive dependencies for `streams:upgrade-system-tests`. Versions 0100 and 0101 had a transitive dependency to `log4j` and `slf4j-log4j12` via `zkclient` and `zookeeper`. This avoids classpath conflicts that lead to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in system tests. Reviewers: Jason Gustafson --- build.gradle | 32 +++- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index f91c606a00..aa4f94d704 100644 --- a/build.gradle +++ b/build.gradle @@ -916,7 +916,7 @@ project(':core') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1641,7 +1641,7 @@ project(':tools') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1691,7 +1691,7 @@ project(':trogdor') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2000,7 +2000,10 @@ project(':streams:upgrade-system-tests-0100') { archivesBaseName = "kafka-streams-upgrade-system-tests-0100" dependencies { -testImplementation libs.kafkaStreams_0100 +testImplementation(libs.kafkaStreams_0100) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' +} testRuntimeOnly libs.junitJupiter } @@ -2013,7 +2016,10 @@ project(':streams:upgrade-system-tests-0101') { archivesBaseName = "kafka-streams-upgrade-system-tests-0101" dependencies { -testImplementation libs.kafkaStreams_0101 +testImplementation(libs.kafkaStreams_0101) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' +} testRuntimeOnly libs.junitJupiter } @@ -2319,7 +2325,7 @@ project(':connect:api') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2356,7 +2362,7 @@ project(':connect:transforms') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2396,7 +2402,7 @@ project(':connect:json') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2462,8 +2468,8 @@ project(':connect:runtime') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { + // No need to copy log4j since the module has an explicit dependency on that include('slf4j-log4j12*') - include('log4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2542,7 +2548,7 @@ project(':connect:file') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2581,7 +2587,7 @@ project(':connect:basic-auth-extension') {
[kafka] branch trunk updated: MINOR: reload4j build dependency fixes (#12144)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2b14a0bb6d MINOR: reload4j build dependency fixes (#12144) 2b14a0bb6d is described below commit 2b14a0bb6dd11290d40c033c8c7ac203df72c988 Author: Ismael Juma AuthorDate: Tue May 10 20:14:21 2022 -0700 MINOR: reload4j build dependency fixes (#12144) * Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have some projects that have an explicit `reload4j` dependency, it was included in the final release release tar - i.e. it was effectively a workaround for this bug. * Exclude `log4j` and `slf4j-log4j12` transitive dependencies for `streams:upgrade-system-tests`. Versions 0100 and 0101 had a transitive dependency to `log4j` and `slf4j-log4j12` via `zkclient` and `zookeeper`. This avoids classpath conflicts that lead to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in system tests. Reviewers: Jason Gustafson --- build.gradle | 32 +++- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index 68dbbf513d..b80de61385 100644 --- a/build.gradle +++ b/build.gradle @@ -922,7 +922,7 @@ project(':core') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1701,7 +1701,7 @@ project(':tools') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -1751,7 +1751,7 @@ project(':trogdor') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2071,7 +2071,10 @@ project(':streams:upgrade-system-tests-0100') { archivesBaseName = "kafka-streams-upgrade-system-tests-0100" dependencies { -testImplementation libs.kafkaStreams_0100 +testImplementation(libs.kafkaStreams_0100) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' +} testRuntimeOnly libs.junitJupiter } @@ -2084,7 +2087,10 @@ project(':streams:upgrade-system-tests-0101') { archivesBaseName = "kafka-streams-upgrade-system-tests-0101" dependencies { -testImplementation libs.kafkaStreams_0101 +testImplementation(libs.kafkaStreams_0101) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' +} testRuntimeOnly libs.junitJupiter } @@ -2391,7 +2397,7 @@ project(':connect:api') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2428,7 +2434,7 @@ project(':connect:transforms') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2468,7 +2474,7 @@ project(':connect:json') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2534,8 +2540,8 @@ project(':connect:runtime') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { + // No need to copy log4j since the module has an explicit dependency on that include('slf4j-log4j12*') - include('log4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2614,7 +2620,7 @@ project(':connect:file') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2653,7 +2659,7 @@ project(':connect:basic-auth-extension') {
[kafka] branch trunk updated (7730476603 -> 040b11d705)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 7730476603 MINOR: Create case class to encapsulate fetch parameters and simplify handling (#12082) add 040b11d705 KAFKA-13892: Fix bug where multiple remove records are generated for one ACL No new revisions were added by this update. Summary of changes: .../apache/kafka/controller/AclControlManager.java | 9 --- .../kafka/controller/AclControlManagerTest.java| 30 ++ 2 files changed, 36 insertions(+), 3 deletions(-)
[kafka] branch update-readme-regarding-authorizer created (now 16c4d59629)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch update-readme-regarding-authorizer in repository https://gitbox.apache.org/repos/asf/kafka.git at 16c4d59629 MINOR: Remove kraft authorizer from list of missing features No new revisions were added by this update.
[kafka] branch trunk updated: MINOR: Create case class to encapsulate fetch parameters and simplify handling (#12082)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7730476603 MINOR: Create case class to encapsulate fetch parameters and simplify handling (#12082) 7730476603 is described below commit 773047660359bf6b551d06763eeff80bc551b58a Author: Jason Gustafson AuthorDate: Tue May 10 13:24:23 2022 -0700 MINOR: Create case class to encapsulate fetch parameters and simplify handling (#12082) This patch adds a new case class `FetchParams` which encapsulates the parameters of the fetch request. It then uses this class in `DelayedFetch` directly instead of `FetchMetadata`. The intent is to reduce the number of things we need to change whenever we need to pass through new parameters. The patch also cleans up `ReplicaManagerTest` for more consistent usage. Reviewers: David Jacot --- core/src/main/scala/kafka/api/Request.scala| 4 + .../src/main/scala/kafka/server/DelayedFetch.scala | 86 ++-- .../main/scala/kafka/server/FetchDataInfo.scala| 60 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 74 ++-- .../kafka/server/ReplicaAlterLogDirsThread.scala | 25 +- .../main/scala/kafka/server/ReplicaManager.scala | 56 +-- .../kafka/server/DelayedFetchTest.scala| 55 +-- .../scala/unit/kafka/server/KafkaApisTest.scala| 28 +- .../server/ReplicaAlterLogDirsThreadTest.scala | 54 ++- .../server/ReplicaManagerConcurrencyTest.scala | 20 +- .../kafka/server/ReplicaManagerQuotasTest.scala| 50 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 487 +++-- 12 files changed, 538 insertions(+), 461 deletions(-) diff --git a/core/src/main/scala/kafka/api/Request.scala b/core/src/main/scala/kafka/api/Request.scala index 653b5f653a..6c405a45b0 100644 --- a/core/src/main/scala/kafka/api/Request.scala +++ b/core/src/main/scala/kafka/api/Request.scala @@ -25,6 +25,10 @@ object Request { // Broker ids are non-negative int. def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0 + def isConsumer(replicaId: Int): Boolean = { +replicaId < 0 && replicaId != FutureLocalReplicaId + } + def describeReplicaId(replicaId: Int): String = { replicaId match { case OrdinaryConsumerId => "consumer" diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 8d38ef8b6d..3eb8eedf4c 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -23,7 +23,6 @@ import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} @@ -38,36 +37,23 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf } } -/** - * The fetch metadata maintained by the delayed fetch operation - */ -case class FetchMetadata(fetchMinBytes: Int, - fetchMaxBytes: Int, - hardMaxBytesLimit: Boolean, - fetchOnlyLeader: Boolean, - fetchIsolation: FetchIsolation, - isFromFollower: Boolean, - replicaId: Int, - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]) { - - override def toString = "FetchMetadata(minBytes=" + fetchMinBytes + ", " + -"maxBytes=" + fetchMaxBytes + ", " + -"onlyLeader=" + fetchOnlyLeader + ", " + -"fetchIsolation=" + fetchIsolation + ", " + -"replicaId=" + replicaId + ", " + -"partitionStatus=" + fetchPartitionStatus + ")" -} /** * A delayed fetch operation that can be created by the replica manager and watched * in the fetch operation purgatory */ -class DelayedFetch(delayMs: Long, - fetchMetadata: FetchMetadata, - replicaManager: ReplicaManager, - quota: ReplicaQuota, - clientMetadata: Option[ClientMetadata], - responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(delayMs) { +class DelayedFetch( + params: FetchParams, + fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], + replicaManager: ReplicaManager, + quota: ReplicaQuota, + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit +) extends DelayedOperation(params.maxWaitMs) { + + override def toString: String = { +s"DelayedFetch(params=$params" + + s",
[kafka] branch trunk updated (0c1cde1080 -> 1cfc7c25fd)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 0c1cde1080 KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108) add 1cfc7c25fd MINOR: install Exit.exit handler in BrokerMetadataPublisherTest (#12142) No new revisions were added by this update. Summary of changes: .../metadata/BrokerMetadataPublisherTest.scala | 23 -- 1 file changed, 21 insertions(+), 2 deletions(-)
[kafka] branch trunk updated: KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0c1cde1080 KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108) 0c1cde1080 is described below commit 0c1cde10802456b1bc3f5f12f5e4d3d6ae400edd Author: dengziming AuthorDate: Wed May 11 03:41:17 2022 +0800 KAFKA-13862; Support Append/Subtract multiple config values in KRaft mode (#12108) We can append/subtract multiple config values in kraft mode using the `IncrementalAlterConfig` RPC. For example: append/subtract topic config "cleanup.policy" with value="delete,compact" will end up treating "delete,compact" as a value not 2 values. This patch fixes the problem. Additionally, it update the zk logic to correctly handle duplicate additions. Reviewers: Akhilesh Chaganti , Jason Gustafson --- .../scala/kafka/server/ConfigAdminManager.scala| 3 +- .../server/metadata/BrokerMetadataPublisher.scala | 9 +++ .../kafka/server/metadata/MetadataPublisher.scala | 5 ++ .../kafka/api/PlaintextAdminIntegrationTest.scala | 71 +++--- .../metadata/BrokerMetadataListenerTest.scala | 4 ++ .../test/scala/unit/kafka/utils/TestUtils.scala| 19 ++ .../controller/ConfigurationControlManager.java| 16 +++-- .../ConfigurationControlManagerTest.java | 67 +--- 8 files changed, 170 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index a7f5c6bdef..e7d6c33ab2 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -499,7 +499,8 @@ object ConfigAdminManager { .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST))) .getOrElse("") .split(",").toList - val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList + val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value)) + val newValueList = oldValueList ::: appendingValueList configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(",")) } case OpType.SUBTRACT => { diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index e653e6e5b2..fb6bb61544 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -18,6 +18,7 @@ package kafka.server.metadata import java.util.Properties +import java.util.concurrent.atomic.AtomicLong import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator @@ -118,6 +119,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig, */ var _firstPublish = true + /** + * This is updated after all components (e.g. LogManager) has finished publishing the new metadata delta + */ + val publishedOffsetAtomic = new AtomicLong(-1) + override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch() @@ -249,6 +255,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig, if (_firstPublish) { finishInitializingReplicaManager(newImage) } + publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset) } catch { case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t) throw t @@ -257,6 +264,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, } } + override def publishedOffset: Long = publishedOffsetAtomic.get() + def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = { conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) } diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala index 104d164d9c..b63a2c056c 100644 --- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala @@ -30,4 +30,9 @@ trait MetadataPublisher { * delta to the previous image. */ def publish(delta: MetadataDelta, newImage: MetadataImage): Unit + + /** + * The highest offset of metadata topic which has been published + */ + def publishedOffset: Long } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
[kafka] branch trunk updated: MINOR: Update release versions for upgrade tests with 3.2.0 release (#12143)
This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 020ff2fe0e MINOR: Update release versions for upgrade tests with 3.2.0 release (#12143) 020ff2fe0e is described below commit 020ff2fe0ebb063bb40bca35286e1bd5e6ca42dd Author: Bruno Cadonna AuthorDate: Tue May 10 14:47:46 2022 +0200 MINOR: Update release versions for upgrade tests with 3.2.0 release (#12143) Updates release versions in files that are used for upgrade test with the 3.2.0 release version. Reviewer: David Jacot --- gradle/dependencies.gradle | 1 + tests/docker/Dockerfile| 2 ++ vagrant/base.sh| 3 +++ 3 files changed, 6 insertions(+) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 066d62bc85..6ce71917b7 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -100,6 +100,7 @@ versions += [ kafka_28: "2.8.1", kafka_30: "3.0.1", kafka_31: "3.1.0", + kafka_32: "3.2.0", lz4: "1.8.0", mavenArtifact: "3.8.4", metrics: "2.2.0", diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 3f312d254c..25d39b8b29 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -65,6 +65,7 @@ RUN mkdir -p "/opt/kafka-2.7.1" && chmod a+rw /opt/kafka-2.7.1 && curl -s "$KAFK RUN mkdir -p "/opt/kafka-2.8.1" && chmod a+rw /opt/kafka-2.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.1" RUN mkdir -p "/opt/kafka-3.0.1" && chmod a+rw /opt/kafka-3.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.0.1" RUN mkdir -p "/opt/kafka-3.1.0" && chmod a+rw /opt/kafka-3.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.0" +RUN mkdir -p "/opt/kafka-3.2.0" && chmod a+rw /opt/kafka-3.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.0" # Streams test dependencies RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar @@ -84,6 +85,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.7.1-test.jar" -o /opt/kafka-2.7.1/lib RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.8.1-test.jar" -o /opt/kafka-2.8.1/libs/kafka-streams-2.8.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.0.1-test.jar" -o /opt/kafka-3.0.1/libs/kafka-streams-3.0.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.1.0-test.jar" -o /opt/kafka-3.1.0/libs/kafka-streams-3.1.0-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.2.0-test.jar" -o /opt/kafka-3.1.0/libs/kafka-streams-3.2.0-test.jar # The version of Kibosh to use for testing. # If you update this, also update vagrant/base.sh diff --git a/vagrant/base.sh b/vagrant/base.sh index 13cc9ff0b7..3cf19e4d79 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -152,6 +152,9 @@ get_kafka 3.0.1 2.12 chmod a+rw /opt/kafka-3.0.1 get_kafka 3.1.0 2.12 chmod a+rw /opt/kafka-3.1.0 +get_kafka 3.2.0 2.12 +chmod a+rw /opt/kafka-3.2.0 + # For EC2 nodes, we want to use /mnt, which should have the local disk. On local # VMs, we can just create it if it doesn't exist and use it like we'd use
[kafka] branch trunk updated: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new eeb1e702eb KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131) eeb1e702eb is described below commit eeb1e702eb7a43d88f11458f739672e2b7aa4871 Author: chern AuthorDate: Tue May 10 03:36:42 2022 -0700 KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131) When a client connects to a SSL listener using PLAINTEXT security protocol, after the TCP connection is setup, the client considers the channel setup is complete. In reality the channel setup is not complete yet. The client then resets reconnect exponential backoff and issues API version request. Since the broker expects SSL handshake, the API version request will cause the connection to disconnect. Client reconnects without exponential backoff since it has been reset. This commit removes the reset of reconnect exponential backoff when sending API version request. In the good case where the channel setup is complete, reconnect exponential backoff will be reset when the node becomes ready, which is after getting the API version response. Inter-broker clients which do not send API version request and go directly to ready state continue to reset backoff before any successful requests. Reviewers: Rajini Sivaram --- .../kafka/clients/ClusterConnectionStates.java | 1 - .../kafka/clients/ClusterConnectionStatesTest.java | 38 ++ .../apache/kafka/clients/NetworkClientTest.java| 31 ++ 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 95efdbeae4..f4d9092258 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -246,7 +246,6 @@ final class ClusterConnectionStates { public void checkingApiVersions(String id) { NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.CHECKING_API_VERSIONS; -resetReconnectBackoff(nodeState); resetConnectionSetupTimeout(nodeState); connectingNodes.remove(id); } diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java index 72cc123921..96fe89ca11 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest { @Test public void testExponentialReconnectBackoff() { -double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) -/ Math.log(reconnectBackoffExpBase); - -// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt -for (int i = 0; i < 10; i++) { -connectionStates.connecting(nodeId1, time.milliseconds(), "localhost"); -connectionStates.disconnected(nodeId1, time.milliseconds()); -// Calculate expected backoff value without jitter -long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) -* reconnectBackoffMs); -long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); -assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff); -time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); -} +verifyReconnectExponentialBackoff(false); +verifyReconnectExponentialBackoff(true); } @Test @@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest { this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver); } + +private void verifyReconnectExponentialBackoff(boolean enterCheckingApiVersionState) { +double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) +/ Math.log(reconnectBackoffExpBase); + +connectionStates.remove(nodeId1); +// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt +for (int i = 0; i < 10; i++) { +
svn commit: r54409 - in /release/kafka: ./ 3.2.0/
Author: dajac Date: Tue May 10 07:42:32 2022 New Revision: 54409 Log: Release 3.2.0 Added: release/kafka/3.2.0/ release/kafka/3.2.0/RELEASE_NOTES.html release/kafka/3.2.0/RELEASE_NOTES.html.asc release/kafka/3.2.0/RELEASE_NOTES.html.md5 release/kafka/3.2.0/RELEASE_NOTES.html.sha1 release/kafka/3.2.0/RELEASE_NOTES.html.sha512 release/kafka/3.2.0/kafka-3.2.0-src.tgz (with props) release/kafka/3.2.0/kafka-3.2.0-src.tgz.asc release/kafka/3.2.0/kafka-3.2.0-src.tgz.md5 release/kafka/3.2.0/kafka-3.2.0-src.tgz.sha1 release/kafka/3.2.0/kafka-3.2.0-src.tgz.sha512 release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz (with props) release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.asc release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.md5 release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.sha1 release/kafka/3.2.0/kafka_2.12-3.2.0-site-docs.tgz.sha512 release/kafka/3.2.0/kafka_2.12-3.2.0.tgz (with props) release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.asc release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.md5 release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.sha1 release/kafka/3.2.0/kafka_2.12-3.2.0.tgz.sha512 release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz (with props) release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.asc release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.md5 release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.sha1 release/kafka/3.2.0/kafka_2.13-3.2.0-site-docs.tgz.sha512 release/kafka/3.2.0/kafka_2.13-3.2.0.tgz (with props) release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.asc release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.md5 release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.sha1 release/kafka/3.2.0/kafka_2.13-3.2.0.tgz.sha512 Modified: release/kafka/KEYS Added: release/kafka/3.2.0/RELEASE_NOTES.html == --- release/kafka/3.2.0/RELEASE_NOTES.html (added) +++ release/kafka/3.2.0/RELEASE_NOTES.html Tue May 10 07:42:32 2022 @@ -0,0 +1,155 @@ +Release Notes - Kafka - Version 3.2.0 +Below is a summary of the JIRA issues addressed in the 3.2.0 release of Kafka. For full documentation of the +release, a guide to get started, and information about the project, see the https://kafka.apache.org/;>Kafka +project site. + +Note about upgrades: Please carefully review the +https://kafka.apache.org/32/documentation.html#upgrade;>upgrade documentation for this release thoroughly +before upgrading your cluster. The upgrade notes discuss any critical information about incompatibilities and breaking +changes, performance changes, and any other changes that might impact your production deployment of Kafka. + +The documentation for the most recent release can be found at +https://kafka.apache.org/documentation.html;>https://kafka.apache.org/documentation.html. +New Feature + +[https://issues.apache.org/jira/browse/KAFKA-6718;>KAFKA-6718] - Rack Aware Stand-by Task Assignment for Kafka Streams +[https://issues.apache.org/jira/browse/KAFKA-13479;>KAFKA-13479] - Interactive Query v2 + +Improvement + +[https://issues.apache.org/jira/browse/KAFKA-7077;>KAFKA-7077] - KIP-318: Make Kafka Connect Source idempotent +[https://issues.apache.org/jira/browse/KAFKA-7589;>KAFKA-7589] - Allow configuring num.network.threads per listener +[https://issues.apache.org/jira/browse/KAFKA-9648;>KAFKA-9648] - Add configuration to adjust listen backlog size for Acceptor +[https://issues.apache.org/jira/browse/KAFKA-9847;>KAFKA-9847] - Add config to set default store type +[https://issues.apache.org/jira/browse/KAFKA-12959;>KAFKA-12959] - Prioritize assigning standby tasks to threads without any active tasks +[https://issues.apache.org/jira/browse/KAFKA-12980;>KAFKA-12980] - Allow consumers to return from poll when position advances due to aborted transactions +[https://issues.apache.org/jira/browse/KAFKA-13200;>KAFKA-13200] - Fix version of MirrorMaker2 connectors +[https://issues.apache.org/jira/browse/KAFKA-13323;>KAFKA-13323] - The words are ambiguous +[https://issues.apache.org/jira/browse/KAFKA-13348;>KAFKA-13348] - Allow Source Tasks to Handle Producer Exceptions +[https://issues.apache.org/jira/browse/KAFKA-13426;>KAFKA-13426] - Add recordMetadata to StateStoreContext +[https://issues.apache.org/jira/browse/KAFKA-13441;>KAFKA-13441] - improve upgrade doc +[https://issues.apache.org/jira/browse/KAFKA-13445;>KAFKA-13445] - Add ECDSA test for JWT validation +[https://issues.apache.org/jira/browse/KAFKA-13449;>KAFKA-13449] - Comment optimization for parameter log.cleaner.delete.retention.ms +[https://issues.apache.org/jira/browse/KAFKA-13451;>KAFKA-13451] - Add reason to JoinGroupRequest and LeaveGroupRequest +[https://issues.apache.org/jira/browse/KAFKA-13455;>KAFKA-13455] - The Apache Kafka quickstart guide does not contain any steps for running Kafka Connect
[kafka] branch trunk updated: MINOR: Small cleanups in connect/mirror (#12113)
This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 989d3ce07f MINOR: Small cleanups in connect/mirror (#12113) 989d3ce07f is described below commit 989d3ce07f1848d4c0b9fbb116ff0cf9b3b382d7 Author: Mickael Maison AuthorDate: Tue May 10 08:49:56 2022 +0200 MINOR: Small cleanups in connect/mirror (#12113) Reviewers: Luke Chen , Divij Vaidya --- .../kafka/connect/mirror/MirrorCheckpointTask.java | 2 +- .../apache/kafka/connect/mirror/OffsetSync.java| 6 +++--- .../kafka/connect/mirror/OffsetSyncStore.java | 6 +++--- .../org/apache/kafka/connect/mirror/Scheduler.java | 22 +++--- .../MirrorConnectorsIntegrationBaseTest.java | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 47631998fb..30fb695d92 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -105,7 +105,7 @@ public class MirrorCheckpointTask extends SourceTask { } @Override -public void commit() throws InterruptedException { +public void commit() { // nop } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java index 68e6441f18..e1ecb1e1db 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java @@ -39,9 +39,9 @@ public class OffsetSync { new Field(TOPIC_KEY, Type.STRING), new Field(PARTITION_KEY, Type.INT32)); -private TopicPartition topicPartition; -private long upstreamOffset; -private long downstreamOffset; +private final TopicPartition topicPartition; +private final long upstreamOffset; +private final long downstreamOffset; public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { this.topicPartition = topicPartition; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 600dda46f3..9152cd5aa0 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -30,9 +30,9 @@ import java.time.Duration; /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ class OffsetSyncStore implements AutoCloseable { -private KafkaConsumer consumer; -private Map offsetSyncs = new HashMap<>(); -private TopicPartition offsetSyncTopicPartition; +private final KafkaConsumer consumer; +private final Map offsetSyncs = new HashMap<>(); +private final TopicPartition offsetSyncTopicPartition; OffsetSyncStore(MirrorConnectorConfig config) { consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index 20f2ca7e2c..0644d6a6c6 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; class Scheduler implements AutoCloseable { -private static Logger log = LoggerFactory.getLogger(Scheduler.class); +private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); private final String name; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); @@ -62,11 +62,11 @@ class Scheduler implements AutoCloseable { try { executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { -log.warn("{} was interrupted running task: {}", name, description); +LOG.warn("{} was interrupted running task: {}", name, description); } catch (TimeoutException e) { -log.error("{} timed out running task: {}", name, description); +LOG.error("{} timed out running task: {}", name, description); } catch (Throwable e) { -log.error("{} caught exception in task: {}", name, description, e); +