[kafka] branch 2.7 updated: KAFKA-13277; Fix size calculation for tagged string fields in message generator (#11308)

2021-09-08 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
 new d4e2273  KAFKA-13277; Fix size calculation for tagged string fields in 
message generator (#11308)
d4e2273 is described below

commit d4e227379c9096499788719fdbf2d1a5105d8ee9
Author: Rajini Sivaram 
AuthorDate: Tue Sep 7 21:02:45 2021 +0100

KAFKA-13277; Fix size calculation for tagged string fields in message 
generator (#11308)

Reviewers: Colin P. McCabe 
---
 .../org/apache/kafka/common/message/MessageTest.java | 16 
 .../org/apache/kafka/message/MessageDataGenerator.java   |  2 +-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 94e6bc2..9cf18cf 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -1068,6 +1068,22 @@ public final class MessageTest {
 verifyWriteSucceeds((short) 6, createTopics);
 }
 
+@Test
+public void testLongTaggedString() throws Exception {
+char[] chars = new char[1024];
+Arrays.fill(chars, 'a');
+String longString = new String(chars);
+SimpleExampleMessageData message = new SimpleExampleMessageData()
+.setMyString(longString);
+ObjectSerializationCache cache = new ObjectSerializationCache();
+short version = 1;
+int size = message.size(cache, version);
+ByteBuffer buf = ByteBuffer.allocate(size);
+ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(buf);
+message.write(byteBufferAccessor, cache, version);
+assertEquals(size, buf.position());
+}
+
 private void verifyWriteRaisesNpe(short version, Message message) {
 ObjectSerializationCache cache = new ObjectSerializationCache();
 assertThrows(NullPointerException.class, () -> {
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 9dcaa76..54f2364 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -1530,7 +1530,7 @@ public final class MessageDataGenerator implements 
MessageClassGenerator {
 buffer.printf("int _stringPrefixSize = " +
 
"ByteUtils.sizeOfUnsignedVarint(_stringBytes.length + 1);%n");
 buffer.printf("_size += _stringBytes.length + 
_stringPrefixSize + " +
-
"ByteUtils.sizeOfUnsignedVarint(_stringPrefixSize);%n");
+
"ByteUtils.sizeOfUnsignedVarint(_stringPrefixSize + _stringBytes.length);%n");
 } else {
 buffer.printf("_size += _stringBytes.length + 
" +
 
"ByteUtils.sizeOfUnsignedVarint(_stringBytes.length + 1);%n");


[kafka] branch trunk updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)

2021-07-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new fe0fe68  KAFKA-13141; Skip follower fetch offset update in leader if 
diverging epoch is present (#11136)
fe0fe68 is described below

commit fe0fe686e92d019ac2b5c8407ab2cbb55ae069e1
Author: Rajini Sivaram 
AuthorDate: Wed Jul 28 17:27:26 2021 +0100

KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch 
is present (#11136)

Reviewers: Jason Gustafson 
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 23 ++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 949ff47..a9c99a9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1769,7 +1769,8 @@ class ReplicaManager(val config: KafkaConfig,
* records in fetch response. Log start/end offset and high watermark may 
change not only due to
* this fetch request, e.g., rolling new log segment and removing old log 
segment may move log
* start offset further than the last offset in the fetched records. The 
followers will get the
-   * updated leader's state in the next fetch response.
+   * updated leader's state in the next fetch response. If follower has a 
diverging epoch or if read
+   * fails with any error, follower fetch state is not updated.
*/
   private def updateFollowerFetchState(followerId: Int,
readResults: Seq[(TopicPartition, 
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1778,6 +1779,10 @@ class ReplicaManager(val config: KafkaConfig,
 debug(s"Skipping update of fetch state for follower $followerId since 
the " +
   s"log read returned error ${readResult.error}")
 readResult
+  } else if (readResult.divergingEpoch.nonEmpty) {
+debug(s"Skipping update of fetch state for follower $followerId since 
the " +
+  s"log read returned diverging epoch ${readResult.divergingEpoch}")
+readResult
   } else {
 onlinePartition(topicPartition) match {
   case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1d875ae..d50aa7b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -703,6 +703,29 @@ class ReplicaManagerTest {
   assertEquals(0L, followerReplica.logStartOffset)
   assertEquals(0L, followerReplica.logEndOffset)
 
+  // Next we receive an invalid request with a higher fetch offset, but a 
diverging epoch.
+  // We expect that the replica state does not get updated.
+  val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, 
maxFetchBytes,
+Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+  replicaManager.fetchMessages(
+timeout = 0L,
+replicaId = 1,
+fetchMinBytes = 1,
+fetchMaxBytes = maxFetchBytes,
+hardMaxBytesLimit = false,
+fetchInfos = Seq(tp -> divergingFetchPartitionData),
+topicIds = topicIds.asJava,
+quota = UnboundedQuota,
+isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+responseCallback = callback,
+clientMetadata = None
+  )
+
+  assertTrue(successfulFetch.isDefined)
+  assertEquals(0L, followerReplica.logStartOffset)
+  assertEquals(0L, followerReplica.logEndOffset)
+
 } finally {
   replicaManager.shutdown(checkpointHW = false)
 }


[kafka] branch 2.7 updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)

2021-07-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
 new c69d5f3  KAFKA-13141; Skip follower fetch offset update in leader if 
diverging epoch is present (#11136)
c69d5f3 is described below

commit c69d5f30f9ecac7e0074b21d9170de4837be6067
Author: Rajini Sivaram 
AuthorDate: Wed Jul 28 17:27:26 2021 +0100

KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch 
is present (#11136)

Reviewers: Jason Gustafson 
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1b819d3..7d40110 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1709,7 +1709,8 @@ class ReplicaManager(val config: KafkaConfig,
* records in fetch response. Log start/end offset and high watermark may 
change not only due to
* this fetch request, e.g., rolling new log segment and removing old log 
segment may move log
* start offset further than the last offset in the fetched records. The 
followers will get the
-   * updated leader's state in the next fetch response.
+   * updated leader's state in the next fetch response. If follower has a 
diverging epoch or if read
+   * fails with any error, follower fetch state is not updated.
*/
   private def updateFollowerFetchState(followerId: Int,
readResults: Seq[(TopicPartition, 
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1718,6 +1719,10 @@ class ReplicaManager(val config: KafkaConfig,
 debug(s"Skipping update of fetch state for follower $followerId since 
the " +
   s"log read returned error ${readResult.error}")
 readResult
+  } else if (readResult.divergingEpoch.nonEmpty) {
+debug(s"Skipping update of fetch state for follower $followerId since 
the " +
+  s"log read returned diverging epoch ${readResult.divergingEpoch}")
+readResult
   } else {
 nonOfflinePartition(topicPartition) match {
   case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6a0f94c..127aff5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -671,6 +671,28 @@ class ReplicaManagerTest {
   assertEquals(0L, followerReplica.logStartOffset)
   assertEquals(0L, followerReplica.logEndOffset)
 
+  // Next we receive an invalid request with a higher fetch offset, but a 
diverging epoch.
+  // We expect that the replica state does not get updated.
+  val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, 
maxFetchBytes,
+Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+  replicaManager.fetchMessages(
+timeout = 0L,
+replicaId = 1,
+fetchMinBytes = 1,
+fetchMaxBytes = maxFetchBytes,
+hardMaxBytesLimit = false,
+fetchInfos = Seq(tp -> divergingFetchPartitionData),
+quota = UnboundedQuota,
+isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+responseCallback = callback,
+clientMetadata = None
+  )
+
+  assertTrue(successfulFetch.isDefined)
+  assertEquals(0L, followerReplica.logStartOffset)
+  assertEquals(0L, followerReplica.logEndOffset)
+
 } finally {
   replicaManager.shutdown(checkpointHW = false)
 }


[kafka] branch 3.0 updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)

2021-07-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
 new 047608f  KAFKA-13141; Skip follower fetch offset update in leader if 
diverging epoch is present (#11136)
047608f is described below

commit 047608fe5e6631139d890ce1ca045052daa0e43c
Author: Rajini Sivaram 
AuthorDate: Wed Jul 28 17:27:26 2021 +0100

KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch 
is present (#11136)

Reviewers: Jason Gustafson 
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6837d81..f03571c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1750,7 +1750,8 @@ class ReplicaManager(val config: KafkaConfig,
* records in fetch response. Log start/end offset and high watermark may 
change not only due to
* this fetch request, e.g., rolling new log segment and removing old log 
segment may move log
* start offset further than the last offset in the fetched records. The 
followers will get the
-   * updated leader's state in the next fetch response.
+   * updated leader's state in the next fetch response. If follower has a 
diverging epoch or if read
+   * fails with any error, follower fetch state is not updated.
*/
   private def updateFollowerFetchState(followerId: Int,
readResults: Seq[(TopicPartition, 
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1759,6 +1760,10 @@ class ReplicaManager(val config: KafkaConfig,
 debug(s"Skipping update of fetch state for follower $followerId since 
the " +
   s"log read returned error ${readResult.error}")
 readResult
+  } else if (readResult.divergingEpoch.nonEmpty) {
+debug(s"Skipping update of fetch state for follower $followerId since 
the " +
+  s"log read returned diverging epoch ${readResult.divergingEpoch}")
+readResult
   } else {
 onlinePartition(topicPartition) match {
   case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5e2563e..fd6ba75 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -698,6 +698,28 @@ class ReplicaManagerTest {
   assertEquals(0L, followerReplica.logStartOffset)
   assertEquals(0L, followerReplica.logEndOffset)
 
+  // Next we receive an invalid request with a higher fetch offset, but a 
diverging epoch.
+  // We expect that the replica state does not get updated.
+  val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, 
maxFetchBytes,
+Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+  replicaManager.fetchMessages(
+timeout = 0L,
+replicaId = 1,
+fetchMinBytes = 1,
+fetchMaxBytes = maxFetchBytes,
+hardMaxBytesLimit = false,
+fetchInfos = Seq(tp -> divergingFetchPartitionData),
+quota = UnboundedQuota,
+isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+responseCallback = callback,
+clientMetadata = None
+  )
+
+  assertTrue(successfulFetch.isDefined)
+  assertEquals(0L, followerReplica.logStartOffset)
+  assertEquals(0L, followerReplica.logEndOffset)
+
 } finally {
   replicaManager.shutdown(checkpointHW = false)
 }


[kafka] branch 2.8 updated: KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)

2021-07-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
 new 7e4a315  KAFKA-13141; Skip follower fetch offset update in leader if 
diverging epoch is present (#11136)
7e4a315 is described below

commit 7e4a315963a3d635756183e4c509dcfd82d8359d
Author: Rajini Sivaram 
AuthorDate: Wed Jul 28 17:27:26 2021 +0100

KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch 
is present (#11136)

Reviewers: Jason Gustafson 
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 22 ++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6e4fc5c..b25516c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1753,7 +1753,8 @@ class ReplicaManager(val config: KafkaConfig,
* records in fetch response. Log start/end offset and high watermark may 
change not only due to
* this fetch request, e.g., rolling new log segment and removing old log 
segment may move log
* start offset further than the last offset in the fetched records. The 
followers will get the
-   * updated leader's state in the next fetch response.
+   * updated leader's state in the next fetch response. If follower has a 
diverging epoch or if read
+   * fails with any error, follower fetch state is not updated.
*/
   private def updateFollowerFetchState(followerId: Int,
readResults: Seq[(TopicPartition, 
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1762,6 +1763,10 @@ class ReplicaManager(val config: KafkaConfig,
 debug(s"Skipping update of fetch state for follower $followerId since 
the " +
   s"log read returned error ${readResult.error}")
 readResult
+  } else if (readResult.divergingEpoch.nonEmpty) {
+debug(s"Skipping update of fetch state for follower $followerId since 
the " +
+  s"log read returned diverging epoch ${readResult.divergingEpoch}")
+readResult
   } else {
 onlinePartition(topicPartition) match {
   case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9b289e5..881430e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -683,6 +683,28 @@ class ReplicaManagerTest {
   assertEquals(0L, followerReplica.logStartOffset)
   assertEquals(0L, followerReplica.logEndOffset)
 
+  // Next we receive an invalid request with a higher fetch offset, but a 
diverging epoch.
+  // We expect that the replica state does not get updated.
+  val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, 
maxFetchBytes,
+Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+  replicaManager.fetchMessages(
+timeout = 0L,
+replicaId = 1,
+fetchMinBytes = 1,
+fetchMaxBytes = maxFetchBytes,
+hardMaxBytesLimit = false,
+fetchInfos = Seq(tp -> divergingFetchPartitionData),
+quota = UnboundedQuota,
+isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+responseCallback = callback,
+clientMetadata = None
+  )
+
+  assertTrue(successfulFetch.isDefined)
+  assertEquals(0L, followerReplica.logStartOffset)
+  assertEquals(0L, followerReplica.logEndOffset)
+
 } finally {
   replicaManager.shutdown(checkpointHW = false)
 }


[kafka] branch trunk updated: KAFKA-10774; Admin API for Describe topic using topic IDs (#9769)

2021-08-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 1d22b0d  KAFKA-10774; Admin API for Describe topic using topic IDs 
(#9769)
1d22b0d is described below

commit 1d22b0d70686aef5689b775ea2ea7610a37f3e8c
Author: dengziming 
AuthorDate: Sat Aug 28 16:00:36 2021 +0800

KAFKA-10774; Admin API for Describe topic using topic IDs (#9769)

Reviewers: Justine Olshan , Chia-Ping Tsai 
, Satish Duggana , Rajini Sivaram 

---
 .../java/org/apache/kafka/clients/admin/Admin.java |  44 +--
 .../kafka/clients/admin/DeleteTopicsResult.java|   2 +-
 .../kafka/clients/admin/DescribeTopicsResult.java  |  97 +--
 .../kafka/clients/admin/KafkaAdminClient.java  | 127 
 .../kafka/clients/admin/TopicDescription.java  |  12 +-
 .../apache/kafka/clients/admin/TopicListing.java   |  28 -
 .../main/java/org/apache/kafka/common/Cluster.java |   8 ++
 .../kafka/common/requests/MetadataRequest.java |  40 ++-
 .../kafka/common/requests/MetadataResponse.java|  19 +++
 .../resources/common/message/MetadataRequest.json  |   3 +-
 .../resources/common/message/MetadataResponse.json |   5 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |   7 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   4 +-
 .../kafka/clients/admin/MockAdminClient.java   | 100 +---
 .../ClientAuthenticationFailureTest.java   |   2 +-
 .../connect/mirror/MirrorSourceConnector.java  |   2 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |   2 +-
 .../apache/kafka/connect/util/TopicAdminTest.java  |   2 +-
 .../util/clusters/EmbeddedKafkaCluster.java|   2 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   4 +-
 .../kafka/admin/ReassignPartitionsCommand.scala|   4 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala | 133 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  36 +-
 .../main/scala/kafka/server/MetadataCache.scala|   4 +
 .../kafka/server/metadata/KRaftMetadataCache.scala |   4 +
 .../kafka/server/metadata/ZkMetadataCache.scala|   8 ++
 .../kafka/tools/ReplicaVerificationTool.scala  |   2 +-
 .../kafka/api/BaseAdminIntegrationTest.scala   |   4 +-
 .../api/DescribeAuthorizedOperationsTest.scala |   6 +-
 .../kafka/api/EndToEndAuthorizationTest.scala  |   8 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  24 +++-
 .../SaslClientsWithInvalidCredentialsTest.scala|   2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   2 +-
 .../kafka/server/KRaftClusterTest.scala|   2 +-
 .../MetadataRequestBetweenDifferentIbpTest.scala   |  96 +++
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   2 +-
 .../kafka/admin/TopicCommandIntegrationTest.scala  |  18 +--
 .../scala/unit/kafka/server/KafkaApisTest.scala| 111 +
 .../test/scala/unit/kafka/utils/TestUtils.scala|   8 +-
 .../TopicBasedRemoteLogMetadataManager.java|   2 +-
 .../processor/internals/InternalTopicManager.java  |   4 +-
 .../KStreamRepartitionIntegrationTest.java |   2 +-
 ...bleJoinTopologyOptimizationIntegrationTest.java |   2 +-
 .../internals/InternalTopicManagerTest.java|   8 +-
 .../kafka/tools/ClientCompatibilityTest.java   |   2 +-
 .../apache/kafka/tools/TransactionsCommand.java|   2 +-
 .../kafka/tools/TransactionsCommandTest.java   |   2 +-
 .../apache/kafka/trogdor/common/WorkerUtils.java   |   4 +-
 .../kafka/trogdor/common/WorkerUtilsTest.java  |   6 +-
 49 files changed, 846 insertions(+), 172 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 54d103b..4b6fe49 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -17,14 +17,6 @@
 
 package org.apache.kafka.clients.admin;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaFuture;
@@ -42,6 +34,14 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * The administrative client for Kafka, which supports managing and inspecting 
topics, brokers

[kafka] branch 3.1 updated: KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)

2021-12-02 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
 new de73bea  KAFKA-13461: Don't re-initialize ZK client session after auth 
failure if connection still alive (#11563)
de73bea is described below

commit de73bea58ee6afb466eef4b375f8533aca97494f
Author: Rajini Sivaram 
AuthorDate: Thu Dec 2 22:10:37 2021 +

KAFKA-13461: Don't re-initialize ZK client session after auth failure if 
connection still alive (#11563)

If the JAAS configuration does not contain a Client section for ZK clients, an 
auth failure event is generated. If this occurs after the connection is set up 
in the controller, we schedule reinitialize(), which causes the controller to 
resign. In the case where SASL is not mandatory and the connection is alive, 
the controller maintains the current session and doesn't register its watchers, 
leaving it in a bad state.

Reviewers: Jun Rao 
---
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala |  4 ++--
 .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 091b401..bc634a8 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -433,14 +433,14 @@ class ZooKeeperClient(connectString: String,
 isConnectedOrExpiredCondition.signalAll()
   }
   if (state == KeeperState.AuthFailed) {
-error("Auth failed.")
+error(s"Auth failed, initialized=$isFirstConnectionEstablished 
connectionState=$connectionState")
 stateChangeHandlers.values.foreach(_.onAuthFailure())
 
 // If this is during initial startup, we fail fast. Otherwise, 
schedule retry.
 val initialized = inLock(isConnectedOrExpiredLock) {
   isFirstConnectionEstablished
 }
-if (initialized)
+if (initialized && !connectionState.isAlive)
   scheduleReinitialize("auth-failed", "Reinitializing due to auth 
failure.", RetryBackoffMs)
   } else if (state == KeeperState.Expired) {
 scheduleReinitialize("session-expired", "Session expired.", 
delayMs = 0L)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 649f3c5..5af2ba8 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -649,9 +649,18 @@ class ZooKeeperClientTest extends QuorumTestHarness {
 }
 
 zooKeeperClient.close()
-zooKeeperClient = newZooKeeperClient()
+@volatile var connectionStateOverride: Option[States] = None
+zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout,
+  zkMaxInFlightRequests, time, "testMetricGroup", "testMetricType", new 
ZKClientConfig, "ZooKeeperClientTest") {
+  override def connectionState: States = 
connectionStateOverride.getOrElse(super.connectionState)
+}
 zooKeeperClient.registerStateChangeHandler(changeHandler)
 
+connectionStateOverride = Some(States.CONNECTED)
+zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
+assertFalse(sessionInitializedCountDownLatch.await(10, 
TimeUnit.MILLISECONDS), "Unexpected session initialization when connection is 
alive")
+
+connectionStateOverride = Some(States.AUTH_FAILED)
 zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
 assertTrue(sessionInitializedCountDownLatch.await(5, TimeUnit.SECONDS), 
"Failed to receive session initializing notification")
   }


[kafka] branch trunk updated (62f73c3 -> da56146)

2021-12-02 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 62f73c3  KAFKA-13498: track position in remaining state stores (#11541)
 add da56146  KAFKA-13461: Don't re-initialize ZK client session after auth 
failure if connection still alive (#11563)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala |  4 ++--
 .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++-
 2 files changed, 12 insertions(+), 3 deletions(-)


[kafka] branch 3.0 updated: KAFKA-13461: Don't re-initialize ZK client session after auth failure if connection still alive (#11563)

2021-12-02 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
 new ac923b0  KAFKA-13461: Don't re-initialize ZK client session after auth 
failure if connection still alive (#11563)
ac923b0 is described below

commit ac923b0611817c9400e98cac5e0e19c18833de56
Author: Rajini Sivaram 
AuthorDate: Thu Dec 2 22:10:37 2021 +

KAFKA-13461: Don't re-initialize ZK client session after auth failure if 
connection still alive (#11563)

If the JAAS configuration does not contain a Client section for ZK clients, an 
auth failure event is generated. If this occurs after the connection is set up 
in the controller, we schedule reinitialize(), which causes the controller to 
resign. In the case where SASL is not mandatory and the connection is alive, 
the controller maintains the current session and doesn't register its watchers, 
leaving it in a bad state.

Reviewers: Jun Rao 
---
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala |  4 ++--
 .../test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 11 ++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 091b401..bc634a8 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -433,14 +433,14 @@ class ZooKeeperClient(connectString: String,
 isConnectedOrExpiredCondition.signalAll()
   }
   if (state == KeeperState.AuthFailed) {
-error("Auth failed.")
+error(s"Auth failed, initialized=$isFirstConnectionEstablished 
connectionState=$connectionState")
 stateChangeHandlers.values.foreach(_.onAuthFailure())
 
 // If this is during initial startup, we fail fast. Otherwise, 
schedule retry.
 val initialized = inLock(isConnectedOrExpiredLock) {
   isFirstConnectionEstablished
 }
-if (initialized)
+if (initialized && !connectionState.isAlive)
   scheduleReinitialize("auth-failed", "Reinitializing due to auth 
failure.", RetryBackoffMs)
   } else if (state == KeeperState.Expired) {
 scheduleReinitialize("session-expired", "Session expired.", 
delayMs = 0L)
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index a0eb1ea..37954e6 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -648,9 +648,18 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 }
 
 zooKeeperClient.close()
-zooKeeperClient = newZooKeeperClient()
+@volatile var connectionStateOverride: Option[States] = None
+zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout,
+  zkMaxInFlightRequests, time, "testMetricGroup", "testMetricType", new 
ZKClientConfig, "ZooKeeperClientTest") {
+  override def connectionState: States = 
connectionStateOverride.getOrElse(super.connectionState)
+}
 zooKeeperClient.registerStateChangeHandler(changeHandler)
 
+connectionStateOverride = Some(States.CONNECTED)
+zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
+assertFalse(sessionInitializedCountDownLatch.await(10, 
TimeUnit.MILLISECONDS), "Unexpected session initialization when connection is 
alive")
+
+connectionStateOverride = Some(States.AUTH_FAILED)
 zooKeeperClient.ZooKeeperClientWatcher.process(new 
WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
 assertTrue(sessionInitializedCountDownLatch.await(5, TimeUnit.SECONDS), 
"Failed to receive session initializing notification")
   }


[kafka] branch trunk updated (acd1f9c -> 065fba9)

2021-12-14 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from acd1f9c  KAFKA-13522: add position tracking and bounding to IQv2 
(#11581)
 add 065fba9  KAFKA-13539: Improve propagation and processing of SSL 
handshake failures (#11597)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/common/network/Selector.java  |  2 +-
 .../kafka/common/network/SslTransportLayer.java| 59 ++
 .../kafka/common/network/NetworkTestUtils.java |  8 ++-
 .../common/network/SslTransportLayerTest.java  | 18 ++-
 4 files changed, 74 insertions(+), 13 deletions(-)


[kafka] branch trunk updated (b4602e8 -> 0e150a4)

2021-12-15 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from b4602e8  MINOR: Timeout waitForBlock in connect BlockingConnectorTest 
(#11595)
 add 0e150a4  MINOR: Reset java.security.auth.login.config in ZK-tests to 
avoid config reload affecting subsequent tests (#11602)

No new revisions were added by this update.

Summary of changes:
 core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala | 1 +
 1 file changed, 1 insertion(+)


[kafka] branch trunk updated (c807980 -> 8ed271e)

2021-07-26 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from c807980  MINOR: Update `./gradlew allDepInsight` example in README 
(#11125)
 add 8ed271e  KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up 
testings (#11002)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/producer/ProducerConfig.java |  2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 83 ++
 .../sanity_checks/test_verifiable_producer.py  |  8 ++-
 3 files changed, 90 insertions(+), 3 deletions(-)


[kafka] branch 3.0 updated: KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)

2021-07-26 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
 new 5fde508  KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up 
testings (#11002)
5fde508 is described below

commit 5fde508731e000f528b7995de5f37602639d84df
Author: Cheng Tan <31675100+d8tlt...@users.noreply.github.com>
AuthorDate: Mon Jul 26 13:45:59 2021 -0700

KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)

Reviewers: Rajini Sivaram 
---
 .../kafka/clients/producer/ProducerConfig.java |  2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 83 ++
 .../sanity_checks/test_verifiable_producer.py  |  8 ++-
 3 files changed, 90 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 970a70b..0492fbf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -286,7 +286,7 @@ public class ProducerConfig extends AbstractConfig {
 Type.STRING,
 "all",
 in("all", "-1", "0", "1"),
-Importance.HIGH,
+Importance.LOW,
 ACKS_DOC)
 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index f055e12..2784f19 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -173,6 +173,89 @@ public class KafkaProducerTest {
 }
 
 @Test
+public void testAcksAndIdempotenceForIdempotentProducers() {
+Properties baseProps = new Properties() {{
+setProperty(
+ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+setProperty(
+ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+setProperty(
+ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+}};
+
+Properties validProps = new Properties() {{
+putAll(baseProps);
+setProperty(ProducerConfig.ACKS_CONFIG, "0");
+setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+}};
+ProducerConfig config = new ProducerConfig(validProps);
+assertFalse(
+config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+"idempotence should be overwritten");
+assertEquals(
+"0",
+config.getString(ProducerConfig.ACKS_CONFIG),
+"acks should be overwritten");
+
+Properties validProps2 = new Properties() {{
+putAll(baseProps);
+setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transactionalId");
+}};
+config = new ProducerConfig(validProps2);
+assertTrue(
+config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+"idempotence should be set with the default value");
+assertEquals(
+"-1",
+config.getString(ProducerConfig.ACKS_CONFIG),
+"acks should be set with the default value");
+
+Properties validProps3 = new Properties() {{
+putAll(baseProps);
+setProperty(ProducerConfig.ACKS_CONFIG, "all");
+setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+}};
+config = new ProducerConfig(validProps3);
+
assertFalse(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+"idempotence should be overwritten");
+assertEquals(
+"-1",
+config.getString(ProducerConfig.ACKS_CONFIG),
+"acks should be overwritten");
+
+Properties invalidProps = new Properties() {{
+putAll(baseProps);
+ 

[kafka] branch trunk updated: MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)

2022-01-06 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 0b9a8ba  MINOR: greatly improve test runtime by unblocking purgatory 
and quota manager threads (#11653)
0b9a8ba is described below

commit 0b9a8bac36f16b5397e9ec3a0441758e4b60a384
Author: Lucas Bradstreet 
AuthorDate: Thu Jan 6 01:22:19 2022 -0800

MINOR: greatly improve test runtime by unblocking purgatory and quota 
manager threads (#11653)

Reviewers: Rajini Sivaram 
---
 core/src/main/scala/kafka/server/ClientQuotaManager.scala | 12 +++-
 core/src/main/scala/kafka/server/DelayedOperation.scala   | 11 +--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5860cfc..7334519 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -562,8 +562,18 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
   quotaMetricTags.asJava)
   }
 
+  def initiateShutdown(): Unit = {
+throttledChannelReaper.initiateShutdown()
+// improve shutdown time by waking up any ShutdownableThread(s) blocked on 
poll by sending a no-op
+delayQueue.add(new ThrottledChannel(time, 0, new ThrottleCallback {
+  override def startThrottling(): Unit = {}
+  override def endThrottling(): Unit = {}
+}))
+  }
+
   def shutdown(): Unit = {
-throttledChannelReaper.shutdown()
+initiateShutdown()
+throttledChannelReaper.awaitShutdown()
   }
 
   class DefaultQuotaCallback extends ClientQuotaCallback {
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala 
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 251dd28..1151e65 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -328,8 +328,15 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
* Shutdown the expire reaper thread
*/
   def shutdown(): Unit = {
-if (reaperEnabled)
-  expirationReaper.shutdown()
+if (reaperEnabled) {
+  expirationReaper.initiateShutdown()
+  // improve shutdown time by waking up any ShutdownableThread(s) blocked 
on poll by sending a no-op
+  timeoutTimer.add(new TimerTask {
+override val delayMs: Long = 0
+override def run(): Unit = {}
+  })
+  expirationReaper.awaitShutdown()
+}
 timeoutTimer.shutdown()
 removeMetric("PurgatorySize", metricsTags)
 removeMetric("NumDelayedOperations", metricsTags)


[kafka] branch trunk updated (8e205b5 -> bb60eb8)

2022-03-30 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 8e205b5  KAFKA-13719: Fix connector restart cause duplicate tasks 
(#11869)
 add bb60eb8  MINOR: Increase wait in ZooKeeperClientTest (#11973)

No new revisions were added by this update.

Summary of changes:
 core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[kafka] branch trunk updated: [KAFKA-15117] In TestSslUtils set SubjectAlternativeNames to null if there are no hostnames (#14440)

2023-09-25 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d8f358facc2 [KAFKA-15117] In TestSslUtils set SubjectAlternativeNames 
to null if there are no hostnames (#14440)
d8f358facc2 is described below

commit d8f358facc2a5405d08977f922bc0b1dae8f114e
Author: Purshotam Chauhan 
AuthorDate: Mon Sep 25 22:13:01 2023 +0530

[KAFKA-15117] In TestSslUtils set SubjectAlternativeNames to null if there 
are no hostnames (#14440)

We are currently encoding an empty hostNames array to subjectAltName in the 
keystore. While parsing the certificates in the test this causes the issue - 
Unparseable SubjectAlternativeName extension due to java.io.IOException: No 
data available in passed DER encoded value. Up to Java 17, this parsing error 
was ignored. This PR assigns subjectAltName to null if hostnames are empty.

Co-authored-by: Ismael Juma 
Reviewers: Rajini Sivaram 
---
 .../apache/kafka/common/network/SslTransportLayerTest.java   |  3 ---
 .../src/test/java/org/apache/kafka/test/TestSslUtils.java| 12 
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index f49bf868a46..26987e30da8 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -36,8 +36,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.condition.DisabledOnJre;
-import org.junit.jupiter.api.condition.JRE;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -195,7 +193,6 @@ public class SslTransportLayerTest {
  */
 @ParameterizedTest
 @ArgumentsSource(SslTransportLayerArgumentsProvider.class)
-@DisabledOnJre(value = {JRE.JAVA_20, JRE.JAVA_21}, disabledReason = 
"KAFKA-15117")
 public void testValidEndpointIdentificationCN(Args args) throws Exception {
 args.serverCertStores = certBuilder(true, "localhost", 
args.useInlinePem).build();
 args.clientCertStores = certBuilder(false, "localhost", 
args.useInlinePem).build();
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 6b7c16b0335..1181fc2 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -399,10 +399,14 @@ public class TestSslUtils {
 }
 
 public CertificateBuilder sanDnsNames(String... hostNames) throws 
IOException {
-GeneralName[] altNames = new GeneralName[hostNames.length];
-for (int i = 0; i < hostNames.length; i++)
-altNames[i] = new GeneralName(GeneralName.dNSName, 
hostNames[i]);
-subjectAltName = GeneralNames.getInstance(new 
DERSequence(altNames)).getEncoded();
+if (hostNames.length > 0) {
+GeneralName[] altNames = new GeneralName[hostNames.length];
+for (int i = 0; i < hostNames.length; i++)
+altNames[i] = new GeneralName(GeneralName.dNSName, 
hostNames[i]);
+subjectAltName = GeneralNames.getInstance(new 
DERSequence(altNames)).getEncoded();
+} else {
+subjectAltName = null;
+}
 return this;
 }
 



[kafka] branch trunk updated: KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … (#14457)

2023-09-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new dedfed06f7a KAFKA-15510: Fix follower's lastFetchedEpoch when fetch 
response has … (#14457)
dedfed06f7a is described below

commit dedfed06f7a472424080456c997f5200c6bef196
Author: chern 
AuthorDate: Thu Sep 28 06:14:42 2023 -0700

KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has … 
(#14457)

When a fetch response has no record for a partition, validBytes is 0. We 
shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala 
since there is no record and it is Optional.empty. We should use 
currentFetchState.lastFetchedEpoch instead.

Reviewers: Divij Vaidya , Viktor Somogyi-Vass 
, Kamal 
Chandraprakash, Rajini Sivaram 

---
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 5 +++--
 core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1dafb89ef0a..450fcfea461 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -364,10 +364,11 @@ abstract class AbstractFetcherThread(name: String,
 
 // ReplicaDirAlterThread may have removed 
topicPartition from the partitionStates after processing the partition data
 if ((validBytes > 0 || currentFetchState.lag.isEmpty) 
&& partitionStates.contains(topicPartition)) {
+  val lastFetchedEpoch =
+if (logAppendInfo.lastLeaderEpoch.isPresent) 
logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch
   // Update partitionStates only if there is no 
exception during processPartitionData
   val newFetchState = 
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-currentFetchState.currentLeaderEpoch, state = 
Fetching,
-logAppendInfo.lastLeaderEpoch.asScala)
+currentFetchState.currentLeaderEpoch, state = 
Fetching, lastFetchedEpoch)
   partitionStates.updateAndMoveToEnd(topicPartition, 
newFetchState)
   if (validBytes > 0) 
fetcherStats.byteRate.mark(validBytes)
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index e58532622e3..6a0feaa6456 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -749,9 +749,10 @@ class ReplicaFetcherThreadTest {
 val log: UnifiedLog = mock(classOf[UnifiedLog])
 val partition: Partition = mock(classOf[Partition])
 val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+val lastFetchedEpoch = 2
 
 when(log.highWatermark).thenReturn(0)
-when(log.latestEpoch).thenReturn(Some(0))
+when(log.latestEpoch).thenReturn(Some(lastFetchedEpoch))
 when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0)))
 when(log.logEndOffset).thenReturn(0)
 when(log.maybeUpdateHighWatermark(0)).thenReturn(None)
@@ -835,6 +836,7 @@ class ReplicaFetcherThreadTest {
 
 // Lag is set to Some(0).
 assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag))
+assertEquals(Some(lastFetchedEpoch), 
thread.fetchState(t1p0).flatMap(_.lastFetchedEpoch))
   }
 
   @Test



[kafka] branch 3.3 updated: KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)

2022-07-14 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 66000787c1 KAFKA-13043: Implement Admin APIs for offsetFetch batching 
(#10964)
66000787c1 is described below

commit 66000787c1146c6d08d88b9c564f5a000608f013
Author: Sanjana Kaundinya 
AuthorDate: Thu Jul 14 05:47:34 2022 -0700

KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)

This implements the AdminAPI portion of KIP-709: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The 
request/response protocol changes were implemented in 3.0.0. A new batched API 
has been introduced to list consumer offsets for different groups. For brokers 
older than 3.0.0, separate requests are sent for each group.

Co-authored-by: Rajini Sivaram 
Co-authored-by: David Jacot 

Reviewers: David Jacot ,  Rajini Sivaram 

---
 .../java/org/apache/kafka/clients/admin/Admin.java |  36 ++-
 .../kafka/clients/admin/KafkaAdminClient.java  |  11 +-
 .../admin/ListConsumerGroupOffsetsOptions.java |  14 +-
 .../admin/ListConsumerGroupOffsetsResult.java  |  56 +++-
 .../admin/ListConsumerGroupOffsetsSpec.java|  79 ++
 .../clients/admin/internals/AdminApiDriver.java|   3 +-
 .../admin/internals/CoordinatorStrategy.java   |   4 +
 .../internals/ListConsumerGroupOffsetsHandler.java | 128 +
 .../kafka/common/requests/OffsetFetchResponse.java |  10 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |  12 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 206 --
 .../kafka/clients/admin/MockAdminClient.java   |  16 +-
 .../ListConsumerGroupOffsetsHandlerTest.java   | 308 +++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   6 +-
 .../internals/ConsumerCoordinatorTest.java |  26 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   8 +-
 .../kafka/admin/ConsumerGroupServiceTest.scala |  22 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  12 +-
 .../internals/StoreChangelogReaderTest.java|  11 +-
 20 files changed, 813 insertions(+), 157 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8..0698d29702 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable {
  * @param options The options to use when listing the consumer group 
offsets.
  * @return The ListGroupOffsetsResult
  */
-ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
+ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
+.requireStable(options.requireStable());
+@SuppressWarnings("deprecation")
+ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+.topicPartitions(options.topicPartitions());
+return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), listOptions);
+}
 
 /**
  * List the consumer group offsets available in the cluster with the 
default options.
  * 
- * This is a convenience method for {@link 
#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with 
default options.
+ * This is a convenience method for {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}
+ * to list offsets of all partitions of one group with default options.
  *
  * @return The ListGroupOffsetsResult.
  */
@@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable {
 return listConsumerGroupOffsets(groupId, new 
ListConsumerGroupOffsetsOptions());
 }
 
+/**
+ * List the consumer group offsets available in the cluster for the 
specified consumer groups.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies 
the topic partitions of the group to list offsets for.
+ *
+ * @param options The options to use when listing the consumer group 
offsets.
+ * @return The ListConsumerGroupOffsetsResult
+ */
+ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map 

[kafka] branch trunk updated: KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)

2022-07-14 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new beac86f049 KAFKA-13043: Implement Admin APIs for offsetFetch batching 
(#10964)
beac86f049 is described below

commit beac86f049385932309158c1cb49c8657e53f45f
Author: Sanjana Kaundinya 
AuthorDate: Thu Jul 14 05:47:34 2022 -0700

KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)

This implements the AdminAPI portion of KIP-709: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The 
request/response protocol changes were implemented in 3.0.0. A new batched API 
has been introduced to list consumer offsets for different groups. For brokers 
older than 3.0.0, separate requests are sent for each group.

Co-authored-by: Rajini Sivaram 
Co-authored-by: David Jacot 

Reviewers: David Jacot ,  Rajini Sivaram 

---
 .../java/org/apache/kafka/clients/admin/Admin.java |  36 ++-
 .../kafka/clients/admin/KafkaAdminClient.java  |  11 +-
 .../admin/ListConsumerGroupOffsetsOptions.java |  14 +-
 .../admin/ListConsumerGroupOffsetsResult.java  |  56 +++-
 .../admin/ListConsumerGroupOffsetsSpec.java|  79 ++
 .../clients/admin/internals/AdminApiDriver.java|   3 +-
 .../admin/internals/CoordinatorStrategy.java   |   4 +
 .../internals/ListConsumerGroupOffsetsHandler.java | 128 +
 .../kafka/common/requests/OffsetFetchResponse.java |  10 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |  12 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 206 --
 .../kafka/clients/admin/MockAdminClient.java   |  16 +-
 .../ListConsumerGroupOffsetsHandlerTest.java   | 308 +++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   6 +-
 .../internals/ConsumerCoordinatorTest.java |  26 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   8 +-
 .../kafka/admin/ConsumerGroupServiceTest.scala |  22 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  12 +-
 .../internals/StoreChangelogReaderTest.java|  11 +-
 20 files changed, 813 insertions(+), 157 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8..0698d29702 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable {
  * @param options The options to use when listing the consumer group 
offsets.
  * @return The ListGroupOffsetsResult
  */
-ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
+ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
+.requireStable(options.requireStable());
+@SuppressWarnings("deprecation")
+ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+.topicPartitions(options.topicPartitions());
+return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), listOptions);
+}
 
 /**
  * List the consumer group offsets available in the cluster with the 
default options.
  * 
- * This is a convenience method for {@link 
#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with 
default options.
+ * This is a convenience method for {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}
+ * to list offsets of all partitions of one group with default options.
  *
  * @return The ListGroupOffsetsResult.
  */
@@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable {
 return listConsumerGroupOffsets(groupId, new 
ListConsumerGroupOffsetsOptions());
 }
 
+/**
+ * List the consumer group offsets available in the cluster for the 
specified consumer groups.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies 
the topic partitions of the group to list offsets for.
+ *
+ * @param options The options to use when listing the consumer group 
offsets.
+ * @return The ListConsumerGroupOffsetsResult
+ */
+ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map 

[kafka] branch trunk updated: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)

2022-07-15 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ddbc030036 MINOR: Fix options for old-style 
Admin.listConsumerGroupOffsets (#12406)
ddbc030036 is described below

commit ddbc0300365dd1d9a2fb2c73faef8c4cbec0b316
Author: Rajini Sivaram 
AuthorDate: Fri Jul 15 09:21:35 2022 +0100

MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)

Reviewers: David Jacot 
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  7 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 52 +++---
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 0698d29702..1d469a6643 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable {
  * @return The ListGroupOffsetsResult
  */
 default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
-ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
-.requireStable(options.requireStable());
 @SuppressWarnings("deprecation")
 ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
 .topicPartitions(options.topicPartitions());
-return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), listOptions);
+
+// We can use the provided options with the batched API, which uses 
topic partitions from
+// the group spec and ignores any topic partitions set in the options.
+return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), options);
 }
 
 /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3d285a45f7..de57813679 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -131,6 +131,8 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest {
 }
 
 @Test
-public void testListConsumerGroupOffsetsOptions() throws Exception {
+public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws 
Exception {
+verifyListConsumerGroupOffsetsOptions(false);
+}
+
+@Test
+public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws 
Exception {
+verifyListConsumerGroupOffsetsOptions(true);
+}
+
+@SuppressWarnings("deprecation")
+private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) 
throws Exception {
 final Cluster cluster = mockCluster(3, 0);
 final Time time = new MockTime();
 
@@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest {
 
 
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
-final TopicPartition tp1 = new TopicPartition("A", 0);
+final List partitions = 
Collections.singletonList(new TopicPartition("A", 0));
 final ListConsumerGroupOffsetsOptions options = new 
ListConsumerGroupOffsetsOptions()
-.requireStable(true);
-final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
-.topicPartitions(Collections.singletonList(tp1));
-
env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, 
groupSpec), options);
+.requireStable(true)
+.timeoutMs(300);
+if (batchedApi) {
+final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+.topicPartitions(partit

[kafka] branch 3.3 updated: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)

2022-07-15 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 610780668e MINOR: Fix options for old-style 
Admin.listConsumerGroupOffsets (#12406)
610780668e is described below

commit 610780668efa7c1e8d1be193985eb6e4d971fa0a
Author: Rajini Sivaram 
AuthorDate: Fri Jul 15 09:21:35 2022 +0100

MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)

Reviewers: David Jacot 
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  7 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 52 +++---
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 0698d29702..1d469a6643 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable {
  * @return The ListGroupOffsetsResult
  */
 default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
-ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
-.requireStable(options.requireStable());
 @SuppressWarnings("deprecation")
 ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
 .topicPartitions(options.topicPartitions());
-return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), listOptions);
+
+// We can use the provided options with the batched API, which uses 
topic partitions from
+// the group spec and ignores any topic partitions set in the options.
+return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), options);
 }
 
 /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3d285a45f7..de57813679 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -131,6 +131,8 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest {
 }
 
 @Test
-public void testListConsumerGroupOffsetsOptions() throws Exception {
+public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws 
Exception {
+verifyListConsumerGroupOffsetsOptions(false);
+}
+
+@Test
+public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws 
Exception {
+verifyListConsumerGroupOffsetsOptions(true);
+}
+
+@SuppressWarnings("deprecation")
+private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) 
throws Exception {
 final Cluster cluster = mockCluster(3, 0);
 final Time time = new MockTime();
 
@@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest {
 
 
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
-final TopicPartition tp1 = new TopicPartition("A", 0);
+final List partitions = 
Collections.singletonList(new TopicPartition("A", 0));
 final ListConsumerGroupOffsetsOptions options = new 
ListConsumerGroupOffsetsOptions()
-.requireStable(true);
-final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
-.topicPartitions(Collections.singletonList(tp1));
-
env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, 
groupSpec), options);
+.requireStable(true)
+.timeoutMs(300);
+if (batchedApi) {
+final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+.topicPartitions(partit

[kafka] branch trunk updated: KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)

2022-05-10 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new eeb1e702eb KAFKA-13879: Reconnect exponential backoff is ineffective 
in some cases (#12131)
eeb1e702eb is described below

commit eeb1e702eb7a43d88f11458f739672e2b7aa4871
Author: chern 
AuthorDate: Tue May 10 03:36:42 2022 -0700

KAFKA-13879: Reconnect exponential backoff is ineffective in some cases 
(#12131)

When a client connects to a SSL listener using PLAINTEXT security protocol, 
after the TCP connection is setup, the client considers the channel setup is 
complete. In reality the channel setup is not complete yet. The client then 
resets reconnect exponential backoff and issues API version request. Since the 
broker expects SSL handshake, the API version request will cause the connection 
to disconnect. Client reconnects without exponential backoff since it has been 
reset.

This commit removes the reset of reconnect exponential backoff when sending 
API version request. In the good case where the channel setup is complete, 
reconnect exponential backoff will be reset when the node becomes ready, which 
is after getting the API version response. Inter-broker clients which do not 
send API version request and go directly to ready state continue to reset 
backoff before any successful requests.

Reviewers: Rajini Sivaram 
---
 .../kafka/clients/ClusterConnectionStates.java |  1 -
 .../kafka/clients/ClusterConnectionStatesTest.java | 38 ++
 .../apache/kafka/clients/NetworkClientTest.java| 31 ++
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 95efdbeae4..f4d9092258 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -246,7 +246,6 @@ final class ClusterConnectionStates {
 public void checkingApiVersions(String id) {
 NodeConnectionState nodeState = nodeState(id);
 nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
-resetReconnectBackoff(nodeState);
 resetConnectionSetupTimeout(nodeState);
 connectingNodes.remove(id);
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 72cc123921..96fe89ca11 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest {
 
 @Test
 public void testExponentialReconnectBackoff() {
-double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / 
(double) Math.max(reconnectBackoffMs, 1))
-/ Math.log(reconnectBackoffExpBase);
-
-// Run through 10 disconnects and check that reconnect backoff value 
is within expected range for every attempt
-for (int i = 0; i < 10; i++) {
-connectionStates.connecting(nodeId1, time.milliseconds(), 
"localhost");
-connectionStates.disconnected(nodeId1, time.milliseconds());
-// Calculate expected backoff value without jitter
-long expectedBackoff = 
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, 
reconnectBackoffMaxExp))
-* reconnectBackoffMs);
-long currentBackoff = connectionStates.connectionDelay(nodeId1, 
time.milliseconds());
-assertEquals(expectedBackoff, currentBackoff, 
reconnectBackoffJitter * expectedBackoff);
-time.sleep(connectionStates.connectionDelay(nodeId1, 
time.milliseconds()) + 1);
-}
+verifyReconnectExponentialBackoff(false);
+verifyReconnectExponentialBackoff(true);
 }
 
 @Test
@@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest {
 this.connectionStates = new 
ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
 connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new 
LogContext(), this.multipleIPHostResolver);
 }
+
+private void verifyReconnectExponentialBackoff(boolean 
enterCheckingApiVersionState) {
+double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / 
(double) Math.max(reconnectBackoffMs, 1))
+/ Math.log(reconnectBackoffExpBase);
+
+connectionStates.remove(nodeId1);
+// Run through 10 disconnects and check that reconnect backoff value 
is within expected range for every attempt
+for (int i =

[kafka] branch trunk updated: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416)

2022-08-15 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d529d86aa4b KAFKA-13559: Fix issue where responses intermittently 
takes 300+ ms to respond, even when the server is idle. (#12416)
d529d86aa4b is described below

commit d529d86aa4be533d1251cfc0b4c0fb57c69ace72
Author: Badai Aqrandista 
AuthorDate: Mon Aug 15 21:34:03 2022 +1000

KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to 
respond, even when the server is idle. (#12416)

Ensures that SSL buffered data is processed by server immediately on the 
next poll when channel is unmuted after processing previous request. Poll 
timeout is reset to zero for this case to avoid 300ms delay in poll() if no new 
data arrives on the sockets.

Reviewers: David Mao , Ismael Juma , 
Rajini Sivaram 
---
 .../org/apache/kafka/common/network/Selector.java  |  1 +
 .../unit/kafka/network/SocketServerTest.scala  | 44 +-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index bd1175a8ee0..2e581187625 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -757,6 +757,7 @@ public class Selector implements Selectable, AutoCloseable {
 explicitlyMutedChannels.remove(channel);
 if (channel.hasBytesBuffered()) {
 keysWithBufferedRead.add(channel.selectionKey());
+madeReadProgressLastPoll = true;
 }
 }
 }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 98f92d61ff2..801a2d83cab 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1878,6 +1878,44 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Test to ensure "Selector.poll()" does not block at "select(timeout)" when 
there is no data in the socket but there
+   * is data in the buffer. This only happens when SSL protocol is used.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+props ++= sslServerProps
+val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+testableServer.enableRequestProcessing(Map.empty)
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+val selectTimeoutMs = 5000
+// set pollTimeoutOverride to "selectTimeoutMs" to ensure poll() timeout 
is distinct and can be identified
+testableSelector.pollTimeoutOverride = Some(selectTimeoutMs)
+
+try {
+  // initiate SSL connection by sending 1 request via socket, then send 2 
requests directly into the netReadBuffer
+  val (sslSocket, req1) = makeSocketWithBufferedRequests(testableServer, 
testableSelector, proxyServer)
+
+  // force all data to be transferred to the kafka broker by closing the 
client connection to proxy server
+  sslSocket.close()
+  TestUtils.waitUntilTrue(() => proxyServer.clientConnSocket.isClosed, 
"proxyServer.clientConnSocket is still not closed after 6 ms", 6)
+
+  // process the request and send the response
+  processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+  // process the requests in the netReadBuffer, this should not block
+  val req2 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, req2)
+
+} finally {
+  proxyServer.close()
+  shutdownServerAndMetrics(testableServer)
+}
+  }
+
   private def sslServerProps: Properties = {
 val trustStoreFile = File.createTempFile("truststore", ".jks")
 val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, 
interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
@@ -2044,10 +2082,12 @@ class SocketServerTest {
 }
 
 def testableSelector: TestableSelector =
-  
dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+  testableProcessor.selector.asInstanceOf[TestableSelector]
 
-def testableProcessor: TestableProcessor =
+def testableProcessor: TestableProcessor = {
+  val endpoint = this.config.dataPlaneListeners.head
   
dataPlaneAcceptors.get(endpoint).processors(0).asInstanceOf[TestableProcessor]
+}
 
 def waitForChannelClose(connectionId: String, locallyClosed: Boolean): 
Unit = {
   val selector = testableSelector



[kafka] branch 3.4 updated: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954)

2022-12-07 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
 new f5f1060a81d KAFKA-14352: Rack-aware consumer partition assignment 
protocol changes (KIP-881) (#12954)
f5f1060a81d is described below

commit f5f1060a81dd200e573ef69574ea75b64b73474c
Author: Rajini Sivaram 
AuthorDate: Wed Dec 7 11:41:21 2022 +

KAFKA-14352: Rack-aware consumer partition assignment protocol changes 
(KIP-881) (#12954)

Reviewers: David Jacot 
---
 checkstyle/suppressions.xml|  2 +-
 .../consumer/ConsumerPartitionAssignor.java| 15 +++--
 .../kafka/clients/consumer/KafkaConsumer.java  |  3 +-
 .../consumer/internals/ConsumerCoordinator.java|  8 ++-
 .../consumer/internals/ConsumerProtocol.java   |  5 +-
 .../common/message/ConsumerProtocolAssignment.json |  3 +-
 .../message/ConsumerProtocolSubscription.json  |  6 +-
 .../consumer/CooperativeStickyAssignorTest.java|  8 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  3 +-
 .../kafka/clients/consumer/StickyAssignorTest.java |  8 +--
 .../internals/ConsumerCoordinatorTest.java | 67 +++---
 .../consumer/internals/ConsumerProtocolTest.java   | 23 ++--
 .../kafka/api/PlaintextConsumerTest.scala  | 17 ++
 13 files changed, 136 insertions(+), 32 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ac0accc17fa..cea6a193790 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
 
 
+  files="(KafkaConsumer|ConsumerCoordinator).java"/>
 
  topics;
 private final ByteBuffer userData;
 private final List ownedPartitions;
+private final Optional rackId;
 private Optional groupInstanceId;
 private final Optional generationId;
 
-public Subscription(List topics, ByteBuffer userData, 
List ownedPartitions, int generationId) {
+public Subscription(List topics, ByteBuffer userData, 
List ownedPartitions, int generationId, Optional 
rackId) {
 this.topics = topics;
 this.userData = userData;
 this.ownedPartitions = ownedPartitions;
 this.groupInstanceId = Optional.empty();
 this.generationId = generationId < 0 ? Optional.empty() : 
Optional.of(generationId);
+this.rackId = rackId;
 }
 
 public Subscription(List topics, ByteBuffer userData, 
List ownedPartitions) {
-this(topics, userData, ownedPartitions, DEFAULT_GENERATION);
+this(topics, userData, ownedPartitions, DEFAULT_GENERATION, 
Optional.empty());
 }
 
 public Subscription(List topics, ByteBuffer userData) {
-this(topics, userData, Collections.emptyList(), 
DEFAULT_GENERATION);
+this(topics, userData, Collections.emptyList(), 
DEFAULT_GENERATION, Optional.empty());
 }
 
 public Subscription(List topics) {
-this(topics, null, Collections.emptyList(), DEFAULT_GENERATION);
+this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, 
Optional.empty());
 }
 
 public List topics() {
@@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor {
 return ownedPartitions;
 }
 
+public Optional rackId() {
+return rackId;
+}
+
 public void setGroupInstanceId(Optional groupInstanceId) {
 this.groupInstanceId = groupInstanceId;
 }
@@ -158,6 +164,7 @@ public interface ConsumerPartitionAssignor {
 ", ownedPartitions=" + ownedPartitions +
 ", groupInstanceId=" + 
groupInstanceId.map(String::toString).orElse("null") +
 ", generationId=" + generationId.orElse(-1) +
+", rackId=" + (rackId.orElse("null")) +
 ")";
 }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f07846945a7..cf85798f82b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -790,7 +790,8 @@ public class KafkaConsumer implements Consumer {
 enableAutoCommit,
 
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
 this.interceptors,
-
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNS

[kafka] branch trunk updated: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881) (#12954)

2022-12-07 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new d23ce20bdfb KAFKA-14352: Rack-aware consumer partition assignment 
protocol changes (KIP-881) (#12954)
d23ce20bdfb is described below

commit d23ce20bdfbe5a9598523961cb7cf747ce4f52ef
Author: Rajini Sivaram 
AuthorDate: Wed Dec 7 11:41:21 2022 +

KAFKA-14352: Rack-aware consumer partition assignment protocol changes 
(KIP-881) (#12954)

Reviewers: David Jacot 
---
 checkstyle/suppressions.xml|  2 +-
 .../consumer/ConsumerPartitionAssignor.java| 15 +++--
 .../kafka/clients/consumer/KafkaConsumer.java  |  3 +-
 .../consumer/internals/ConsumerCoordinator.java|  8 ++-
 .../consumer/internals/ConsumerProtocol.java   |  5 +-
 .../common/message/ConsumerProtocolAssignment.json |  3 +-
 .../message/ConsumerProtocolSubscription.json  |  6 +-
 .../consumer/CooperativeStickyAssignorTest.java|  8 +--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  3 +-
 .../kafka/clients/consumer/StickyAssignorTest.java |  8 +--
 .../internals/ConsumerCoordinatorTest.java | 67 +++---
 .../consumer/internals/ConsumerProtocolTest.java   | 23 ++--
 .../kafka/api/PlaintextConsumerTest.scala  | 17 ++
 13 files changed, 136 insertions(+), 32 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ac0accc17fa..cea6a193790 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -66,7 +66,7 @@
 
 
+  files="(KafkaConsumer|ConsumerCoordinator).java"/>
 
  topics;
 private final ByteBuffer userData;
 private final List ownedPartitions;
+private final Optional rackId;
 private Optional groupInstanceId;
 private final Optional generationId;
 
-public Subscription(List topics, ByteBuffer userData, 
List ownedPartitions, int generationId) {
+public Subscription(List topics, ByteBuffer userData, 
List ownedPartitions, int generationId, Optional 
rackId) {
 this.topics = topics;
 this.userData = userData;
 this.ownedPartitions = ownedPartitions;
 this.groupInstanceId = Optional.empty();
 this.generationId = generationId < 0 ? Optional.empty() : 
Optional.of(generationId);
+this.rackId = rackId;
 }
 
 public Subscription(List topics, ByteBuffer userData, 
List ownedPartitions) {
-this(topics, userData, ownedPartitions, DEFAULT_GENERATION);
+this(topics, userData, ownedPartitions, DEFAULT_GENERATION, 
Optional.empty());
 }
 
 public Subscription(List topics, ByteBuffer userData) {
-this(topics, userData, Collections.emptyList(), 
DEFAULT_GENERATION);
+this(topics, userData, Collections.emptyList(), 
DEFAULT_GENERATION, Optional.empty());
 }
 
 public Subscription(List topics) {
-this(topics, null, Collections.emptyList(), DEFAULT_GENERATION);
+this(topics, null, Collections.emptyList(), DEFAULT_GENERATION, 
Optional.empty());
 }
 
 public List topics() {
@@ -138,6 +140,10 @@ public interface ConsumerPartitionAssignor {
 return ownedPartitions;
 }
 
+public Optional rackId() {
+return rackId;
+}
+
 public void setGroupInstanceId(Optional groupInstanceId) {
 this.groupInstanceId = groupInstanceId;
 }
@@ -158,6 +164,7 @@ public interface ConsumerPartitionAssignor {
 ", ownedPartitions=" + ownedPartitions +
 ", groupInstanceId=" + 
groupInstanceId.map(String::toString).orElse("null") +
 ", generationId=" + generationId.orElse(-1) +
+", rackId=" + (rackId.orElse("null")) +
 ")";
 }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f07846945a7..cf85798f82b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -790,7 +790,8 @@ public class KafkaConsumer implements Consumer {
 enableAutoCommit,
 
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
 this.interceptors,
-
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNS

[kafka] branch trunk updated (3799708ff09 -> bc1ce9f0f1b)

2023-01-24 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 3799708ff09 KAFKA-14533: re-enable 'false' and disable the 'true' 
parameter of SmokeTestDriverIntegrationTest (#13156)
 add bc1ce9f0f1b KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially 
leaks secrets in logging (#13119)

No new revisions were added by this update.

Summary of changes:
 .../internals/secured/HttpAccessTokenRetriever.java | 17 -
 1 file changed, 12 insertions(+), 5 deletions(-)



[kafka] branch trunk updated: KAFKA-14770: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match (#13346)

2023-03-07 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 4d43abf1e09 KAFKA-14770: Allow dynamic keystore update for brokers if 
string representation of DN matches even if canonical DNs don't match (#13346)
4d43abf1e09 is described below

commit 4d43abf1e09e01fc5e7af52f65e3fbae02cf9771
Author: Rajini Sivaram 
AuthorDate: Tue Mar 7 09:41:01 2023 +

KAFKA-14770: Allow dynamic keystore update for brokers if string 
representation of DN matches even if canonical DNs don't match (#13346)

To avoid mistakes during dynamic broker config updates that could 
potentially affect clients, we restrict changes that can be performed 
dynamically without broker restart. For broker keystore updates, we require the 
DN to be the same for the old and new certificates since this could potentially 
contain host names used for host name verification by clients. DNs are compared 
using standard Java implementation of X500Principal.equals() which compares 
canonical names. If tags of fields ch [...]

Reviewers: Manikumar Reddy , Kalpesh Patel 

---
 .../kafka/common/security/ssl/SslFactory.java  |  8 -
 .../kafka/common/security/ssl/SslFactoryTest.java  | 38 ++
 .../java/org/apache/kafka/test/TestSslUtils.java   | 19 +--
 3 files changed, 62 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index d0cc4cc1e69..65c37aa6b47 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -312,7 +312,13 @@ public class SslFactory implements Reconfigurable, 
Closeable {
 for (int i = 0; i < newEntries.size(); i++) {
 CertificateEntries newEntry = newEntries.get(i);
 CertificateEntries oldEntry = oldEntries.get(i);
-if (!Objects.equals(newEntry.subjectPrincipal, 
oldEntry.subjectPrincipal)) {
+Principal newPrincipal = newEntry.subjectPrincipal;
+Principal oldPrincipal = oldEntry.subjectPrincipal;
+// Compare principal objects to compare canonical names (e.g. 
to ignore leading/trailing whitespaces).
+// Canonical names may differ if the tags of a field changes 
from one with a printable string representation
+// to one without or vice-versa due to optional conversion to 
hex representation based on the tag. So we
+// also compare Principal.getName which compares the RFC2253 
name. If either matches, allow dynamic update.
+if (!Objects.equals(newPrincipal, oldPrincipal) && 
!newPrincipal.getName().equalsIgnoreCase(oldPrincipal.getName())) {
 throw new ConfigException(String.format("Keystore 
DistinguishedName does not match: " +
 " existing={alias=%s, DN=%s}, new={alias=%s, DN=%s}",
 oldEntry.alias, oldEntry.subjectPrincipal, 
newEntry.alias, newEntry.subjectPrincipal));
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 21dcd6e4b0f..7ac707b5de7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -20,7 +20,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.security.GeneralSecurityException;
+import java.security.KeyPair;
 import java.security.KeyStore;
+import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -46,6 +48,7 @@ import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.kafka.common.security.ssl.SslFactory.CertificateEntries.ensureCompatible;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -521,6 +524,41 @@ public abstract class SslFactoryTest {
 
assertFalse(securityConfig.unused().contains(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG));
 }
 
+@Test
+public void testDynamicUpdateCompatibility() throws Exception {
+KeyPair keyPair = TestSslUtils.generateKeyPair("RSA");
+KeyStore ks = createKeyStore(keyPair, "*.example.com", "Kafka", true, 
"localhost", "*.example.com");
+ensureCompatible(ks, k

[kafka] branch trunk updated (20e05695f95 -> 1401769d92f)

2023-03-13 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 20e05695f95 KAFKA-14447: remove stale TODO comment (#13258)
 add 1401769d92f KAFKA-14452: Refactor AbstractStickyAssignor to prepare 
for rack-aware assignment (#13349)

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/AbstractStickyAssignor.java | 1801 ++--
 1 file changed, 911 insertions(+), 890 deletions(-)



[kafka] branch trunk updated: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881) (#12990)

2023-03-01 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 98d84b17f74 KAFKA-14451: Rack-aware consumer partition assignment for 
RangeAssignor (KIP-881) (#12990)
98d84b17f74 is described below

commit 98d84b17f74b0bfe65163d0ddf88976746de5f7e
Author: Rajini Sivaram 
AuthorDate: Wed Mar 1 21:01:35 2023 +

KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor 
(KIP-881) (#12990)

Best-effort rack alignment for range assignor when both consumer racks and 
partition racks are available with the protocol changes introduced in KIP-881. 
Rack-aware assignment is enabled by configuring client.rack for consumers. 
Balanced assignment per topic is prioritized over rack-alignment. For topics 
with equal partitions and the same set of subscribers, co-partitioning is 
prioritized over rack-alignment.

Reviewers: David Jacot 
---
 .../kafka/clients/consumer/RangeAssignor.java  | 229 +--
 .../internals/AbstractPartitionAssignor.java   |  66 +++-
 .../kafka/clients/consumer/internals/Utils.java|   4 +-
 .../kafka/clients/consumer/RangeAssignorTest.java  | 436 -
 .../clients/consumer/RoundRobinAssignorTest.java   |  11 +-
 .../internals/AbstractPartitionAssignorTest.java   | 169 
 .../kafka/api/PlaintextConsumerTest.scala  |  36 +-
 .../server/FetchFromFollowerIntegrationTest.scala  |  66 +++-
 8 files changed, 852 insertions(+), 165 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index aec0d3997c4..0b3071a4915 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -17,13 +17,27 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The range assignor works on a per-topic basis. For each topic, we lay 
out the available partitions in numeric order
@@ -63,9 +77,26 @@ import java.util.Map;
  * I0: [t0p0, t0p1, t1p0, t1p1]
  * I1: [t0p2, t1p2]
  * 
+ * 
+ * Rack-aware assignment is used if both consumer and partition replica racks 
are available and
+ * some partitions have replicas only on a subset of racks. We attempt to 
match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced 
assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers guarantee 
co-partitioning by prioritizing
+ * co-partitioning over rack alignment. In this case, aligning partition 
replicas of these topics on the
+ * same racks will improve locality for consumers. For example, if partitions 
0 of all topics have a replica
+ * on rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be 
assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.
+ * 
+ * Note that rack-aware assignment currently takes all replicas into account, 
including any offline replicas
+ * and replicas that are not in the ISR. This is based on the assumption that 
these replicas are likely
+ * to join the ISR relatively soon. Since consumers don't rebalance on ISR 
change, this avoids unnecessary
+ * cross-rack traffic for long durations after replicas rejoin the ISR. In the 
future, we may consider
+ * rebalancing when replicas are added or removed to improve consumer rack 
alignment.
+ * 
  */
 public class RangeAssignor extends AbstractPartitionAssignor {
 public static final String RANGE_ASSIGNOR_NAME = "range";
+private static final TopicPartitionComparator PARTITION_COMPARATOR = new 
TopicPartitionComparator();
 
 @Override
 public String name() {
@@ -74,45 +105,193 @@ public class RangeAssignor extends 
AbstractPartitionAssignor {
 
 private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
 Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet())

[kafka] branch trunk updated: KAFKA-14867: Trigger rebalance when replica racks change if client.rack is configured (KIP-881) (#13474)

2023-03-31 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 3c4472d701a KAFKA-14867: Trigger rebalance when replica racks change 
if client.rack is configured (KIP-881) (#13474)
3c4472d701a is described below

commit 3c4472d701a7e9d9b8714a0b9d87ae190d1679fb
Author: Rajini Sivaram 
AuthorDate: Fri Mar 31 15:01:07 2023 +0100

KAFKA-14867: Trigger rebalance when replica racks change if client.rack is 
configured (KIP-881) (#13474)

When `client.rack` is configured for consumers, we perform rack-aware 
consumer partition assignment to improve locality. After/during reassignments, 
replica racks may change, so to ensure optimal consumer assignment, trigger 
rebalance from the leader when the set of racks of any partition changes.

Reviewers: David Jacot 
---
 .../consumer/internals/ConsumerCoordinator.java|  58 +--
 .../internals/ConsumerCoordinatorTest.java | 171 ++---
 .../server/FetchFromFollowerIntegrationTest.scala  |  26 +++-
 3 files changed, 224 insertions(+), 31 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fec31fe80f8..1f6c5ef0d75 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.Arrays;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -35,6 +36,7 @@ import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -174,7 +176,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 this.rebalanceConfig = rebalanceConfig;
 this.log = logContext.logger(ConsumerCoordinator.class);
 this.metadata = metadata;
-this.metadataSnapshot = new MetadataSnapshot(subscriptions, 
metadata.fetch(), metadata.updateVersion());
+this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : 
Optional.of(rackId);
+this.metadataSnapshot = new MetadataSnapshot(this.rackId, 
subscriptions, metadata.fetch(), metadata.updateVersion());
 this.subscriptions = subscriptions;
 this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
 this.autoCommitEnabled = autoCommitEnabled;
@@ -188,7 +191,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
 JoinGroupRequest.UNKNOWN_GENERATION_ID, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, rebalanceConfig.groupInstanceId);
 this.throwOnFetchStableOffsetsUnsupported = 
throwOnFetchStableOffsetsUnsupported;
-this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : 
Optional.of(rackId);
 
 if (autoCommitEnabled)
 this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
@@ -489,7 +491,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
 // Update the current snapshot, which will be used to check for 
subscription
 // changes that would require a rebalance (e.g. new partitions).
-metadataSnapshot = new MetadataSnapshot(subscriptions, cluster, 
version);
+metadataSnapshot = new MetadataSnapshot(rackId, subscriptions, 
cluster, version);
 }
 }
 
@@ -1613,14 +1615,18 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
 private static class MetadataSnapshot {
 private final int version;
-private final Map<String, Integer> partitionsPerTopic;
+private final Map<String, List<PartitionInfo>> partitionsPerTopic;
 
-private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
-Map<String, Integer> partitionsPerTopic = new HashMap<>();
+private MetadataSnapshot(Optional<String> clientRack, SubscriptionState subscription, Cluster cluster, int version) {
+Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
 for (String topic : subscription.metadataTopics()) {
-Integer numPartitions = cluster.partitionCountForTopic(topic);
-if (num

[kafka] branch trunk updated (970dea60e86 -> 1f0ae71fb32)

2023-04-03 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 970dea60e86 KAFKA-14785 (KIP-875): Connect offset read REST API 
(#13434)
 add 1f0ae71fb32 KAFKA-14452: Make sticky assignors rack-aware if client 
rack is configured (KIP-881) (#13350)

No new revisions were added by this update.

Summary of changes:
 .../consumer/CooperativeStickyAssignor.java|   9 +-
 .../kafka/clients/consumer/StickyAssignor.java |   2 +-
 .../consumer/internals/AbstractStickyAssignor.java | 329 ++--
 .../consumer/CooperativeStickyAssignorTest.java|  72 +-
 .../kafka/clients/consumer/StickyAssignorTest.java | 206 ++---
 .../internals/AbstractPartitionAssignorTest.java   |  48 +-
 .../internals/AbstractStickyAssignorTest.java  | 827 ++---
 7 files changed, 1014 insertions(+), 479 deletions(-)



[kafka] branch trunk updated (e99984248da -> b64ac94a8c6)

2023-04-12 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from e99984248da KAFKA-9550 Copying log segments to tiered storage in 
RemoteLogManager (#13487)
 add b64ac94a8c6 KAFKA-14891: Fix rack-aware range assignor to assign 
co-partitioned subsets (#13539)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/consumer/RangeAssignor.java  |  2 +-
 .../kafka/clients/consumer/RangeAssignorTest.java  | 30 --
 2 files changed, 29 insertions(+), 3 deletions(-)



[kafka] branch trunk updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)

2023-02-12 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ef9c9486dac KAFKA-14676: Include all SASL configs in login cache key 
to ensure clients in a JVM can use different OAuth configs (#13211)
ef9c9486dac is described below

commit ef9c9486dac8d6076e2897580b10a699697205ac
Author: Rajini Sivaram 
AuthorDate: Sun Feb 12 19:49:12 2023 +

KAFKA-14676: Include all SASL configs in login cache key to ensure clients 
in a JVM can use different OAuth configs (#13211)

We currently cache login managers in static maps for both static JAAS 
config using system property and for JAAS config specified using Kafka config 
sasl.jaas.config. In addition to the JAAS config, the login manager callback 
handler is included in the key, but all other configs are ignored. This 
implementation is based on the assumption that clients that require different logins 
(e.g. username/password) use different JAAS configs, because login properties 
are included in the JAAS config ra [...]

This PR includes all SASL configs prefixed with sasl. to be included in the 
key so that logins are only shared if all the sasl configs are identical.

Reviewers: Manikumar Reddy , Kirk True 

---
 .../security/authenticator/LoginManager.java   | 17 ++---
 .../security/authenticator/LoginManagerTest.java   | 43 ++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 6613fd147f8..f84a8ac9b75 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -99,14 +99,14 @@ public class LoginManager {
 LoginManager loginManager;
 Password jaasConfigValue = jaasContext.dynamicJaasConfig();
 if (jaasConfigValue != null) {
-LoginMetadata<Password> loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass);
+LoginMetadata<Password> loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs);
 loginManager = DYNAMIC_INSTANCES.get(loginMetadata);
 if (loginManager == null) {
 loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
 DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
 }
 } else {
-LoginMetadata<String> loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass);
+LoginMetadata<String> loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs);
 loginManager = STATIC_INSTANCES.get(loginMetadata);
 if (loginManager == null) {
 loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
@@ -198,17 +198,23 @@ public class LoginManager {
 final T configInfo;
 final Class<? extends Login> loginClass;
 final Class<? extends AuthenticateCallbackHandler> loginCallbackClass;
+final Map<String, Object> saslConfigs;
 
 LoginMetadata(T configInfo, Class<? extends Login> loginClass,
-  Class<? extends AuthenticateCallbackHandler> loginCallbackClass) {
+  Class<? extends AuthenticateCallbackHandler> loginCallbackClass,
+  Map<String, ?> configs) {
 this.configInfo = configInfo;
 this.loginClass = loginClass;
 this.loginCallbackClass = loginCallbackClass;
+this.saslConfigs = new HashMap<>();
+configs.entrySet().stream()
+.filter(e -> e.getKey().startsWith("sasl."))
+.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); 
// value may be null
 }
 
 @Override
 public int hashCode() {
-return Objects.hash(configInfo, loginClass, loginCallbackClass);
+return Objects.hash(configInfo, loginClass, loginCallbackClass, 
saslConfigs);
 }
 
 @Override
@@ -219,7 +225,8 @@ public class LoginManager {
 LoginMetadata<?> loginMetadata = (LoginMetadata<?>) o;
 return Objects.equals(configInfo, loginMetadata.configInfo) &&
Objects.equals(loginClass, loginMetadata.loginClass) &&
-   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass);
+   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass) &&
+   Objects.equals(saslConfigs, loginMetadata.saslConfigs);
 }
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/secu

[kafka] branch 3.4 updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)

2023-02-12 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
 new f7a9f5d6a32 KAFKA-14676: Include all SASL configs in login cache key 
to ensure clients in a JVM can use different OAuth configs (#13211)
f7a9f5d6a32 is described below

commit f7a9f5d6a3203bcf2df75ea52bb0ba5a9f7474a6
Author: Rajini Sivaram 
AuthorDate: Sun Feb 12 19:49:12 2023 +

KAFKA-14676: Include all SASL configs in login cache key to ensure clients 
in a JVM can use different OAuth configs (#13211)

We currently cache login managers in static maps for both static JAAS 
config using system property and for JAAS config specified using Kafka config 
sasl.jaas.config. In addition to the JAAS config, the login manager callback 
handler is included in the key, but all other configs are ignored. This 
implementation is based on the assumption that clients that require different logins 
(e.g. username/password) use different JAAS configs, because login properties 
are included in the JAAS config ra [...]

This PR includes all SASL configs prefixed with sasl. to be included in the 
key so that logins are only shared if all the sasl configs are identical.

Reviewers: Manikumar Reddy , Kirk True 

---
 .../security/authenticator/LoginManager.java   | 17 ++---
 .../security/authenticator/LoginManagerTest.java   | 43 ++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 6613fd147f8..f84a8ac9b75 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -99,14 +99,14 @@ public class LoginManager {
 LoginManager loginManager;
 Password jaasConfigValue = jaasContext.dynamicJaasConfig();
 if (jaasConfigValue != null) {
-LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass);
+LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs);
 loginManager = DYNAMIC_INSTANCES.get(loginMetadata);
 if (loginManager == null) {
 loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
 DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
 }
 } else {
-LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass);
+LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs);
 loginManager = STATIC_INSTANCES.get(loginMetadata);
 if (loginManager == null) {
 loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
@@ -198,17 +198,23 @@ public class LoginManager {
 final T configInfo;
 final Class loginClass;
 final Class loginCallbackClass;
+final Map saslConfigs;
 
 LoginMetadata(T configInfo, Class loginClass,
-  Class 
loginCallbackClass) {
+  Class 
loginCallbackClass,
+  Map configs) {
 this.configInfo = configInfo;
 this.loginClass = loginClass;
 this.loginCallbackClass = loginCallbackClass;
+this.saslConfigs = new HashMap<>();
+configs.entrySet().stream()
+.filter(e -> e.getKey().startsWith("sasl."))
+.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); 
// value may be null
 }
 
 @Override
 public int hashCode() {
-return Objects.hash(configInfo, loginClass, loginCallbackClass);
+return Objects.hash(configInfo, loginClass, loginCallbackClass, 
saslConfigs);
 }
 
 @Override
@@ -219,7 +225,8 @@ public class LoginManager {
 LoginMetadata loginMetadata = (LoginMetadata) o;
 return Objects.equals(configInfo, loginMetadata.configInfo) &&
Objects.equals(loginClass, loginMetadata.loginClass) &&
-   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass);
+   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass) &&
+   Objects.equals(saslConfigs, loginMetadata.saslConfigs);
 }
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/au

[kafka] branch 3.3 updated: KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)

2023-02-12 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 6a86e54c764 KAFKA-14676: Include all SASL configs in login cache key 
to ensure clients in a JVM can use different OAuth configs (#13211)
6a86e54c764 is described below

commit 6a86e54c7645c3db950640b10340ff347a0bc1a2
Author: Rajini Sivaram 
AuthorDate: Sun Feb 12 19:49:12 2023 +

KAFKA-14676: Include all SASL configs in login cache key to ensure clients 
in a JVM can use different OAuth configs (#13211)

We currently cache login managers in static maps for both static JAAS 
config using system property and for JAAS config specified using Kafka config 
sasl.jaas.config. In addition to the JAAS config, the login manager callback 
handler is included in the key, but all other configs are ignored. This 
implementation is based on the assumption that clients that require different logins 
(e.g. username/password) use different JAAS configs, because login properties 
are included in the JAAS config ra [...]

This PR includes all SASL configs prefixed with sasl. to be included in the 
key so that logins are only shared if all the sasl configs are identical.

Reviewers: Manikumar Reddy , Kirk True 

---
 .../security/authenticator/LoginManager.java   | 17 ++---
 .../security/authenticator/LoginManagerTest.java   | 43 ++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 6613fd147f8..f84a8ac9b75 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -99,14 +99,14 @@ public class LoginManager {
 LoginManager loginManager;
 Password jaasConfigValue = jaasContext.dynamicJaasConfig();
 if (jaasConfigValue != null) {
-LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass);
+LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs);
 loginManager = DYNAMIC_INSTANCES.get(loginMetadata);
 if (loginManager == null) {
 loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
 DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
 }
 } else {
-LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass);
+LoginMetadata loginMetadata = new 
LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs);
 loginManager = STATIC_INSTANCES.get(loginMetadata);
 if (loginManager == null) {
 loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
@@ -198,17 +198,23 @@ public class LoginManager {
 final T configInfo;
 final Class loginClass;
 final Class loginCallbackClass;
+final Map saslConfigs;
 
 LoginMetadata(T configInfo, Class loginClass,
-  Class 
loginCallbackClass) {
+  Class 
loginCallbackClass,
+  Map configs) {
 this.configInfo = configInfo;
 this.loginClass = loginClass;
 this.loginCallbackClass = loginCallbackClass;
+this.saslConfigs = new HashMap<>();
+configs.entrySet().stream()
+.filter(e -> e.getKey().startsWith("sasl."))
+.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); 
// value may be null
 }
 
 @Override
 public int hashCode() {
-return Objects.hash(configInfo, loginClass, loginCallbackClass);
+return Objects.hash(configInfo, loginClass, loginCallbackClass, 
saslConfigs);
 }
 
 @Override
@@ -219,7 +225,8 @@ public class LoginManager {
 LoginMetadata loginMetadata = (LoginMetadata) o;
 return Objects.equals(configInfo, loginMetadata.configInfo) &&
Objects.equals(loginClass, loginMetadata.loginClass) &&
-   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass);
+   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass) &&
+   Objects.equals(saslConfigs, loginMetadata.saslConfigs);
 }
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/au

(kafka) branch trunk updated: KAFKA-16305: Avoid optimisation in handshakeUnwrap (#15434)

2024-02-28 Thread rsivaram
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5d6936a4992  KAFKA-16305: Avoid optimisation in handshakeUnwrap 
(#15434)
5d6936a4992 is described below

commit 5d6936a4992b77ef68da216a7c2dbf1f8c9f909e
Author: Gaurav Narula 
AuthorDate: Wed Feb 28 09:37:58 2024 +

 KAFKA-16305: Avoid optimisation in handshakeUnwrap (#15434)

Performs additional unwrap during handshake after data from client is 
processed to support openssl, which needs the extra unwrap to complete 
handshake.

Reviewers: Ismael Juma , Rajini Sivaram 

---
 .../kafka/common/network/SslTransportLayer.java|  7 +--
 .../common/network/SslTransportLayerTest.java  | 60 ++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 904c5216a40..da80e363a95 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -498,13 +498,14 @@ public class SslTransportLayer implements TransportLayer {
 }
 
 /**
- * Perform handshake unwrap
+ * Perform handshake unwrap.
+ * Visible for testing.
  * @param doRead boolean If true, read more from the socket channel
  * @param ignoreHandshakeStatus If true, continue to unwrap if data 
available regardless of handshake status
  * @return SSLEngineResult
  * @throws IOException
  */
-private SSLEngineResult handshakeUnwrap(boolean doRead, boolean 
ignoreHandshakeStatus) throws IOException {
+SSLEngineResult handshakeUnwrap(boolean doRead, boolean 
ignoreHandshakeStatus) throws IOException {
 log.trace("SSLHandshake handshakeUnwrap {}", channelId);
 SSLEngineResult result;
 int read = 0;
@@ -526,7 +527,7 @@ public class SslTransportLayer implements TransportLayer {
 handshakeStatus == HandshakeStatus.NEED_UNWRAP) ||
 (ignoreHandshakeStatus && netReadBuffer.position() != 
position);
 log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status 
{}", handshakeStatus, result.getStatus());
-} while (netReadBuffer.position() != 0 && cont);
+} while (cont);
 
 // Throw EOF exception for failed read after processing already 
received data
 // so that handshake failures are reported correctly
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index d92f4facb3c..8b00bcdb955 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -51,6 +52,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -65,13 +67,20 @@ import java.util.stream.Stream;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSession;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the SSL transport layer. These use a test harness that runs a 
simple socket server that echoes back responses.
@@ -1467,4 +1476,55 @@ public class SslTransportLayerTest {
 }
 }
 }
+
+/**
+ * SSLEngine implementations may transition from NEED_UNWRAP to NEED_UNWRAP
+ * even after reading all the data from the socket. This test ensures we
+ * con

<    3   4   5   6   7   8