[kafka] branch trunk updated: KAFKA-13837; Return an error from Fetch if follower is not a valid replica (#12150)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 8efdbce523 KAFKA-13837; Return an error from Fetch if follower is not a valid replica (#12150)
8efdbce523 is described below

commit 8efdbce5231f3b5ef61deb827c41b0a8c50aa84a
Author: Jason Gustafson
AuthorDate: Wed May 18 20:58:20 2022 -0700

KAFKA-13837; Return an error from Fetch if follower is not a valid replica (#12150)

When a partition leader receives a `Fetch` request from a replica which is not in the current replica set, the behavior today is to return a successful fetch response, but with empty data. This causes the follower to retry until metadata converges without updating any state on the leader side. It is clearer in this case to return an error, so that the metadata inconsistency is visible in logging and so that the follower backs off before retrying.

In this patch, we use `UNKNOWN_LEADER_EPOCH` when the `Fetch` request includes the current leader epoch. The way we see this is that the leader is validating the (replicaId, leaderEpoch) tuple. When the leader returns `UNKNOWN_LEADER_EPOCH`, it means that the leader does not expect the given leaderEpoch from that replica. If the request does not include a leader epoch, then we use `NOT_LEADER_OR_FOLLOWER`. We can take a similar interpretation for this case: the leader is rejecting the [...]

As a part of this patch, I have refactored the way that the leader updates follower fetch state. Previously, the process was a little convoluted. We sent the fetch from `ReplicaManager` down to `Partition.readRecords`, then we iterated over the results and called `Partition.updateFollowerFetchState`. It is more straightforward to update state directly as a part of `readRecords`. All we need to do is pass through the `FetchParams`. This also prevents an unnecessary copy of the read results.
Reviewers: David Jacot
---
 core/src/main/scala/kafka/cluster/Partition.scala | 234 +++
 core/src/main/scala/kafka/log/UnifiedLog.scala | 2 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala | 17 +-
 .../main/scala/kafka/server/FetchDataInfo.scala | 13 +-
 .../main/scala/kafka/server/ReplicaManager.scala | 116 ++
 .../kafka/server/DelayedFetchTest.scala | 28 +-
 .../unit/kafka/cluster/AbstractPartitionTest.scala | 5 +-
 .../unit/kafka/cluster/PartitionLockTest.scala | 90 +++--
 .../scala/unit/kafka/cluster/PartitionTest.scala | 435 +
 .../FetchRequestDownConversionConfigTest.scala | 145 ---
 .../kafka/server/ReplicaManagerQuotasTest.scala | 68 +---
 .../unit/kafka/server/ReplicaManagerTest.scala | 32 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 74 ++--
 .../UpdateFollowerFetchStateBenchmark.java | 13 +-
 14 files changed, 760 insertions(+), 512 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9864480e78..61d5f707dc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -440,8 +440,10 @@ class Partition(val topicPartition: TopicPartition,
     leaderReplicaIdOpt.filter(_ == localBrokerId)
   }
-  private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
-                                           requireLeader: Boolean): UnifiedLog = {
+  private def localLogWithEpochOrThrow(
+    currentLeaderEpoch: Optional[Integer],
+    requireLeader: Boolean
+  ): UnifiedLog = {
     getLocalLog(currentLeaderEpoch, requireLeader) match {
       case Left(localLog) => localLog
       case Right(error) =>
@@ -719,55 +721,51 @@ class Partition(val topicPartition: TopicPartition,
    * Update the follower's state in the leader based on the last fetch request. See
    * [[Replica.updateFetchState()]] for details.
    *
-   * @return true if the follower's fetch state was updated, false if the followerId is not recognized
+   * This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`)
    */
-  def updateFollowerFetchState(followerId: Int,
-                               followerFetchOffsetMetadata: LogOffsetMetadata,
-                               followerStartOffset: Long,
-                               followerFetchTimeMs: Long,
-                               leaderEndOffset: Long): Boolean = {
-    getReplica(followerId) match {
-      case Some(followerReplica) =>
-        // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
-        val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
-        val prevFollowerEndOffset = followerReplica.stateSnapshot.logEndOffset
-
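The error selection described in the KAFKA-13837 commit message above can be sketched as follows. This is a minimal, self-contained illustration and not the code from `Partition.scala`: the class `FetchReplicaValidationSketch`, the method `errorForFetch`, and the local `FetchError` enum are hypothetical names introduced here. Only the two error names and the rule for choosing between them are taken from the commit message, and the sketch assumes the request comes from a follower (consumer fetches are out of scope).

```java
import java.util.Optional;
import java.util.Set;

public class FetchReplicaValidationSketch {
    // Hypothetical stand-in for the broker's error codes; only the two
    // non-NONE names appear in the commit message.
    public enum FetchError { NONE, UNKNOWN_LEADER_EPOCH, NOT_LEADER_OR_FOLLOWER }

    /**
     * Decide which error a leader would return for a follower fetch.
     * Assumption: replicaSet holds the broker ids the leader currently
     * considers replicas of the partition.
     */
    public static FetchError errorForFetch(Set<Integer> replicaSet,
                                           int replicaId,
                                           Optional<Integer> currentLeaderEpoch) {
        if (replicaSet.contains(replicaId)) {
            // Known replica: the fetch proceeds normally.
            return FetchError.NONE;
        }
        // Unknown replica: the (replicaId, leaderEpoch) tuple is rejected.
        // With an epoch in the request the leader reports UNKNOWN_LEADER_EPOCH,
        // otherwise it falls back to NOT_LEADER_OR_FOLLOWER.
        return currentLeaderEpoch.isPresent()
            ? FetchError.UNKNOWN_LEADER_EPOCH
            : FetchError.NOT_LEADER_OR_FOLLOWER;
    }

    public static void main(String[] args) {
        Set<Integer> replicas = Set.of(1, 2, 3);
        System.out.println(errorForFetch(replicas, 2, Optional.of(5)));
        System.out.println(errorForFetch(replicas, 7, Optional.of(5)));
        System.out.println(errorForFetch(replicas, 7, Optional.empty()));
    }
}
```

Either error, unlike the previous empty successful response, shows up in logging and gives the follower a reason to back off before retrying.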
[kafka] branch trunk updated: MINOR: Fix typo in ReplicaManagerTest (#12178)
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new b4f35c9ce0 MINOR: Fix typo in ReplicaManagerTest (#12178)
b4f35c9ce0 is described below

commit b4f35c9ce0ec902123fa6b5ad7c84c63118c7358
Author: bozhao12 <102274736+bozha...@users.noreply.github.com>
AuthorDate: Thu May 19 10:28:47 2022 +0800

MINOR: Fix typo in ReplicaManagerTest (#12178)

Reviewer: Luke Chen
---
 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index aa28ce7269..977da6c69c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3374,7 +3374,7 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
     try {
-      // Make the local replica the follower
+      // Make the local replica the leader
       val leaderTopicsDelta = topicsCreateDelta(localId, true)
       val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
@@ -3411,7 +3411,7 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
     try {
-      // Make the local replica the follower
+      // Make the local replica the leader
       val leaderTopicsDelta = topicsCreateDelta(localId, true)
       val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
[kafka] branch trunk updated (1135f22eaf -> 1802c6dcb5)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 1135f22eaf KAFKA-13830 MetadataVersion integration for KRaft controller (#12050)
     add 1802c6dcb5 MINOR: Enable KRaft in `TransactionsTest` (#12176)

No new revisions were added by this update.

Summary of changes:
 .../integration/kafka/api/TransactionsTest.scala | 178 +++--
 1 file changed, 96 insertions(+), 82 deletions(-)
[kafka] branch trunk updated: MINOR: Replace string literal with constant in RequestChannel (#12134)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new cf34a2e4b0 MINOR: Replace string literal with constant in RequestChannel (#12134)
cf34a2e4b0 is described below

commit cf34a2e4b083ae0b6fe43bee720d9347c0e18b86
Author: runom
AuthorDate: Thu May 19 03:31:15 2022 +0900

MINOR: Replace string literal with constant in RequestChannel (#12134)

Replace the "RequestsPerSec" literal value with the pre-existing constant `RequestsPerSec`.

Reviewers: Divij Vaidya, Jason Gustafson
---
 core/src/main/scala/kafka/network/RequestChannel.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 07da59c5f2..4fa611206a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -530,7 +530,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
   def requestRate(version: Short): Meter = {
-    requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+    requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
   }
   class ErrorMeter(name: String, error: Errors) {
[kafka-site] branch asf-site updated: MINOR: Fix DSL typo in streams docs (#412)
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new e3def51b MINOR: Fix DSL typo in streams docs (#412)
e3def51b is described below

commit e3def51b3d357663e2546d6a5b11c4d6f6144247
Author: Milind Mantri
AuthorDate: Wed May 18 22:44:55 2022 +0530

MINOR: Fix DSL typo in streams docs (#412)

Reviewers: Guozhang Wang
---
 32/streams/developer-guide/dsl-topology-naming.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/32/streams/developer-guide/dsl-topology-naming.html b/32/streams/developer-guide/dsl-topology-naming.html
index 9e687f9a..cd11c132 100644
--- a/32/streams/developer-guide/dsl-topology-naming.html
+++ b/32/streams/developer-guide/dsl-topology-naming.html
@@ -41,7 +41,7 @@
 you are required to explicitly name each one.
- At the DLS layer, there are operators. A single DSL operator may
+ At the DSL layer, there are operators. A single DSL operator may
 compile down to multiple Processors and State Stores, and if required
 repartition topics. But with the Kafka Streams DSL, all these names are
 generated for you. There is a relationship between
[kafka] branch trunk updated: MINOR: Enable some AdminClient integration tests (#12110)
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 67d00e25e9 MINOR: Enable some AdminClient integration tests (#12110)
67d00e25e9 is described below

commit 67d00e25e941f73be8b959c6732ac4db1d1083bf
Author: dengziming
AuthorDate: Thu May 19 00:39:26 2022 +0800

MINOR: Enable some AdminClient integration tests (#12110)

Enable KRaft in `AdminClientWithPoliciesIntegrationTest` and `PlaintextAdminIntegrationTest`. Some tests are not enabled yet, or do not yet behave as expected:

- testNullConfigs, see KAFKA-13863
- testDescribeCluster and testMetadataRefresh: currently we don't get the real controller in KRaft mode, so the tests may not run as expected

This patch also changes the exception type raised from invalid `IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. When the configuration value type is not a list, we now raise `INVALID_CONFIG` instead of `INVALID_REQUEST`.
Reviewers: Luke Chen, Jason Gustafson
---
 .../scala/kafka/server/ConfigAdminManager.scala | 4 +-
 .../AdminClientWithPoliciesIntegrationTest.scala | 16 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala | 558 -
 3 files changed, 324 insertions(+), 254 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index e7d6c33ab2..cc7a98179d 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -494,7 +494,7 @@ object ConfigAdminManager {
         case OpType.DELETE => configProps.remove(alterConfigOp.configEntry.name)
         case OpType.APPEND => {
           if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
+            throw new InvalidConfigurationException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
             .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
             .getOrElse("")
@@ -505,7 +505,7 @@ object ConfigAdminManager {
         }
         case OpType.SUBTRACT => {
           if (!listType(alterConfigOp.configEntry.name, configKeys))
-            throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
+            throw new InvalidConfigurationException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
             .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
             .getOrElse("")
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index fb1b0d248d..c9d40cadb0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -15,18 +15,18 @@ package kafka.api
 import java.util
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     ).asJava)
     assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
-
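The behavior change described in the commit message above (rejecting `APPEND` and `SUBTRACT` on a non-list config with an invalid-config error instead of an invalid-request error) can be sketched as below. This is a simplified illustration under stated assumptions, not the `ConfigAdminManager` code: the class `ListConfigOpSketch`, the method `apply`, the `isListType` flag, and the exception `InvalidConfigSketchException` are hypothetical names, and the comma-separated list handling is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ListConfigOpSketch {
    public enum OpType { APPEND, SUBTRACT }

    // Hypothetical stand-in for the broker's invalid-configuration exception.
    public static class InvalidConfigSketchException extends RuntimeException {
        public InvalidConfigSketchException(String message) {
            super(message);
        }
    }

    /**
     * Apply APPEND or SUBTRACT to a comma-separated config value.
     * Assumption: the caller already knows whether the key is list-typed.
     */
    public static String apply(OpType op, String key, boolean isListType,
                               String oldValue, String operand) {
        if (!isListType) {
            // Non-list keys are rejected as a configuration error.
            throw new InvalidConfigSketchException("Config value "
                + op.name().toLowerCase(Locale.ROOT)
                + " is not allowed for config key: " + key);
        }
        List<String> values = new ArrayList<>();
        if (!oldValue.isEmpty()) {
            values.addAll(Arrays.asList(oldValue.split(",")));
        }
        List<String> operands = Arrays.asList(operand.split(","));
        if (op == OpType.APPEND) {
            values.addAll(operands);
        } else {
            values.removeAll(operands);
        }
        return String.join(",", values);
    }

    public static void main(String[] args) {
        System.out.println(apply(OpType.APPEND, "cleanup.policy", true, "delete", "compact"));
        System.out.println(apply(OpType.SUBTRACT, "cleanup.policy", true, "delete,compact", "compact"));
        try {
            apply(OpType.APPEND, "retention.ms", false, "1000", "2000");
        } catch (InvalidConfigSketchException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A client that previously matched on the invalid-request error for this case would need to match on the invalid-config error instead.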
[kafka] branch trunk updated: MINOR: Remove extraneous code in LocalLogManager (#12168)
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 62ba4d3d4e MINOR: Remove extraneous code in LocalLogManager (#12168)
62ba4d3d4e is described below

commit 62ba4d3d4e42cf65174472f81caf3a18d8d7ed13
Author: David Arthur
AuthorDate: Wed May 18 10:58:08 2022 -0400

MINOR: Remove extraneous code in LocalLogManager (#12168)

Reviewers: Kvicii, dengziming, Divij Vaidya
---
 metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java | 5 -
 1 file changed, 5 deletions(-)

diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 3c495adff9..36fe3ec2c1 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -243,11 +243,6 @@ public final class LocalLogManager implements RaftClient,
         }
         public synchronized long append(LocalBatch batch) {
-            try {
-                throw new RuntimeException("foo");
-            } catch (Exception e) {
-                log.info("WATERMELON: appending {}", batch, e);
-            }
             prevOffset += batch.size();
             log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
             batches.put(prevOffset, batch);
[kafka] branch trunk updated (f36de0744b -> a1cd1d1839)
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from f36de0744b MINOR: Remove redundant metric reset in KafkaController (#12158)
     add a1cd1d1839 MINOR: Followers should not have any remote replica states left over from previous leadership (#12138)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/cluster/Partition.scala | 52 -
 .../scala/unit/kafka/cluster/PartitionTest.scala | 86 +-
 .../server/HighwatermarkPersistenceTest.scala | 3 +-
 .../unit/kafka/server/IsrExpirationTest.scala | 3 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala | 3 +-
 5 files changed, 120 insertions(+), 27 deletions(-)