[kafka] branch trunk updated: KAFKA-13837; Return an error from Fetch if follower is not a valid replica (#12150)

2022-05-18 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 8efdbce523 KAFKA-13837; Return an error from Fetch if follower is not 
a valid replica (#12150)
8efdbce523 is described below

commit 8efdbce5231f3b5ef61deb827c41b0a8c50aa84a
Author: Jason Gustafson 
AuthorDate: Wed May 18 20:58:20 2022 -0700

KAFKA-13837; Return an error from Fetch if follower is not a valid replica 
(#12150)

When a partition leader receives a `Fetch` request from a replica which is
not in the current replica set, the behavior today is to return a successful
fetch response with empty data. This causes the follower to retry until
metadata converges, without any state being updated on the leader side. It is
clearer in this case to return an error, so that the metadata inconsistency is
visible in logging and so that the follower backs off before retrying.

In this patch, we use `UNKNOWN_LEADER_EPOCH` when the `Fetch` request 
includes the current leader epoch. The way we see this is that the leader is 
validating the (replicaId, leaderEpoch) tuple. When the leader returns 
`UNKNOWN_LEADER_EPOCH`, it means that the leader does not expect the given 
leaderEpoch from that replica. If the request does not include a leader epoch, 
then we use `NOT_LEADER_OR_FOLLOWER`. We can take a similar interpretation for 
this case: the leader is rejecting the [...]
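
A compact sketch of the error selection described above (the helper name and
signature are hypothetical, not the actual `Partition` code in this patch):

    import java.util.Optional
    import org.apache.kafka.common.protocol.Errors

    // Hypothetical helper: pick the error returned for a fetch from a
    // replica that is not in the current replica set.
    def validateFollowerFetch(
      replicaId: Int,
      requestLeaderEpoch: Optional[Integer],
      currentReplicaIds: Set[Int]
    ): Errors = {
      if (currentReplicaIds.contains(replicaId))
        Errors.NONE
      else if (requestLeaderEpoch.isPresent)
        // The leader does not expect this leaderEpoch from this replica.
        Errors.UNKNOWN_LEADER_EPOCH
      else
        // No epoch to validate, so reject the replicaId itself.
        Errors.NOT_LEADER_OR_FOLLOWER
    }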

As a part of this patch, I have refactored the way that the leader updates
follower fetch state. Previously, the process was a little convoluted: we sent
the fetch from `ReplicaManager` down to `Partition.readRecords`, then iterated
over the results and called `Partition.updateFollowerFetchState`. It is more
straightforward to update state directly as a part of `readRecords`. All we
need to do is pass through the `FetchParams`. This also avoids an unnecessary
copy of the read results.
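
Roughly, the shape of the change is the following (a simplified sketch with
stand-in types; the real `FetchParams`, `readRecords`, and
`updateFollowerFetchState` have different signatures):

    // Stand-in types for illustration only.
    case class FetchParams(replicaId: Int)
    case class LogReadResult(bytesRead: Int, fetchTimeMs: Long)

    class PartitionSketch {
      private def readFromLocalLog(): LogReadResult =
        LogReadResult(bytesRead = 0, fetchTimeMs = System.currentTimeMillis())

      private def updateFollowerFetchState(replicaId: Int, result: LogReadResult): Unit =
        println(s"follower $replicaId fetched ${result.bytesRead} bytes at ${result.fetchTimeMs}")

      // Before: ReplicaManager performed the read, then iterated over the
      // results and called updateFollowerFetchState in a second pass, copying
      // the results. After: the fetch params flow into the read path and the
      // follower state is updated as part of the same call.
      def readRecords(params: FetchParams): LogReadResult = {
        val result = readFromLocalLog()
        if (params.replicaId >= 0) // only follower fetches carry a valid replica id
          updateFollowerFetchState(params.replicaId, result)
        result
      }
    }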

Reviewers: David Jacot 
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 234 +++
 core/src/main/scala/kafka/log/UnifiedLog.scala |   2 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala |  17 +-
 .../main/scala/kafka/server/FetchDataInfo.scala|  13 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 116 ++
 .../kafka/server/DelayedFetchTest.scala|  28 +-
 .../unit/kafka/cluster/AbstractPartitionTest.scala |   5 +-
 .../unit/kafka/cluster/PartitionLockTest.scala |  90 +++--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 435 +
 .../FetchRequestDownConversionConfigTest.scala | 145 ---
 .../kafka/server/ReplicaManagerQuotasTest.scala|  68 +---
 .../unit/kafka/server/ReplicaManagerTest.scala |  32 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala|  74 ++--
 .../UpdateFollowerFetchStateBenchmark.java |  13 +-
 14 files changed, 760 insertions(+), 512 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9864480e78..61d5f707dc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -440,8 +440,10 @@ class Partition(val topicPartition: TopicPartition,
 leaderReplicaIdOpt.filter(_ == localBrokerId)
   }
 
-  private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
-   requireLeader: Boolean): UnifiedLog = {
+  private def localLogWithEpochOrThrow(
+currentLeaderEpoch: Optional[Integer],
+requireLeader: Boolean
+  ): UnifiedLog = {
 getLocalLog(currentLeaderEpoch, requireLeader) match {
   case Left(localLog) => localLog
   case Right(error) =>
@@ -719,55 +721,51 @@ class Partition(val topicPartition: TopicPartition,
* Update the follower's state in the leader based on the last fetch request. See
* [[Replica.updateFetchState()]] for details.
*
-   * @return true if the follower's fetch state was updated, false if the followerId is not recognized
+   * This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`)
*/
-  def updateFollowerFetchState(followerId: Int,
-   followerFetchOffsetMetadata: LogOffsetMetadata,
-   followerStartOffset: Long,
-   followerFetchTimeMs: Long,
-   leaderEndOffset: Long): Boolean = {
-getReplica(followerId) match {
-  case Some(followerReplica) =>
-// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
-val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
-val prevFollowerEndOffset = followerReplica.stateSnapshot.logEndOffset
-

[kafka] branch trunk updated: MINOR: Fix typo in ReplicaManagerTest (#12178)

2022-05-18 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b4f35c9ce0 MINOR: Fix typo in ReplicaManagerTest (#12178)
b4f35c9ce0 is described below

commit b4f35c9ce0ec902123fa6b5ad7c84c63118c7358
Author: bozhao12 <102274736+bozha...@users.noreply.github.com>
AuthorDate: Thu May 19 10:28:47 2022 +0800

MINOR: Fix typo in ReplicaManagerTest (#12178)

Reviewer: Luke Chen 
---
 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index aa28ce7269..977da6c69c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3374,7 +3374,7 @@ class ReplicaManagerTest {
 val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
 try {
-  // Make the local replica the follower
+  // Make the local replica the leader
   val leaderTopicsDelta = topicsCreateDelta(localId, true)
   val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
   replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
@@ -3411,7 +3411,7 @@ class ReplicaManagerTest {
 val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
 
 try {
-  // Make the local replica the follower
+  // Make the local replica the leader
   val leaderTopicsDelta = topicsCreateDelta(localId, true)
   val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
   replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)



[kafka] branch trunk updated (1135f22eaf -> 1802c6dcb5)

2022-05-18 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 1135f22eaf KAFKA-13830 MetadataVersion integration for KRaft 
controller (#12050)
 add 1802c6dcb5 MINOR: Enable KRaft in `TransactionsTest` (#12176)

No new revisions were added by this update.

Summary of changes:
 .../integration/kafka/api/TransactionsTest.scala   | 178 +++--
 1 file changed, 96 insertions(+), 82 deletions(-)



[kafka] branch trunk updated: MINOR: Replace string literal with constant in RequestChannel (#12134)

2022-05-18 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new cf34a2e4b0 MINOR: Replace string literal with constant in 
RequestChannel (#12134)
cf34a2e4b0 is described below

commit cf34a2e4b083ae0b6fe43bee720d9347c0e18b86
Author: runom 
AuthorDate: Thu May 19 03:31:15 2022 +0900

MINOR: Replace string literal with constant in RequestChannel (#12134)

Replace the "RequestsPerSec" literal value with the pre-existing constant 
`RequestsPerSec`.

Reviewers: Divij Vaidya , Jason Gustafson 

---
 core/src/main/scala/kafka/network/RequestChannel.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 07da59c5f2..4fa611206a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -530,7 +530,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
 
   def requestRate(version: Short): Meter = {
-requestRateInternal.getAndMaybePut(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
   }
 
   class ErrorMeter(name: String, error: Errors) {



[kafka-site] branch asf-site updated: MINOR: Fix DSL typo in streams docs (#412)

2022-05-18 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new e3def51b MINOR: Fix DSL typo in streams docs (#412)
e3def51b is described below

commit e3def51b3d357663e2546d6a5b11c4d6f6144247
Author: Milind Mantri 
AuthorDate: Wed May 18 22:44:55 2022 +0530

MINOR: Fix DSL typo in streams docs (#412)

Reviewers: Guozhang Wang 
---
 32/streams/developer-guide/dsl-topology-naming.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/32/streams/developer-guide/dsl-topology-naming.html b/32/streams/developer-guide/dsl-topology-naming.html
index 9e687f9a..cd11c132 100644
--- a/32/streams/developer-guide/dsl-topology-naming.html
+++ b/32/streams/developer-guide/dsl-topology-naming.html
@@ -41,7 +41,7 @@
   you are required to explicitly name each one.


-  At the DLS layer, there are operators.  A single DSL 
operator may
+  At the DSL layer, there are operators.  A single DSL 
operator may
   compile down to multiple Processors and 
State Stores, and
   if required repartition topics. But with the 
Kafka Streams
   DSL, all these names are generated for you. There is a 
relationship between



[kafka] branch trunk updated: MINOR: Enable some AdminClient integration tests (#12110)

2022-05-18 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 67d00e25e9 MINOR: Enable some AdminClient integration tests (#12110)
67d00e25e9 is described below

commit 67d00e25e941f73be8b959c6732ac4db1d1083bf
Author: dengziming 
AuthorDate: Thu May 19 00:39:26 2022 +0800

MINOR: Enable some AdminClient integration tests (#12110)

Enable KRaft in `AdminClientWithPoliciesIntegrationTest` and
`PlaintextAdminIntegrationTest`. A few tests are not yet enabled, or do not
yet run as expected:

- testNullConfigs: see KAFKA-13863
- testDescribeCluster and testMetadataRefresh: we do not currently get the
real controller in KRaft mode, so these tests may not run as expected

This patch also changes the exception type raised from invalid 
`IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. 
When the configuration value type is not a list, we now raise `INVALID_CONFIG` 
instead of `INVALID_REQUEST`.
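
For example, a client issuing an `APPEND` on a non-list config now sees
`InvalidConfigurationException` as the failure cause. A minimal sketch,
assuming a broker at localhost:9092 and an existing topic named my-topic
(both illustrative):

    import java.util.{Collections, Properties}
    import java.util.concurrent.ExecutionException
    import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.clients.admin.AlterConfigOp.OpType
    import org.apache.kafka.common.config.ConfigResource

    object AppendOnNonListConfig {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        val admin = Admin.create(props)
        try {
          val resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic")
          // retention.ms is a LONG config, not a LIST, so APPEND is rejected.
          val op = new AlterConfigOp(new ConfigEntry("retention.ms", "1000"), OpType.APPEND)
          val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(op)
          admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get()
        } catch {
          case e: ExecutionException =>
            // After this patch the cause is InvalidConfigurationException
            // (INVALID_CONFIG) rather than InvalidRequestException (INVALID_REQUEST).
            println(e.getCause)
        } finally {
          admin.close()
        }
      }
    }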

Reviewers: Luke Chen , Jason Gustafson 

---
 .../scala/kafka/server/ConfigAdminManager.scala|   4 +-
 .../AdminClientWithPoliciesIntegrationTest.scala   |  16 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 558 -
 3 files changed, 324 insertions(+), 254 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index e7d6c33ab2..cc7a98179d 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -494,7 +494,7 @@ object ConfigAdminManager {
 case OpType.DELETE => 
configProps.remove(alterConfigOp.configEntry.name)
 case OpType.APPEND => {
   if (!listType(alterConfigOp.configEntry.name, configKeys))
-throw new InvalidRequestException(s"Config value append is not 
allowed for config key: ${alterConfigOp.configEntry.name}")
+throw new InvalidConfigurationException(s"Config value append is 
not allowed for config key: ${alterConfigOp.configEntry.name}")
   val oldValueList = 
Option(configProps.getProperty(alterConfigOp.configEntry.name))
 
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
 ConfigDef.Type.LIST)))
 .getOrElse("")
@@ -505,7 +505,7 @@ object ConfigAdminManager {
 }
 case OpType.SUBTRACT => {
   if (!listType(alterConfigOp.configEntry.name, configKeys))
-throw new InvalidRequestException(s"Config value subtract is not 
allowed for config key: ${alterConfigOp.configEntry.name}")
+throw new InvalidConfigurationException(s"Config value subtract is 
not allowed for config key: ${alterConfigOp.configEntry.name}")
   val oldValueList = 
Option(configProps.getProperty(alterConfigOp.configEntry.name))
 
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
 ConfigDef.Type.LIST)))
 .getOrElse("")
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index fb1b0d248d..c9d40cadb0 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -15,18 +15,18 @@ package kafka.api
 
 import java.util
 import java.util.Properties
-import java.util.concurrent.ExecutionException
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigsOptions, Config, ConfigEntry}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException, PolicyViolationException}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.policy.AlterConfigPolicy
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
 ).asJava)
 
 assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
-

[kafka] branch trunk updated: MINOR: Remove extraneous code in LocalLogManager (#12168)

2022-05-18 Thread davidarthur
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 62ba4d3d4e MINOR: Remove extraneous code in LocalLogManager (#12168)
62ba4d3d4e is described below

commit 62ba4d3d4e42cf65174472f81caf3a18d8d7ed13
Author: David Arthur 
AuthorDate: Wed May 18 10:58:08 2022 -0400

MINOR: Remove extraneous code in LocalLogManager (#12168)

Reviewers: Kvicii , dengziming 
, Divij Vaidya 
---
 metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java | 5 -
 1 file changed, 5 deletions(-)

diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 3c495adff9..36fe3ec2c1 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -243,11 +243,6 @@ public final class LocalLogManager implements 
RaftClient,
 }
 
 public synchronized long append(LocalBatch batch) {
-try {
-throw new RuntimeException("foo");
-} catch (Exception e) {
-log.info("WATERMELON: appending {}", batch, e);
-}
 prevOffset += batch.size();
 log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
 batches.put(prevOffset, batch);



[kafka] branch trunk updated (f36de0744b -> a1cd1d1839)

2022-05-18 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from f36de0744b MINOR: Remove redundant metric reset in KafkaController 
(#12158)
 add a1cd1d1839 MINOR: Followers should not have any remote replica states 
left over from previous leadership (#12138)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/cluster/Partition.scala  | 52 -
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 86 +-
 .../server/HighwatermarkPersistenceTest.scala  |  3 +-
 .../unit/kafka/server/IsrExpirationTest.scala  |  3 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala|  3 +-
 5 files changed, 120 insertions(+), 27 deletions(-)