[kafka] branch 2.7 updated: KAFKA-13277; Fix size calculation for tagged string fields in message generator (#11308)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new d4e2273 KAFKA-13277; Fix size calculation for tagged string fields in message generator (#11308) d4e2273 is described below commit d4e227379c9096499788719fdbf2d1a5105d8ee9 Author: Rajini Sivaram AuthorDate: Tue Sep 7 21:02:45 2021 +0100 KAFKA-13277; Fix size calculation for tagged string fields in message generator (#11308) Reviewers: Colin P. McCabe --- .../org/apache/kafka/common/message/MessageTest.java | 16 .../org/apache/kafka/message/MessageDataGenerator.java | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 94e6bc2..9cf18cf 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -1068,6 +1068,22 @@ public final class MessageTest { verifyWriteSucceeds((short) 6, createTopics); } +@Test +public void testLongTaggedString() throws Exception { +char[] chars = new char[1024]; +Arrays.fill(chars, 'a'); +String longString = new String(chars); +SimpleExampleMessageData message = new SimpleExampleMessageData() +.setMyString(longString); +ObjectSerializationCache cache = new ObjectSerializationCache(); +short version = 1; +int size = message.size(cache, version); +ByteBuffer buf = ByteBuffer.allocate(size); +ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(buf); +message.write(byteBufferAccessor, cache, version); +assertEquals(size, buf.position()); +} + private void verifyWriteRaisesNpe(short version, Message message) { ObjectSerializationCache cache = new ObjectSerializationCache(); assertThrows(NullPointerException.class, () -> { diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index 9dcaa76..54f2364 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -1530,7 +1530,7 @@ public final class MessageDataGenerator implements MessageClassGenerator { buffer.printf("int _stringPrefixSize = " + "ByteUtils.sizeOfUnsignedVarint(_stringBytes.length + 1);%n"); buffer.printf("_size += _stringBytes.length + _stringPrefixSize + " + - "ByteUtils.sizeOfUnsignedVarint(_stringPrefixSize);%n"); + "ByteUtils.sizeOfUnsignedVarint(_stringPrefixSize + _stringBytes.length);%n"); } else { buffer.printf("_size += _stringBytes.length + " + "ByteUtils.sizeOfUnsignedVarint(_stringBytes.length + 1);%n");
[kafka] branch trunk updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fe0fe68 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) fe0fe68 is described below commit fe0fe686e92d019ac2b5c8407ab2cbb55ae069e1 Author: Rajini Sivaram AuthorDate: Wed Jul 28 17:27:26 2021 +0100 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) Reviewers: Jason Gustafson --- .../main/scala/kafka/server/ReplicaManager.scala | 7 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 23 ++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 949ff47..a9c99a9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1769,7 +1769,8 @@ class ReplicaManager(val config: KafkaConfig, * records in fetch response. Log start/end offset and high watermark may change not only due to * this fetch request, e.g., rolling new log segment and removing old log segment may move log * start offset further than the last offset in the fetched records. The followers will get the - * updated leader's state in the next fetch response. + * updated leader's state in the next fetch response. If follower has a diverging epoch or if read + * fails with any error, follower fetch state is not updated. */ private def updateFollowerFetchState(followerId: Int, readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = { @@ -1778,6 +1779,10 @@ class ReplicaManager(val config: KafkaConfig, debug(s"Skipping update of fetch state for follower $followerId since the " + s"log read returned error ${readResult.error}") readResult + } else if (readResult.divergingEpoch.nonEmpty) { +debug(s"Skipping update of fetch state for follower $followerId since the " + + s"log read returned diverging epoch ${readResult.divergingEpoch}") +readResult } else { onlinePartition(topicPartition) match { case Some(partition) => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1d875ae..d50aa7b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -703,6 +703,29 @@ class ReplicaManagerTest { assertEquals(0L, followerReplica.logStartOffset) assertEquals(0L, followerReplica.logEndOffset) + // Next we receive an invalid request with a higher fetch offset, but a diverging epoch. + // We expect that the replica state does not get updated. + val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes, +Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) + + replicaManager.fetchMessages( +timeout = 0L, +replicaId = 1, +fetchMinBytes = 1, +fetchMaxBytes = maxFetchBytes, +hardMaxBytesLimit = false, +fetchInfos = Seq(tp -> divergingFetchPartitionData), +topicIds = topicIds.asJava, +quota = UnboundedQuota, +isolationLevel = IsolationLevel.READ_UNCOMMITTED, +responseCallback = callback, +clientMetadata = None + ) + + assertTrue(successfulFetch.isDefined) + assertEquals(0L, followerReplica.logStartOffset) + assertEquals(0L, followerReplica.logEndOffset) + } finally { replicaManager.shutdown(checkpointHW = false) }
[kafka] branch 2.7 updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new c69d5f3 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) c69d5f3 is described below commit c69d5f30f9ecac7e0074b21d9170de4837be6067 Author: Rajini Sivaram AuthorDate: Wed Jul 28 17:27:26 2021 +0100 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) Reviewers: Jason Gustafson --- .../main/scala/kafka/server/ReplicaManager.scala | 7 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1b819d3..7d40110 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1709,7 +1709,8 @@ class ReplicaManager(val config: KafkaConfig, * records in fetch response. Log start/end offset and high watermark may change not only due to * this fetch request, e.g., rolling new log segment and removing old log segment may move log * start offset further than the last offset in the fetched records. The followers will get the - * updated leader's state in the next fetch response. + * updated leader's state in the next fetch response. If follower has a diverging epoch or if read + * fails with any error, follower fetch state is not updated. */ private def updateFollowerFetchState(followerId: Int, readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = { @@ -1718,6 +1719,10 @@ class ReplicaManager(val config: KafkaConfig, debug(s"Skipping update of fetch state for follower $followerId since the " + s"log read returned error ${readResult.error}") readResult + } else if (readResult.divergingEpoch.nonEmpty) { +debug(s"Skipping update of fetch state for follower $followerId since the " + + s"log read returned diverging epoch ${readResult.divergingEpoch}") +readResult } else { nonOfflinePartition(topicPartition) match { case Some(partition) => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6a0f94c..127aff5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -671,6 +671,28 @@ class ReplicaManagerTest { assertEquals(0L, followerReplica.logStartOffset) assertEquals(0L, followerReplica.logEndOffset) + // Next we receive an invalid request with a higher fetch offset, but a diverging epoch. + // We expect that the replica state does not get updated. + val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes, +Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) + + replicaManager.fetchMessages( +timeout = 0L, +replicaId = 1, +fetchMinBytes = 1, +fetchMaxBytes = maxFetchBytes, +hardMaxBytesLimit = false, +fetchInfos = Seq(tp -> divergingFetchPartitionData), +quota = UnboundedQuota, +isolationLevel = IsolationLevel.READ_UNCOMMITTED, +responseCallback = callback, +clientMetadata = None + ) + + assertTrue(successfulFetch.isDefined) + assertEquals(0L, followerReplica.logStartOffset) + assertEquals(0L, followerReplica.logEndOffset) + } finally { replicaManager.shutdown(checkpointHW = false) }
[kafka] branch 3.0 updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new 047608f KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) 047608f is described below commit 047608fe5e6631139d890ce1ca045052daa0e43c Author: Rajini Sivaram AuthorDate: Wed Jul 28 17:27:26 2021 +0100 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) Reviewers: Jason Gustafson --- .../main/scala/kafka/server/ReplicaManager.scala | 7 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6837d81..f03571c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1750,7 +1750,8 @@ class ReplicaManager(val config: KafkaConfig, * records in fetch response. Log start/end offset and high watermark may change not only due to * this fetch request, e.g., rolling new log segment and removing old log segment may move log * start offset further than the last offset in the fetched records. The followers will get the - * updated leader's state in the next fetch response. + * updated leader's state in the next fetch response. If follower has a diverging epoch or if read + * fails with any error, follower fetch state is not updated. */ private def updateFollowerFetchState(followerId: Int, readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = { @@ -1759,6 +1760,10 @@ class ReplicaManager(val config: KafkaConfig, debug(s"Skipping update of fetch state for follower $followerId since the " + s"log read returned error ${readResult.error}") readResult + } else if (readResult.divergingEpoch.nonEmpty) { +debug(s"Skipping update of fetch state for follower $followerId since the " + + s"log read returned diverging epoch ${readResult.divergingEpoch}") +readResult } else { onlinePartition(topicPartition) match { case Some(partition) => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 5e2563e..fd6ba75 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -698,6 +698,28 @@ class ReplicaManagerTest { assertEquals(0L, followerReplica.logStartOffset) assertEquals(0L, followerReplica.logEndOffset) + // Next we receive an invalid request with a higher fetch offset, but a diverging epoch. + // We expect that the replica state does not get updated. + val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes, +Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) + + replicaManager.fetchMessages( +timeout = 0L, +replicaId = 1, +fetchMinBytes = 1, +fetchMaxBytes = maxFetchBytes, +hardMaxBytesLimit = false, +fetchInfos = Seq(tp -> divergingFetchPartitionData), +quota = UnboundedQuota, +isolationLevel = IsolationLevel.READ_UNCOMMITTED, +responseCallback = callback, +clientMetadata = None + ) + + assertTrue(successfulFetch.isDefined) + assertEquals(0L, followerReplica.logStartOffset) + assertEquals(0L, followerReplica.logEndOffset) + } finally { replicaManager.shutdown(checkpointHW = false) }
[kafka] branch 2.8 updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new 7e4a315 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) 7e4a315 is described below commit 7e4a315963a3d635756183e4c509dcfd82d8359d Author: Rajini Sivaram AuthorDate: Wed Jul 28 17:27:26 2021 +0100 KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136) Reviewers: Jason Gustafson --- .../main/scala/kafka/server/ReplicaManager.scala | 7 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6e4fc5c..b25516c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1753,7 +1753,8 @@ class ReplicaManager(val config: KafkaConfig, * records in fetch response. Log start/end offset and high watermark may change not only due to * this fetch request, e.g., rolling new log segment and removing old log segment may move log * start offset further than the last offset in the fetched records. The followers will get the - * updated leader's state in the next fetch response. + * updated leader's state in the next fetch response. If follower has a diverging epoch or if read + * fails with any error, follower fetch state is not updated. */ private def updateFollowerFetchState(followerId: Int, readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = { @@ -1762,6 +1763,10 @@ class ReplicaManager(val config: KafkaConfig, debug(s"Skipping update of fetch state for follower $followerId since the " + s"log read returned error ${readResult.error}") readResult + } else if (readResult.divergingEpoch.nonEmpty) { +debug(s"Skipping update of fetch state for follower $followerId since the " + + s"log read returned diverging epoch ${readResult.divergingEpoch}") +readResult } else { onlinePartition(topicPartition) match { case Some(partition) => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9b289e5..881430e 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -683,6 +683,28 @@ class ReplicaManagerTest { assertEquals(0L, followerReplica.logStartOffset) assertEquals(0L, followerReplica.logEndOffset) + // Next we receive an invalid request with a higher fetch offset, but a diverging epoch. + // We expect that the replica state does not get updated. + val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes, +Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) + + replicaManager.fetchMessages( +timeout = 0L, +replicaId = 1, +fetchMinBytes = 1, +fetchMaxBytes = maxFetchBytes, +hardMaxBytesLimit = false, +fetchInfos = Seq(tp -> divergingFetchPartitionData), +quota = UnboundedQuota, +isolationLevel = IsolationLevel.READ_UNCOMMITTED, +responseCallback = callback, +clientMetadata = None + ) + + assertTrue(successfulFetch.isDefined) + assertEquals(0L, followerReplica.logStartOffset) + assertEquals(0L, followerReplica.logEndOffset) + } finally { replicaManager.shutdown(checkpointHW = false) }
[kafka] branch trunk updated: KAFKA-10774; Admin API for Describe topic using topic IDs (#9769)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1d22b0d KAFKA-10774; Admin API for Describe topic using topic IDs (#9769) 1d22b0d is described below commit 1d22b0d70686aef5689b775ea2ea7610a37f3e8c Author: dengziming AuthorDate: Sat Aug 28 16:00:36 2021 +0800 KAFKA-10774; Admin API for Describe topic using topic IDs (#9769) Reviewers: Justine Olshan , Chia-Ping Tsai , Satish Duggana , Rajini Sivaram --- .../java/org/apache/kafka/clients/admin/Admin.java | 44 +-- .../kafka/clients/admin/DeleteTopicsResult.java| 2 +- .../kafka/clients/admin/DescribeTopicsResult.java | 97 +-- .../kafka/clients/admin/KafkaAdminClient.java | 127 .../kafka/clients/admin/TopicDescription.java | 12 +- .../apache/kafka/clients/admin/TopicListing.java | 28 - .../main/java/org/apache/kafka/common/Cluster.java | 8 ++ .../kafka/common/requests/MetadataRequest.java | 40 ++- .../kafka/common/requests/MetadataResponse.java| 19 +++ .../resources/common/message/MetadataRequest.json | 3 +- .../resources/common/message/MetadataResponse.json | 5 +- .../kafka/clients/admin/AdminClientTestUtils.java | 7 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 4 +- .../kafka/clients/admin/MockAdminClient.java | 100 +--- .../ClientAuthenticationFailureTest.java | 2 +- .../connect/mirror/MirrorSourceConnector.java | 2 +- .../org/apache/kafka/connect/util/TopicAdmin.java | 2 +- .../apache/kafka/connect/util/TopicAdminTest.java | 2 +- .../util/clusters/EmbeddedKafkaCluster.java| 2 +- .../scala/kafka/admin/ConsumerGroupCommand.scala | 4 +- .../kafka/admin/ReassignPartitionsCommand.scala| 4 +- core/src/main/scala/kafka/admin/TopicCommand.scala | 133 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 36 +- .../main/scala/kafka/server/MetadataCache.scala| 4 + .../kafka/server/metadata/KRaftMetadataCache.scala | 4 + .../kafka/server/metadata/ZkMetadataCache.scala| 8 ++ .../kafka/tools/ReplicaVerificationTool.scala | 2 +- .../kafka/api/BaseAdminIntegrationTest.scala | 4 +- .../api/DescribeAuthorizedOperationsTest.scala | 6 +- .../kafka/api/EndToEndAuthorizationTest.scala | 8 +- .../kafka/api/PlaintextAdminIntegrationTest.scala | 24 +++- .../SaslClientsWithInvalidCredentialsTest.scala| 2 +- .../server/DynamicBrokerReconfigurationTest.scala | 2 +- .../kafka/server/KRaftClusterTest.scala| 2 +- .../MetadataRequestBetweenDifferentIbpTest.scala | 96 +++ .../scala/unit/kafka/admin/DeleteTopicTest.scala | 2 +- .../kafka/admin/TopicCommandIntegrationTest.scala | 18 +-- .../scala/unit/kafka/server/KafkaApisTest.scala| 111 + .../test/scala/unit/kafka/utils/TestUtils.scala| 8 +- .../TopicBasedRemoteLogMetadataManager.java| 2 +- .../processor/internals/InternalTopicManager.java | 4 +- .../KStreamRepartitionIntegrationTest.java | 2 +- ...bleJoinTopologyOptimizationIntegrationTest.java | 2 +- .../internals/InternalTopicManagerTest.java| 8 +- .../kafka/tools/ClientCompatibilityTest.java | 2 +- .../apache/kafka/tools/TransactionsCommand.java| 2 +- .../kafka/tools/TransactionsCommandTest.java | 2 +- .../apache/kafka/trogdor/common/WorkerUtils.java | 4 +- .../kafka/trogdor/common/WorkerUtilsTest.java | 6 +- 49 files changed, 846 insertions(+), 172 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 54d103b..4b6fe49 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -17,14 +17,6 @@ package org.apache.kafka.clients.admin; -import java.time.Duration; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; - import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaFuture; @@ -42,6 +34,14 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.LeaveGroupResponse; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + /** * The administrative client for Kafka, which supports managing and inspecting topics, brokers
[kafka] branch 3.1 updated: KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new de73bea KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563) de73bea is described below commit de73bea58ee6afb466eef4b375f8533aca97494f Author: Rajini Sivaram AuthorDate: Thu Dec 2 22:10:37 2021 + KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563) If JAAS configuration does not contain a Client section for ZK clients, an auth failure event is generated. If this occurs after the connection is setup in the controller, we schedule reinitialize(), which causes controller to resign. In the case where SASL is not mandatory and the connection is alive, controller maintains the current session and doesn't register its watchers, leaving it in a bad state. Reviewers: Jun Rao --- core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 4 ++-- .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 091b401..bc634a8 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -433,14 +433,14 @@ class ZooKeeperClient(connectString: String, isConnectedOrExpiredCondition.signalAll() } if (state == KeeperState.AuthFailed) { -error("Auth failed.") +error(s"Auth failed, initialized=$isFirstConnectionEstablished connectionState=$connectionState") stateChangeHandlers.values.foreach(_.onAuthFailure()) // If this is during initial startup, we fail fast. Otherwise, schedule retry. val initialized = inLock(isConnectedOrExpiredLock) { isFirstConnectionEstablished } -if (initialized) +if (initialized && !connectionState.isAlive) scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) } else if (state == KeeperState.Expired) { scheduleReinitialize("session-expired", "Session expired.", delayMs = 0L) diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index 649f3c5..5af2ba8 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -649,9 +649,18 @@ class ZooKeeperClientTest extends QuorumTestHarness { } zooKeeperClient.close() -zooKeeperClient = newZooKeeperClient() +@volatile var connectionStateOverride: Option[States] = None +zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, + zkMaxInFlightRequests, time, "testMetricGroup", "testMetricType", new ZKClientConfig, "ZooKeeperClientTest") { + override def connectionState: States = connectionStateOverride.getOrElse(super.connectionState) +} zooKeeperClient.registerStateChangeHandler(changeHandler) +connectionStateOverride = Some(States.CONNECTED) +zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null)) +assertFalse(sessionInitializedCountDownLatch.await(10, TimeUnit.MILLISECONDS), "Unexpected session initialization when connection is alive") + +connectionStateOverride = Some(States.AUTH_FAILED) zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null)) assertTrue(sessionInitializedCountDownLatch.await(5, TimeUnit.SECONDS), "Failed to receive session initializing notification") }
[kafka] branch trunk updated (62f73c3 -> da56146)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 62f73c3 KAFKA-13498: track position in remaining state stores (#11541) add da56146 KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 4 ++-- .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++- 2 files changed, 12 insertions(+), 3 deletions(-)
[kafka] branch 3.0 updated: KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new ac923b0 KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563) ac923b0 is described below commit ac923b0611817c9400e98cac5e0e19c18833de56 Author: Rajini Sivaram AuthorDate: Thu Dec 2 22:10:37 2021 + KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563) If JAAS configuration does not contain a Client section for ZK clients, an auth failure event is generated. If this occurs after the connection is setup in the controller, we schedule reinitialize(), which causes controller to resign. In the case where SASL is not mandatory and the connection is alive, controller maintains the current session and doesn't register its watchers, leaving it in a bad state. Reviewers: Jun Rao --- core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 4 ++-- .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 091b401..bc634a8 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -433,14 +433,14 @@ class ZooKeeperClient(connectString: String, isConnectedOrExpiredCondition.signalAll() } if (state == KeeperState.AuthFailed) { -error("Auth failed.") +error(s"Auth failed, initialized=$isFirstConnectionEstablished connectionState=$connectionState") stateChangeHandlers.values.foreach(_.onAuthFailure()) // If this is during initial startup, we fail fast. Otherwise, schedule retry. val initialized = inLock(isConnectedOrExpiredLock) { isFirstConnectionEstablished } -if (initialized) +if (initialized && !connectionState.isAlive) scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) } else if (state == KeeperState.Expired) { scheduleReinitialize("session-expired", "Session expired.", delayMs = 0L) diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index a0eb1ea..37954e6 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -648,9 +648,18 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } zooKeeperClient.close() -zooKeeperClient = newZooKeeperClient() +@volatile var connectionStateOverride: Option[States] = None +zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, + zkMaxInFlightRequests, time, "testMetricGroup", "testMetricType", new ZKClientConfig, "ZooKeeperClientTest") { + override def connectionState: States = connectionStateOverride.getOrElse(super.connectionState) +} zooKeeperClient.registerStateChangeHandler(changeHandler) +connectionStateOverride = Some(States.CONNECTED) +zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null)) +assertFalse(sessionInitializedCountDownLatch.await(10, TimeUnit.MILLISECONDS), "Unexpected session initialization when connection is alive") + +connectionStateOverride = Some(States.AUTH_FAILED) zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null)) assertTrue(sessionInitializedCountDownLatch.await(5, TimeUnit.SECONDS), "Failed to receive session initializing notification") }
[kafka] branch trunk updated (acd1f9c -> 065fba9)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from acd1f9c KAFKA-13522: add position tracking and bounding to IQv2 (#11581) add 065fba9 KAFKA-13539: Improve propagation and processing of SSL handshake failures (#11597) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/common/network/Selector.java | 2 +- .../kafka/common/network/SslTransportLayer.java| 59 ++ .../kafka/common/network/NetworkTestUtils.java | 8 ++- .../common/network/SslTransportLayerTest.java | 18 ++- 4 files changed, 74 insertions(+), 13 deletions(-)
[kafka] branch trunk updated (b4602e8 -> 0e150a4)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from b4602e8 MINOR: Timeout waitForBlock in connect BlockingConnectorTest (#11595) add 0e150a4 MINOR: Reset java.security.auth.login.config in ZK-tests to avoid config reload affecting subsequent tests (#11602) No new revisions were added by this update. Summary of changes: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala | 1 + 1 file changed, 1 insertion(+)
[kafka] branch trunk updated (c807980 -> 8ed271e)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c807980 MINOR: Update `./gradlew allDepInsight` example in README (#11125) add 8ed271e KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002) No new revisions were added by this update. Summary of changes: .../kafka/clients/producer/ProducerConfig.java | 2 +- .../kafka/clients/producer/KafkaProducerTest.java | 83 ++ .../sanity_checks/test_verifiable_producer.py | 8 ++- 3 files changed, 90 insertions(+), 3 deletions(-)
[kafka] branch 3.0 updated: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new 5fde508 KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002) 5fde508 is described below commit 5fde508731e000f528b7995de5f37602639d84df Author: Cheng Tan <31675100+d8tlt...@users.noreply.github.com> AuthorDate: Mon Jul 26 13:45:59 2021 -0700 KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002) Reviewers: Rajini Sivaram --- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../kafka/clients/producer/KafkaProducerTest.java | 83 ++ .../sanity_checks/test_verifiable_producer.py | 8 ++- 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 970a70b..0492fbf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -286,7 +286,7 @@ public class ProducerConfig extends AbstractConfig { Type.STRING, "all", in("all", "-1", "0", "1"), -Importance.HIGH, +Importance.LOW, ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index f055e12..2784f19 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -173,6 +173,89 @@ public class KafkaProducerTest { } @Test +public void testAcksAndIdempotenceForIdempotentProducers() { +Properties baseProps = new Properties() {{ +setProperty( +ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +setProperty( +ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); +setProperty( +ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); +}}; + +Properties validProps = new Properties() {{ +putAll(baseProps); +setProperty(ProducerConfig.ACKS_CONFIG, "0"); +setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); +}}; +ProducerConfig config = new ProducerConfig(validProps); +assertFalse( +config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), +"idempotence should be overwritten"); +assertEquals( +"0", +config.getString(ProducerConfig.ACKS_CONFIG), +"acks should be overwritten"); + +Properties validProps2 = new Properties() {{ +putAll(baseProps); +setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); +}}; +config = new ProducerConfig(validProps2); +assertTrue( +config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), +"idempotence should be set with the default value"); +assertEquals( +"-1", +config.getString(ProducerConfig.ACKS_CONFIG), +"acks should be set with the default value"); + +Properties validProps3 = new Properties() {{ +putAll(baseProps); +setProperty(ProducerConfig.ACKS_CONFIG, "all"); +setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); +}}; +config = new ProducerConfig(validProps3); + assertFalse(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), +"idempotence should be overwritten"); +assertEquals( +"-1", +config.getString(ProducerConfig.ACKS_CONFIG), +"acks should be overwritten"); + +Properties invalidProps = new Properties() {{ +putAll(baseProps); +
[kafka] branch trunk updated: MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0b9a8ba MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653) 0b9a8ba is described below commit 0b9a8bac36f16b5397e9ec3a0441758e4b60a384 Author: Lucas Bradstreet AuthorDate: Thu Jan 6 01:22:19 2022 -0800 MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653) Reviewers: Rajini Sivaram --- core/src/main/scala/kafka/server/ClientQuotaManager.scala | 12 +++- core/src/main/scala/kafka/server/DelayedOperation.scala | 11 +-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 5860cfc..7334519 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -562,8 +562,18 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, quotaMetricTags.asJava) } + def initiateShutdown(): Unit = { +throttledChannelReaper.initiateShutdown() +// improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op +delayQueue.add(new ThrottledChannel(time, 0, new ThrottleCallback { + override def startThrottling(): Unit = {} + override def endThrottling(): Unit = {} +})) + } + def shutdown(): Unit = { -throttledChannelReaper.shutdown() +initiateShutdown() +throttledChannelReaper.awaitShutdown() } class DefaultQuotaCallback extends ClientQuotaCallback { diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 251dd28..1151e65 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -328,8 +328,15 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri * Shutdown the expire reaper thread */ def shutdown(): Unit = { -if (reaperEnabled) - expirationReaper.shutdown() +if (reaperEnabled) { + expirationReaper.initiateShutdown() + // improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op + timeoutTimer.add(new TimerTask { +override val delayMs: Long = 0 +override def run(): Unit = {} + }) + expirationReaper.awaitShutdown() +} timeoutTimer.shutdown() removeMetric("PurgatorySize", metricsTags) removeMetric("NumDelayedOperations", metricsTags)
[kafka] branch trunk updated (8e205b5 -> bb60eb8)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 8e205b5 KAFKA-13719: Fix connector restart cause duplicate tasks (#11869) add bb60eb8 MINOR: Increase wait in ZooKeeperClientTest (#11973) No new revisions were added by this update. Summary of changes: core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch trunk updated: [KAFKA-15117] In TestSslUtils set SubjectAlternativeNames to null if there are no hostnames (#14440)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d8f358facc2 [KAFKA-15117] In TestSslUtils set SubjectAlternativeNames to null if there are no hostnames (#14440) d8f358facc2 is described below commit d8f358facc2a5405d08977f922bc0b1dae8f114e Author: Purshotam Chauhan AuthorDate: Mon Sep 25 22:13:01 2023 +0530 [KAFKA-15117] In TestSslUtils set SubjectAlternativeNames to null if there are no hostnames (#14440) We are currently encoding an empty hostNames array to subjectAltName in the keystore. While parsing the certificates in the test this causes the issue - Unparseable SubjectAlternativeName extension due to java.io.IOException: No data available in passed DER encoded value. Up to Java 17, this parsing error was ignored. This PR assigns subjectAltName to null if hostnames are empty. Co-authored-by: Ismael Juma Reviewers: Rajini Sivaram --- .../apache/kafka/common/network/SslTransportLayerTest.java | 3 --- .../src/test/java/org/apache/kafka/test/TestSslUtils.java| 12 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index f49bf868a46..26987e30da8 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -36,8 +36,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.condition.DisabledOnJre; -import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -195,7 +193,6 @@ public class SslTransportLayerTest { */ @ParameterizedTest @ArgumentsSource(SslTransportLayerArgumentsProvider.class) -@DisabledOnJre(value = {JRE.JAVA_20, JRE.JAVA_21}, disabledReason = "KAFKA-15117") public void testValidEndpointIdentificationCN(Args args) throws Exception { args.serverCertStores = certBuilder(true, "localhost", args.useInlinePem).build(); args.clientCertStores = certBuilder(false, "localhost", args.useInlinePem).build(); diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 6b7c16b0335..1181fc2 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -399,10 +399,14 @@ public class TestSslUtils { } public CertificateBuilder sanDnsNames(String... hostNames) throws IOException { -GeneralName[] altNames = new GeneralName[hostNames.length]; -for (int i = 0; i < hostNames.length; i++) -altNames[i] = new GeneralName(GeneralName.dNSName, hostNames[i]); -subjectAltName = GeneralNames.getInstance(new DERSequence(altNames)).getEncoded(); +if (hostNames.length > 0) { +GeneralName[] altNames = new GeneralName[hostNames.length]; +for (int i = 0; i < hostNames.length; i++) +altNames[i] = new GeneralName(GeneralName.dNSName, hostNames[i]); +subjectAltName = GeneralNames.getInstance(new DERSequence(altNames)).getEncoded(); +} else { +subjectAltName = null; +} return this; }
[kafka] branch trunk updated: KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (#14457)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dedfed06f7a KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (#14457) dedfed06f7a is described below commit dedfed06f7a472424080456c997f5200c6bef196 Author: chern AuthorDate: Thu Sep 28 06:14:42 2023 -0700 KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (#14457) When a fetch response has no record for a partition, validBytes is 0. We shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there is no record and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch instead. Reviewers: Divij Vaidya , Viktor Somogyi-Vass , Kamal Chandraprakash, Rajini Sivaram --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 5 +++-- core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 1dafb89ef0a..450fcfea461 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -364,10 +364,11 @@ abstract class AbstractFetcherThread(name: String, // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) { + val lastFetchedEpoch = +if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch // Update partitionStates only if there is no exception during processPartitionData val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag), -currentFetchState.currentLeaderEpoch, state = Fetching, -logAppendInfo.lastLeaderEpoch.asScala) +currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) if (validBytes > 0) fetcherStats.byteRate.mark(validBytes) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index e58532622e3..6a0feaa6456 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -749,9 +749,10 @@ class ReplicaFetcherThreadTest { val log: UnifiedLog = mock(classOf[UnifiedLog]) val partition: Partition = mock(classOf[Partition]) val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) +val lastFetchedEpoch = 2 when(log.highWatermark).thenReturn(0) -when(log.latestEpoch).thenReturn(Some(0)) +when(log.latestEpoch).thenReturn(Some(lastFetchedEpoch)) when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0))) when(log.logEndOffset).thenReturn(0) when(log.maybeUpdateHighWatermark(0)).thenReturn(None) @@ -835,6 +836,7 @@ class ReplicaFetcherThreadTest { // Lag is set to Some(0). assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag)) +assertEquals(Some(lastFetchedEpoch), thread.fetchState(t1p0).flatMap(_.lastFetchedEpoch)) } @Test
[kafka] branch 3.3 updated: KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 66000787c1 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964) 66000787c1 is described below commit 66000787c1146c6d08d88b9c564f5a000608f013 Author: Sanjana Kaundinya AuthorDate: Thu Jul 14 05:47:34 2022 -0700 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964) This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group. Co-authored-by: Rajini Sivaram Co-authored-by: David Jacot Reviewers: David Jacot , Rajini Sivaram --- .../java/org/apache/kafka/clients/admin/Admin.java | 36 ++- .../kafka/clients/admin/KafkaAdminClient.java | 11 +- .../admin/ListConsumerGroupOffsetsOptions.java | 14 +- .../admin/ListConsumerGroupOffsetsResult.java | 56 +++- .../admin/ListConsumerGroupOffsetsSpec.java| 79 ++ .../clients/admin/internals/AdminApiDriver.java| 3 +- .../admin/internals/CoordinatorStrategy.java | 4 + .../internals/ListConsumerGroupOffsetsHandler.java | 128 + .../kafka/common/requests/OffsetFetchResponse.java | 10 +- .../kafka/clients/admin/AdminClientTestUtils.java | 12 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 206 -- .../kafka/clients/admin/MockAdminClient.java | 16 +- .../ListConsumerGroupOffsetsHandlerTest.java | 308 +++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 6 +- .../internals/ConsumerCoordinatorTest.java | 26 +- .../scala/kafka/admin/ConsumerGroupCommand.scala | 8 +- .../kafka/admin/ConsumerGroupServiceTest.scala | 22 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../processor/internals/StoreChangelogReader.java | 12 +- .../internals/StoreChangelogReaderTest.java| 11 +- 20 files changed, 813 insertions(+), 157 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index fdacc09db8..0698d29702 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable { * @param options The options to use when listing the consumer group offsets. * @return The ListGroupOffsetsResult */ -ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options); +default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { +ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions() +.requireStable(options.requireStable()); +@SuppressWarnings("deprecation") +ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() +.topicPartitions(options.topicPartitions()); +return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions); +} /** * List the consumer group offsets available in the cluster with the default options. * - * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options. + * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} + * to list offsets of all partitions of one group with default options. * * @return The ListGroupOffsetsResult. */ @@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable { return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions()); } +/** + * List the consumer group offsets available in the cluster for the specified consumer groups. + * + * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for. + * + * @param options The options to use when listing the consumer group offsets. + * @return The ListConsumerGroupOffsetsResult + */ +ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map
[kafka] branch trunk updated: KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new beac86f049 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964) beac86f049 is described below commit beac86f049385932309158c1cb49c8657e53f45f Author: Sanjana Kaundinya AuthorDate: Thu Jul 14 05:47:34 2022 -0700 KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964) This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group. Co-authored-by: Rajini Sivaram Co-authored-by: David Jacot Reviewers: David Jacot , Rajini Sivaram --- .../java/org/apache/kafka/clients/admin/Admin.java | 36 ++- .../kafka/clients/admin/KafkaAdminClient.java | 11 +- .../admin/ListConsumerGroupOffsetsOptions.java | 14 +- .../admin/ListConsumerGroupOffsetsResult.java | 56 +++- .../admin/ListConsumerGroupOffsetsSpec.java| 79 ++ .../clients/admin/internals/AdminApiDriver.java| 3 +- .../admin/internals/CoordinatorStrategy.java | 4 + .../internals/ListConsumerGroupOffsetsHandler.java | 128 + .../kafka/common/requests/OffsetFetchResponse.java | 10 +- .../kafka/clients/admin/AdminClientTestUtils.java | 12 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 206 -- .../kafka/clients/admin/MockAdminClient.java | 16 +- .../ListConsumerGroupOffsetsHandlerTest.java | 308 +++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 6 +- .../internals/ConsumerCoordinatorTest.java | 26 +- .../scala/kafka/admin/ConsumerGroupCommand.scala | 8 +- .../kafka/admin/ConsumerGroupServiceTest.scala | 22 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../processor/internals/StoreChangelogReader.java | 12 +- .../internals/StoreChangelogReaderTest.java| 11 +- 20 files changed, 813 insertions(+), 157 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index fdacc09db8..0698d29702 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable { * @param options The options to use when listing the consumer group offsets. * @return The ListGroupOffsetsResult */ -ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options); +default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { +ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions() +.requireStable(options.requireStable()); +@SuppressWarnings("deprecation") +ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() +.topicPartitions(options.topicPartitions()); +return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions); +} /** * List the consumer group offsets available in the cluster with the default options. * - * This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options. + * This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} + * to list offsets of all partitions of one group with default options. * * @return The ListGroupOffsetsResult. */ @@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable { return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions()); } +/** + * List the consumer group offsets available in the cluster for the specified consumer groups. + * + * @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for. + * + * @param options The options to use when listing the consumer group offsets. + * @return The ListConsumerGroupOffsetsResult + */ +ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map
[kafka] branch trunk updated: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ddbc030036 MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406) ddbc030036 is described below commit ddbc0300365dd1d9a2fb2c73faef8c4cbec0b316 Author: Rajini Sivaram AuthorDate: Fri Jul 15 09:21:35 2022 +0100 MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406) Reviewers: David Jacot --- .../java/org/apache/kafka/clients/admin/Admin.java | 7 +-- .../kafka/clients/admin/KafkaAdminClientTest.java | 52 +++--- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 0698d29702..1d469a6643 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable { * @return The ListGroupOffsetsResult */ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { -ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions() -.requireStable(options.requireStable()); @SuppressWarnings("deprecation") ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() .topicPartitions(options.topicPartitions()); -return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions); + +// We can use the provided options with the batched API, which uses topic partitions from +// the group spec and ignores any topic partitions set in the options. +return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3d285a45f7..de57813679 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -131,6 +131,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; import org.apache.kafka.common.message.UnregisterBrokerResponseData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest { } @Test -public void testListConsumerGroupOffsetsOptions() throws Exception { +public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws Exception { +verifyListConsumerGroupOffsetsOptions(false); +} + +@Test +public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws Exception { +verifyListConsumerGroupOffsetsOptions(true); +} + +@SuppressWarnings("deprecation") +private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) throws Exception { final Cluster cluster = mockCluster(3, 0); final Time time = new MockTime(); @@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); -final TopicPartition tp1 = new TopicPartition("A", 0); +final List partitions = Collections.singletonList(new TopicPartition("A", 0)); final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions() -.requireStable(true); -final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() -.topicPartitions(Collections.singletonList(tp1)); - env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options); +.requireStable(true) +.timeoutMs(300); +if (batchedApi) { +final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() +.topicPartitions(partit
[kafka] branch 3.3 updated: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 610780668e MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406) 610780668e is described below commit 610780668efa7c1e8d1be193985eb6e4d971fa0a Author: Rajini Sivaram AuthorDate: Fri Jul 15 09:21:35 2022 +0100 MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406) Reviewers: David Jacot --- .../java/org/apache/kafka/clients/admin/Admin.java | 7 +-- .../kafka/clients/admin/KafkaAdminClientTest.java | 52 +++--- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 0698d29702..1d469a6643 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable { * @return The ListGroupOffsetsResult */ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { -ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions() -.requireStable(options.requireStable()); @SuppressWarnings("deprecation") ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() .topicPartitions(options.topicPartitions()); -return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions); + +// We can use the provided options with the batched API, which uses topic partitions from +// the group spec and ignores any topic partitions set in the options. +return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3d285a45f7..de57813679 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -131,6 +131,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; import org.apache.kafka.common.message.UnregisterBrokerResponseData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest { } @Test -public void testListConsumerGroupOffsetsOptions() throws Exception { +public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws Exception { +verifyListConsumerGroupOffsetsOptions(false); +} + +@Test +public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws Exception { +verifyListConsumerGroupOffsetsOptions(true); +} + +@SuppressWarnings("deprecation") +private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) throws Exception { final Cluster cluster = mockCluster(3, 0); final Time time = new MockTime(); @@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); -final TopicPartition tp1 = new TopicPartition("A", 0); +final List partitions = Collections.singletonList(new TopicPartition("A", 0)); final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions() -.requireStable(true); -final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() -.topicPartitions(Collections.singletonList(tp1)); - env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options); +.requireStable(true) +.timeoutMs(300); +if (batchedApi) { +final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() +.topicPartitions(partit
[kafka] branch trunk updated: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new eeb1e702eb KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131) eeb1e702eb is described below commit eeb1e702eb7a43d88f11458f739672e2b7aa4871 Author: chern AuthorDate: Tue May 10 03:36:42 2022 -0700 KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131) When a client connects to a SSL listener using PLAINTEXT security protocol, after the TCP connection is setup, the client considers the channel setup is complete. In reality the channel setup is not complete yet. The client then resets reconnect exponential backoff and issues API version request. Since the broker expects SSL handshake, the API version request will cause the connection to disconnect. Client reconnects without exponential backoff since it has been reset. This commit removes the reset of reconnect exponential backoff when sending API version request. In the good case where the channel setup is complete, reconnect exponential backoff will be reset when the node becomes ready, which is after getting the API version response. Inter-broker clients which do not send API version request and go directly to ready state continue to reset backoff before any successful requests. Reviewers: Rajini Sivaram --- .../kafka/clients/ClusterConnectionStates.java | 1 - .../kafka/clients/ClusterConnectionStatesTest.java | 38 ++ .../apache/kafka/clients/NetworkClientTest.java| 31 ++ 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 95efdbeae4..f4d9092258 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -246,7 +246,6 @@ final class ClusterConnectionStates { public void checkingApiVersions(String id) { NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.CHECKING_API_VERSIONS; -resetReconnectBackoff(nodeState); resetConnectionSetupTimeout(nodeState); connectingNodes.remove(id); } diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java index 72cc123921..96fe89ca11 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest { @Test public void testExponentialReconnectBackoff() { -double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) -/ Math.log(reconnectBackoffExpBase); - -// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt -for (int i = 0; i < 10; i++) { -connectionStates.connecting(nodeId1, time.milliseconds(), "localhost"); -connectionStates.disconnected(nodeId1, time.milliseconds()); -// Calculate expected backoff value without jitter -long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) -* reconnectBackoffMs); -long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); -assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff); -time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); -} +verifyReconnectExponentialBackoff(false); +verifyReconnectExponentialBackoff(true); } @Test @@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest { this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver); } + +private void verifyReconnectExponentialBackoff(boolean enterCheckingApiVersionState) { +double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) +/ Math.log(reconnectBackoffExpBase); + +connectionStates.remove(nodeId1); +// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt +for (int i =
[kafka] branch trunk updated: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d529d86aa4b KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416) d529d86aa4b is described below commit d529d86aa4be533d1251cfc0b4c0fb57c69ace72 Author: Badai Aqrandista AuthorDate: Mon Aug 15 21:34:03 2022 +1000 KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416) Ensures that SSL buffered data is processed by server immediately on the next poll when channel is unmuted after processing previous request. Poll timeout is reset to zero for this case to avoid 300ms delay in poll() if no new data arrives on the sockets. Reviewers: David Mao , Ismael Juma , Rajini Sivaram --- .../org/apache/kafka/common/network/Selector.java | 1 + .../unit/kafka/network/SocketServerTest.scala | 44 +- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index bd1175a8ee0..2e581187625 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -757,6 +757,7 @@ public class Selector implements Selectable, AutoCloseable { explicitlyMutedChannels.remove(channel); if (channel.hasBytesBuffered()) { keysWithBufferedRead.add(channel.selectionKey()); +madeReadProgressLastPoll = true; } } } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 98f92d61ff2..801a2d83cab 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -1878,6 +1878,44 @@ class SocketServerTest { }, false) } + /** + * Test to ensure "Selector.poll()" does not block at "select(timeout)" when there is no data in the socket but there + * is data in the buffer. This only happens when SSL protocol is used. + */ + @Test + def testLatencyWithBufferedDataAndNoSocketData(): Unit = { +shutdownServerAndMetrics(server) + +props ++= sslServerProps +val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props)) +testableServer.enableRequestProcessing(Map.empty) +val testableSelector = testableServer.testableSelector +val proxyServer = new ProxyServer(testableServer) +val selectTimeoutMs = 5000 +// set pollTimeoutOverride to "selectTimeoutMs" to ensure poll() timeout is distinct and can be identified +testableSelector.pollTimeoutOverride = Some(selectTimeoutMs) + +try { + // initiate SSL connection by sending 1 request via socket, then send 2 requests directly into the netReadBuffer + val (sslSocket, req1) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer) + + // force all data to be transferred to the kafka broker by closing the client connection to proxy server + sslSocket.close() + TestUtils.waitUntilTrue(() => proxyServer.clientConnSocket.isClosed, "proxyServer.clientConnSocket is still not closed after 6 ms", 6) + + // process the request and send the response + processRequest(testableServer.dataPlaneRequestChannel, req1) + + // process the requests in the netReadBuffer, this should not block + val req2 = receiveRequest(testableServer.dataPlaneRequestChannel) + processRequest(testableServer.dataPlaneRequestChannel, req2) + +} finally { + proxyServer.close() + shutdownServerAndMetrics(testableServer) +} + } + private def sslServerProps: Properties = { val trustStoreFile = File.createTempFile("truststore", ".jks") val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), @@ -2044,10 +2082,12 @@ class SocketServerTest { } def testableSelector: TestableSelector = - dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector] + testableProcessor.selector.asInstanceOf[TestableSelector] -def testableProcessor: TestableProcessor = +def testableProcessor: TestableProcessor = { + val endpoint = this.config.dataPlaneListeners.head dataPlaneAcceptors.get(endpoint).processors(0).asInstanceOf[TestableProcessor] +} def waitForChannelClose(connectionId: String, locallyClosed: Boolean): Unit = { val selector = testableSelector
[kafka] branch 3.4 updated: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new f5f1060a81d KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954) f5f1060a81d is described below commit f5f1060a81dd200e573ef69574ea75b64b73474c Author: Rajini Sivaram AuthorDate: Wed Dec 7 11:41:21 2022 + KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954) Reviewers: David Jacot --- checkstyle/suppressions.xml| 2 +- .../consumer/ConsumerPartitionAssignor.java| 15 +++-- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../consumer/internals/ConsumerCoordinator.java| 8 ++- .../consumer/internals/ConsumerProtocol.java | 5 +- .../common/message/ConsumerProtocolAssignment.json | 3 +- .../message/ConsumerProtocolSubscription.json | 6 +- .../consumer/CooperativeStickyAssignorTest.java| 8 +-- .../kafka/clients/consumer/KafkaConsumerTest.java | 3 +- .../kafka/clients/consumer/StickyAssignorTest.java | 8 +-- .../internals/ConsumerCoordinatorTest.java | 67 +++--- .../consumer/internals/ConsumerProtocolTest.java | 23 ++-- .../kafka/api/PlaintextConsumerTest.scala | 17 ++ 13 files changed, 136 insertions(+), 32 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ac0accc17fa..cea6a193790 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -66,7 +66,7 @@ + files="(KafkaConsumer|ConsumerCoordinator).java"/> topics; private final ByteBuffer userData; private final List ownedPartitions; +private final Optional rackId; private Optional groupInstanceId; private final Optional generationId; -public Subscription(List topics, ByteBuffer userData, List ownedPartitions, int generationId) { +public Subscription(List topics, ByteBuffer userData, List ownedPartitions, int generationId, Optional rackId) { this.topics = topics; this.userData = userData; this.ownedPartitions = ownedPartitions; this.groupInstanceId = Optional.empty(); this.generationId = generationId < 0 ? Optional.empty() : Optional.of(generationId); +this.rackId = rackId; } public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { -this(topics, userData, ownedPartitions, DEFAULT_GENERATION); +this(topics, userData, ownedPartitions, DEFAULT_GENERATION, Optional.empty()); } public Subscription(List topics, ByteBuffer userData) { -this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION); +this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } public Subscription(List topics) { -this(topics, null, Collections.emptyList(), DEFAULT_GENERATION); +this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } public List topics() { @@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor { return ownedPartitions; } +public Optional rackId() { +return rackId; +} + public void setGroupInstanceId(Optional groupInstanceId) { this.groupInstanceId = groupInstanceId; } @@ -158,6 +164,7 @@ public interface ConsumerPartitionAssignor { ", ownedPartitions=" + ownedPartitions + ", groupInstanceId=" + groupInstanceId.map(String::toString).orElse("null") + ", generationId=" + generationId.orElse(-1) + +", rackId=" + (rackId.orElse("null")) + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f07846945a7..cf85798f82b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -790,7 +790,8 @@ public class KafkaConsumer implements Consumer { enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, - config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNS
[kafka] branch trunk updated: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d23ce20bdfb KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954) d23ce20bdfb is described below commit d23ce20bdfbe5a9598523961cb7cf747ce4f52ef Author: Rajini Sivaram AuthorDate: Wed Dec 7 11:41:21 2022 + KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954) Reviewers: David Jacot --- checkstyle/suppressions.xml| 2 +- .../consumer/ConsumerPartitionAssignor.java| 15 +++-- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../consumer/internals/ConsumerCoordinator.java| 8 ++- .../consumer/internals/ConsumerProtocol.java | 5 +- .../common/message/ConsumerProtocolAssignment.json | 3 +- .../message/ConsumerProtocolSubscription.json | 6 +- .../consumer/CooperativeStickyAssignorTest.java| 8 +-- .../kafka/clients/consumer/KafkaConsumerTest.java | 3 +- .../kafka/clients/consumer/StickyAssignorTest.java | 8 +-- .../internals/ConsumerCoordinatorTest.java | 67 +++--- .../consumer/internals/ConsumerProtocolTest.java | 23 ++-- .../kafka/api/PlaintextConsumerTest.scala | 17 ++ 13 files changed, 136 insertions(+), 32 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ac0accc17fa..cea6a193790 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -66,7 +66,7 @@ + files="(KafkaConsumer|ConsumerCoordinator).java"/> topics; private final ByteBuffer userData; private final List ownedPartitions; +private final Optional rackId; private Optional groupInstanceId; private final Optional generationId; -public Subscription(List topics, ByteBuffer userData, List ownedPartitions, int generationId) { +public Subscription(List topics, ByteBuffer userData, List ownedPartitions, int generationId, Optional rackId) { this.topics = topics; this.userData = userData; this.ownedPartitions = ownedPartitions; this.groupInstanceId = Optional.empty(); this.generationId = generationId < 0 ? Optional.empty() : Optional.of(generationId); +this.rackId = rackId; } public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { -this(topics, userData, ownedPartitions, DEFAULT_GENERATION); +this(topics, userData, ownedPartitions, DEFAULT_GENERATION, Optional.empty()); } public Subscription(List topics, ByteBuffer userData) { -this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION); +this(topics, userData, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } public Subscription(List topics) { -this(topics, null, Collections.emptyList(), DEFAULT_GENERATION); +this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, Optional.empty()); } public List topics() { @@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor { return ownedPartitions; } +public Optional rackId() { +return rackId; +} + public void setGroupInstanceId(Optional groupInstanceId) { this.groupInstanceId = groupInstanceId; } @@ -158,6 +164,7 @@ public interface ConsumerPartitionAssignor { ", ownedPartitions=" + ownedPartitions + ", groupInstanceId=" + groupInstanceId.map(String::toString).orElse("null") + ", generationId=" + generationId.orElse(-1) + +", rackId=" + (rackId.orElse("null")) + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f07846945a7..cf85798f82b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -790,7 +790,8 @@ public class KafkaConsumer implements Consumer { enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, - config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNS
[kafka] branch trunk updated (3799708ff09 -> bc1ce9f0f1b)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 3799708ff09 KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest (#13156) add bc1ce9f0f1b KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging (#13119) No new revisions were added by this update. Summary of changes: .../internals/secured/HttpAccessTokenRetriever.java | 17 - 1 file changed, 12 insertions(+), 5 deletions(-)
[kafka] branch trunk updated: KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match (#13346)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4d43abf1e09 KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match (#13346) 4d43abf1e09 is described below commit 4d43abf1e09e01fc5e7af52f65e3fbae02cf9771 Author: Rajini Sivaram AuthorDate: Tue Mar 7 09:41:01 2023 + KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match (#13346) To avoid mistakes during dynamic broker config updates that could potentially affect clients, we restrict changes that can be performed dynamically without broker restart. For broker keystore updates, we require the DN to be the same for the old and new certificates since this could potentially contain host names used for host name verification by clients. DNs are compared using standard Java implementation of X500Principal.equals() which compares canonical names. If tags of fields ch [...] Reviewers: Manikumar Reddy , Kalpesh Patel --- .../kafka/common/security/ssl/SslFactory.java | 8 - .../kafka/common/security/ssl/SslFactoryTest.java | 38 ++ .../java/org/apache/kafka/test/TestSslUtils.java | 19 +-- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index d0cc4cc1e69..65c37aa6b47 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -312,7 +312,13 @@ public class SslFactory implements Reconfigurable, Closeable { for (int i = 0; i < newEntries.size(); i++) { CertificateEntries newEntry = newEntries.get(i); CertificateEntries oldEntry = oldEntries.get(i); -if (!Objects.equals(newEntry.subjectPrincipal, oldEntry.subjectPrincipal)) { +Principal newPrincipal = newEntry.subjectPrincipal; +Principal oldPrincipal = oldEntry.subjectPrincipal; +// Compare principal objects to compare canonical names (e.g. to ignore leading/trailing whitespaces). +// Canonical names may differ if the tags of a field changes from one with a printable string representation +// to one without or vice-versa due to optional conversion to hex representation based on the tag. So we +// also compare Principal.getName which compares the RFC2253 name. If either matches, allow dynamic update. +if (!Objects.equals(newPrincipal, oldPrincipal) && !newPrincipal.getName().equalsIgnoreCase(oldPrincipal.getName())) { throw new ConfigException(String.format("Keystore DistinguishedName does not match: " + " existing={alias=%s, DN=%s}, new={alias=%s, DN=%s}", oldEntry.alias, oldEntry.subjectPrincipal, newEntry.alias, newEntry.subjectPrincipal)); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index 21dcd6e4b0f..7ac707b5de7 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -20,7 +20,9 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.security.GeneralSecurityException; +import java.security.KeyPair; import java.security.KeyStore; +import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Map; @@ -46,6 +48,7 @@ import org.apache.kafka.common.network.Mode; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; +import static org.apache.kafka.common.security.ssl.SslFactory.CertificateEntries.ensureCompatible; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -521,6 +524,41 @@ public abstract class SslFactoryTest { assertFalse(securityConfig.unused().contains(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG)); } +@Test +public void testDynamicUpdateCompatibility() throws Exception { +KeyPair keyPair = TestSslUtils.generateKeyPair("RSA"); +KeyStore ks = createKeyStore(keyPair, "*.example.com", "Kafka", true, "localhost", "*.example.com"); +ensureCompatible(ks, k
[kafka] branch trunk updated (20e05695f95 -> 1401769d92f)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 20e05695f95 KAFKA-14447: remove stale TODO comment (#13258) add 1401769d92f KAFKA-14452: Refactor AbstractStickyAssignor to prepare for rack-aware assignment (#13349) No new revisions were added by this update. Summary of changes: .../consumer/internals/AbstractStickyAssignor.java | 1801 ++-- 1 file changed, 911 insertions(+), 890 deletions(-)
[kafka] branch trunk updated: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881) (#12990)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 98d84b17f74 KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881) (#12990) 98d84b17f74 is described below commit 98d84b17f74b0bfe65163d0ddf88976746de5f7e Author: Rajini Sivaram AuthorDate: Wed Mar 1 21:01:35 2023 + KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881) (#12990) Best-effort rack alignment for range assignor when both consumer racks and partition racks are available with the protocol changes introduced in KIP-881. Rack-aware assignment is enabled by configuring client.rack for consumers. Balanced assignment per topic is prioritized over rack-alignment. For topics with equal partitions and the same set of subscribers, co-partitioning is prioritized over rack-alignment. Reviewers: David Jacot --- .../kafka/clients/consumer/RangeAssignor.java | 229 +-- .../internals/AbstractPartitionAssignor.java | 66 +++- .../kafka/clients/consumer/internals/Utils.java| 4 +- .../kafka/clients/consumer/RangeAssignorTest.java | 436 - .../clients/consumer/RoundRobinAssignorTest.java | 11 +- .../internals/AbstractPartitionAssignorTest.java | 169 .../kafka/api/PlaintextConsumerTest.scala | 36 +- .../server/FetchFromFollowerIntegrationTest.scala | 66 +++- 8 files changed, 852 insertions(+), 165 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java index aec0d3997c4..0b3071a4915 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java @@ -17,13 +17,27 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; /** * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order @@ -63,9 +77,26 @@ import java.util.Map; * I0: [t0p0, t0p1, t1p0, t1p1] * I1: [t0p2, t1p2] * + * + * Rack-aware assignment is used if both consumer and partition replica racks are available and + * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with + * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment. + * Topics with equal partition count and same set of subscribers guarantee co-partitioning by prioritizing + * co-partitioning over rack alignment. In this case, aligning partition replicas of these topics on the + * same racks will improve locality for consumers. For example, if partitions 0 of all topics have a replica + * on rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer + * on rack 'a', partition 1 to a consumer on rack 'b' and so on. + * + * Note that rack-aware assignment currently takes all replicas into account, including any offline replicas + * and replicas that are not in the ISR. This is based on the assumption that these replicas are likely + * to join the ISR relatively soon. Since consumers don't rebalance on ISR change, this avoids unnecessary + * cross-rack traffic for long durations after replicas rejoin the ISR. In the future, we may consider + * rebalancing when replicas are added or removed to improve consumer rack alignment. + * */ public class RangeAssignor extends AbstractPartitionAssignor { public static final String RANGE_ASSIGNOR_NAME = "range"; +private static final TopicPartitionComparator PARTITION_COMPARATOR = new TopicPartitionComparator(); @Override public String name() { @@ -74,45 +105,193 @@ public class RangeAssignor extends AbstractPartitionAssignor { private Map> consumersPerTopic(Map consumerMetadata) { Map> topicToConsumers = new HashMap<>(); -for (Map.Entry subscriptionEntry : consumerMetadata.entrySet())
[kafka] branch trunk updated: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3c4472d701a KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474) 3c4472d701a is described below commit 3c4472d701a7e9d9b8714a0b9d87ae190d1679fb Author: Rajini Sivaram AuthorDate: Fri Mar 31 15:01:07 2023 +0100 KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474) When `client.rack` is configured for consumers, we perform rack-aware consumer partition assignment to improve locality. After/during reassignments, replica racks may change, so to ensure optimal consumer assignment, trigger rebalance from the leader when set of racks of any partition changes. Reviewers: David Jacot --- .../consumer/internals/ConsumerCoordinator.java| 58 +-- .../internals/ConsumerCoordinatorTest.java | 171 ++--- .../server/FetchFromFollowerIntegrationTest.scala | 26 +++- 3 files changed, 224 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index fec31fe80f8..1f6c5ef0d75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparato import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; @@ -174,7 +176,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.rebalanceConfig = rebalanceConfig; this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; -this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion()); +this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId); +this.metadataSnapshot = new MetadataSnapshot(this.rackId, subscriptions, metadata.fetch(), metadata.updateVersion()); this.subscriptions = subscriptions; this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback(); this.autoCommitEnabled = autoCommitEnabled; @@ -188,7 +191,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, JoinGroupRequest.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId); this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; -this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId); if (autoCommitEnabled) this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs); @@ -489,7 +491,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // Update the current snapshot, which will be used to check for subscription // changes that would require a rebalance (e.g. new partitions). -metadataSnapshot = new MetadataSnapshot(subscriptions, cluster, version); +metadataSnapshot = new MetadataSnapshot(rackId, subscriptions, cluster, version); } } @@ -1613,14 +1615,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private static class MetadataSnapshot { private final int version; -private final Map partitionsPerTopic; +private final Map> partitionsPerTopic; -private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) { -Map partitionsPerTopic = new HashMap<>(); +private MetadataSnapshot(Optional clientRack, SubscriptionState subscription, Cluster cluster, int version) { +Map> partitionsPerTopic = new HashMap<>(); for (String topic : subscription.metadataTopics()) { -Integer numPartitions = cluster.partitionCountForTopic(topic); -if (num
[kafka] branch trunk updated (970dea60e86 -> 1f0ae71fb32)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 970dea60e86 KAFKA-14785 (KIP-875): Connect offset read REST API (#13434) add 1f0ae71fb32 KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881) (#13350) No new revisions were added by this update. Summary of changes: .../consumer/CooperativeStickyAssignor.java| 9 +- .../kafka/clients/consumer/StickyAssignor.java | 2 +- .../consumer/internals/AbstractStickyAssignor.java | 329 ++-- .../consumer/CooperativeStickyAssignorTest.java| 72 +- .../kafka/clients/consumer/StickyAssignorTest.java | 206 ++--- .../internals/AbstractPartitionAssignorTest.java | 48 +- .../internals/AbstractStickyAssignorTest.java | 827 ++--- 7 files changed, 1014 insertions(+), 479 deletions(-)
[kafka] branch trunk updated (e99984248da -> b64ac94a8c6)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from e99984248da KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager (#13487) add b64ac94a8c6 KAFKA-14891: Fix rack-aware range assignor to assign co-partitioned subsets (#13539) No new revisions were added by this update. Summary of changes: .../kafka/clients/consumer/RangeAssignor.java | 2 +- .../kafka/clients/consumer/RangeAssignorTest.java | 30 -- 2 files changed, 29 insertions(+), 3 deletions(-)
[kafka] branch trunk updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ef9c9486dac KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211) ef9c9486dac is described below commit ef9c9486dac8d6076e2897580b10a699697205ac Author: Rajini Sivaram AuthorDate: Sun Feb 12 19:49:12 2023 + KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211) We currently cache login managers in static maps for both static JAAS config using system property and for JAAS config specified using Kafka config sasl.jaas.config. In addition to the JAAS config, the login manager callback handler is included in the key, but all other configs are ignored. This implementation is based on the assumption clients that require different logins (e.g. username/password) use different JAAS configs, because login properties are included in the JAAS config ra [...] This PR includes all SASL configs prefixed with sasl. to be included in the key so that logins are only shared if all the sasl configs are identical. Reviewers: Manikumar Reddy , Kirk True --- .../security/authenticator/LoginManager.java | 17 ++--- .../security/authenticator/LoginManagerTest.java | 43 ++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index 6613fd147f8..f84a8ac9b75 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -99,14 +99,14 @@ public class LoginManager { LoginManager loginManager; Password jaasConfigValue = jaasContext.dynamicJaasConfig(); if (jaasConfigValue != null) { -LoginMetadata loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass); +LoginMetadata loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs); loginManager = DYNAMIC_INSTANCES.get(loginMetadata); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata); DYNAMIC_INSTANCES.put(loginMetadata, loginManager); } } else { -LoginMetadata loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass); +LoginMetadata loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs); loginManager = STATIC_INSTANCES.get(loginMetadata); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata); @@ -198,17 +198,23 @@ public class LoginManager { final T configInfo; final Class loginClass; final Class loginCallbackClass; +final Map saslConfigs; LoginMetadata(T configInfo, Class loginClass, - Class loginCallbackClass) { + Class loginCallbackClass, + Map configs) { this.configInfo = configInfo; this.loginClass = loginClass; this.loginCallbackClass = loginCallbackClass; +this.saslConfigs = new HashMap<>(); +configs.entrySet().stream() +.filter(e -> e.getKey().startsWith("sasl.")) +.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); // value may be null } @Override public int hashCode() { -return Objects.hash(configInfo, loginClass, loginCallbackClass); +return Objects.hash(configInfo, loginClass, loginCallbackClass, saslConfigs); } @Override @@ -219,7 +225,8 @@ public class LoginManager { LoginMetadata loginMetadata = (LoginMetadata) o; return Objects.equals(configInfo, loginMetadata.configInfo) && Objects.equals(loginClass, loginMetadata.loginClass) && - Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass); + Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass) && + Objects.equals(saslConfigs, loginMetadata.saslConfigs); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/secu
[kafka] branch 3.4 updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new f7a9f5d6a32 KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211) f7a9f5d6a32 is described below commit f7a9f5d6a3203bcf2df75ea52bb0ba5a9f7474a6 Author: Rajini Sivaram AuthorDate: Sun Feb 12 19:49:12 2023 + KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211) We currently cache login managers in static maps for both static JAAS config using system property and for JAAS config specified using Kafka config sasl.jaas.config. In addition to the JAAS config, the login manager callback handler is included in the key, but all other configs are ignored. This implementation is based on the assumption clients that require different logins (e.g. username/password) use different JAAS configs, because login properties are included in the JAAS config ra [...] This PR includes all SASL configs prefixed with sasl. to be included in the key so that logins are only shared if all the sasl configs are identical. Reviewers: Manikumar Reddy , Kirk True --- .../security/authenticator/LoginManager.java | 17 ++--- .../security/authenticator/LoginManagerTest.java | 43 ++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index 6613fd147f8..f84a8ac9b75 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -99,14 +99,14 @@ public class LoginManager { LoginManager loginManager; Password jaasConfigValue = jaasContext.dynamicJaasConfig(); if (jaasConfigValue != null) { -LoginMetadata loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass); +LoginMetadata loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs); loginManager = DYNAMIC_INSTANCES.get(loginMetadata); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata); DYNAMIC_INSTANCES.put(loginMetadata, loginManager); } } else { -LoginMetadata loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass); +LoginMetadata loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs); loginManager = STATIC_INSTANCES.get(loginMetadata); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata); @@ -198,17 +198,23 @@ public class LoginManager { final T configInfo; final Class loginClass; final Class loginCallbackClass; +final Map saslConfigs; LoginMetadata(T configInfo, Class loginClass, - Class loginCallbackClass) { + Class loginCallbackClass, + Map configs) { this.configInfo = configInfo; this.loginClass = loginClass; this.loginCallbackClass = loginCallbackClass; +this.saslConfigs = new HashMap<>(); +configs.entrySet().stream() +.filter(e -> e.getKey().startsWith("sasl.")) +.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); // value may be null } @Override public int hashCode() { -return Objects.hash(configInfo, loginClass, loginCallbackClass); +return Objects.hash(configInfo, loginClass, loginCallbackClass, saslConfigs); } @Override @@ -219,7 +225,8 @@ public class LoginManager { LoginMetadata loginMetadata = (LoginMetadata) o; return Objects.equals(configInfo, loginMetadata.configInfo) && Objects.equals(loginClass, loginMetadata.loginClass) && - Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass); + Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass) && + Objects.equals(saslConfigs, loginMetadata.saslConfigs); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/au
[kafka] branch 3.3 updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 6a86e54c764 KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211) 6a86e54c764 is described below commit 6a86e54c7645c3db950640b10340ff347a0bc1a2 Author: Rajini Sivaram AuthorDate: Sun Feb 12 19:49:12 2023 + KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211) We currently cache login managers in static maps for both static JAAS config using system property and for JAAS config specified using Kafka config sasl.jaas.config. In addition to the JAAS config, the login manager callback handler is included in the key, but all other configs are ignored. This implementation is based on the assumption clients that require different logins (e.g. username/password) use different JAAS configs, because login properties are included in the JAAS config ra [...] This PR includes all SASL configs prefixed with sasl. to be included in the key so that logins are only shared if all the sasl configs are identical. Reviewers: Manikumar Reddy , Kirk True --- .../security/authenticator/LoginManager.java | 17 ++--- .../security/authenticator/LoginManagerTest.java | 43 ++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index 6613fd147f8..f84a8ac9b75 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -99,14 +99,14 @@ public class LoginManager { LoginManager loginManager; Password jaasConfigValue = jaasContext.dynamicJaasConfig(); if (jaasConfigValue != null) { -LoginMetadata loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass); +LoginMetadata loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs); loginManager = DYNAMIC_INSTANCES.get(loginMetadata); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata); DYNAMIC_INSTANCES.put(loginMetadata, loginManager); } } else { -LoginMetadata loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass); +LoginMetadata loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs); loginManager = STATIC_INSTANCES.get(loginMetadata); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata); @@ -198,17 +198,23 @@ public class LoginManager { final T configInfo; final Class loginClass; final Class loginCallbackClass; +final Map saslConfigs; LoginMetadata(T configInfo, Class loginClass, - Class loginCallbackClass) { + Class loginCallbackClass, + Map configs) { this.configInfo = configInfo; this.loginClass = loginClass; this.loginCallbackClass = loginCallbackClass; +this.saslConfigs = new HashMap<>(); +configs.entrySet().stream() +.filter(e -> e.getKey().startsWith("sasl.")) +.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); // value may be null } @Override public int hashCode() { -return Objects.hash(configInfo, loginClass, loginCallbackClass); +return Objects.hash(configInfo, loginClass, loginCallbackClass, saslConfigs); } @Override @@ -219,7 +225,8 @@ public class LoginManager { LoginMetadata loginMetadata = (LoginMetadata) o; return Objects.equals(configInfo, loginMetadata.configInfo) && Objects.equals(loginClass, loginMetadata.loginClass) && - Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass); + Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass) && + Objects.equals(saslConfigs, loginMetadata.saslConfigs); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/au
(kafka) branch trunk updated: KAFKA-16305: Avoid optimisation in handshakeUnwrap (#15434)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5d6936a4992 KAFKA-16305: Avoid optimisation in handshakeUnwrap (#15434) 5d6936a4992 is described below commit 5d6936a4992b77ef68da216a7c2dbf1f8c9f909e Author: Gaurav Narula AuthorDate: Wed Feb 28 09:37:58 2024 + KAFKA-16305: Avoid optimisation in handshakeUnwrap (#15434) Performs additional unwrap during handshake after data from client is processed to support openssl, which needs the extra unwrap to complete handshake. Reviewers: Ismael Juma , Rajini Sivaram --- .../kafka/common/network/SslTransportLayer.java| 7 +-- .../common/network/SslTransportLayerTest.java | 60 ++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 904c5216a40..da80e363a95 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -498,13 +498,14 @@ public class SslTransportLayer implements TransportLayer { } /** - * Perform handshake unwrap + * Perform handshake unwrap. + * Visible for testing. * @param doRead boolean If true, read more from the socket channel * @param ignoreHandshakeStatus If true, continue to unwrap if data available regardless of handshake status * @return SSLEngineResult * @throws IOException */ -private SSLEngineResult handshakeUnwrap(boolean doRead, boolean ignoreHandshakeStatus) throws IOException { +SSLEngineResult handshakeUnwrap(boolean doRead, boolean ignoreHandshakeStatus) throws IOException { log.trace("SSLHandshake handshakeUnwrap {}", channelId); SSLEngineResult result; int read = 0; @@ -526,7 +527,7 @@ public class SslTransportLayer implements TransportLayer { handshakeStatus == HandshakeStatus.NEED_UNWRAP) || (ignoreHandshakeStatus && netReadBuffer.position() != position); log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status {}", handshakeStatus, result.getStatus()); -} while (netReadBuffer.position() != 0 && cont); +} while (cont); // Throw EOF exception for failed read after processing already received data // so that handshake failures are reported correctly diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d92f4facb3c..8b00bcdb955 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -51,6 +52,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -65,13 +67,20 @@ import java.util.stream.Stream; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses. @@ -1467,4 +1476,55 @@ public class SslTransportLayerTest { } } } + +/** + * SSLEngine implementations may transition from NEED_UNWRAP to NEED_UNWRAP + * even after reading all the data from the socket. This test ensures we + * con