[kafka] branch 3.0 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)

2022-03-29 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
 new edfd769  KAFKA-13418: Support key updates with TLS 1.3 (#11966)
edfd769 is described below

commit edfd769f426e5baaf94c379e23624ec82e3e80bb
Author: Ismael Juma 
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

KAFKA-13418: Support key updates with TLS 1.3 (#11966)

Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
Update the read/write paths not to throw an exception in this case (kept the
exception in the `handshake` method).
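The gist of the change can be sketched in isolation: post-handshake handshake activity is rejected as renegotiation only on protocols older than TLS 1.3, since on TLS 1.3 it is a benign KeyUpdate. This is a hypothetical standalone helper mirroring the patched condition, not the actual Kafka code:

```java
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;

public class RenegotiationCheck {
    // Sketch of the read/write-path check after this patch (hypothetical
    // helper, not the actual Kafka code): an unexpected handshake status on
    // an established connection is treated as unsupported renegotiation only
    // for protocols older than TLSv1.3; for TLSv1.3 it is a key update.
    public static boolean isUnsupportedRenegotiation(HandshakeStatus handshakeStatus,
                                                     Status status,
                                                     String protocol) {
        return handshakeStatus != HandshakeStatus.NOT_HANDSHAKING
            && handshakeStatus != HandshakeStatus.FINISHED
            && status == Status.OK
            && !"TLSv1.3".equals(protocol);
    }
}
```

With the same unwrap result, TLS 1.2 connections still fail fast while TLS 1.3 connections proceed.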

With the default configuration, key updates happen after 2^37 bytes are encrypted.
There is a security property to adjust this configuration, but the change has to
be done before it is used for the first time and it cannot be changed after that.
As such, it is best done via a system test (filed KAFKA-13779).
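For reference, the JDK security property in question is `jdk.tls.keyLimits`. A sketch of lowering the limit, assuming a standard JSSE setup and an illustrative value (supplied via `-Djava.security.properties=<file>` so it takes effect before TLS is first used):

```properties
# Illustrative override (assumption: JDK's jdk.tls.keyLimits security
# property; the default triggers a KeyUpdate after 2^37 bytes for AES-GCM).
jdk.tls.keyLimits=AES/GCM/NoPadding KeyUpdate 2^20
```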

To validate the change, I wrote a unit test that forces key updates and manually
ran a producer workload that produced more than 2^37 bytes. Both cases failed
without these changes and pass with them.

Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and is
hence included as a co-author of this change.

Reviewers: Rajini Sivaram 

Co-authored-by: Shylaja Kokoori
---
 .../kafka/common/network/SslTransportLayer.java| 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java  | 44 ++-
 .../kafka/common/network/Tls12SelectorTest.java| 72 +
 .../kafka/common/network/Tls13SelectorTest.java| 92 ++
 5 files changed, 180 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad..d276e99 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer {
 CLOSING
 }
 
+private static final String TLS13 = "TLSv1.3";
+
 private final String channelId;
 private final SSLEngine sslEngine;
 private final SelectionKey key;
@@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer {
 if (netWriteBuffer.hasRemaining())
 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
 else {
-state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY;
+state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
 SSLSession session = sslEngine.getSession();
 log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
@@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer {
 throw e;
 }
 netReadBuffer.compact();
-// handle ssl renegotiation.
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
 if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
 unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
-unwrapResult.getStatus() == Status.OK) {
+unwrapResult.getStatus() == Status.OK &&
+!sslEngine.getSession().getProtocol().equals(TLS13)) {
 log.error("Renegotiation requested, but it is not supported, channelId {}, " +
 "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId,
 appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
@@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer {
 SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
 netWriteBuffer.flip();
 
-//handle ssl renegotiation
-if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
+if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
+wrapResult.getStatus() == Status.OK &&
+

[kafka] branch 3.1 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)

2022-03-29 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
 new e714cb5  KAFKA-13418: Support key updates with TLS 1.3 (#11966)
e714cb5 is described below

commit e714cb5dbb92bf3905550daca0053a18ec53426d
Author: Ismael Juma 
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

KAFKA-13418: Support key updates with TLS 1.3 (#11966)

Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
Update the read/write paths not to throw an exception in this case (kept the
exception in the `handshake` method).

With the default configuration, key updates happen after 2^37 bytes are encrypted.
There is a security property to adjust this configuration, but the change has to
be done before it is used for the first time and it cannot be changed after that.
As such, it is best done via a system test (filed KAFKA-13779).

To validate the change, I wrote a unit test that forces key updates and manually
ran a producer workload that produced more than 2^37 bytes. Both cases failed
without these changes and pass with them.

Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and is
hence included as a co-author of this change.

Reviewers: Rajini Sivaram 

Co-authored-by: Shylaja Kokoori
---
 .../kafka/common/network/SslTransportLayer.java| 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java  | 44 ++-
 .../kafka/common/network/Tls12SelectorTest.java| 72 +
 .../kafka/common/network/Tls13SelectorTest.java| 92 ++
 5 files changed, 180 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad..d276e99 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer {
 CLOSING
 }
 
+private static final String TLS13 = "TLSv1.3";
+
 private final String channelId;
 private final SSLEngine sslEngine;
 private final SelectionKey key;
@@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer {
 if (netWriteBuffer.hasRemaining())
 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
 else {
-state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY;
+state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
 SSLSession session = sslEngine.getSession();
 log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
@@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer {
 throw e;
 }
 netReadBuffer.compact();
-// handle ssl renegotiation.
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
 if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
 unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
-unwrapResult.getStatus() == Status.OK) {
+unwrapResult.getStatus() == Status.OK &&
+!sslEngine.getSession().getProtocol().equals(TLS13)) {
 log.error("Renegotiation requested, but it is not supported, channelId {}, " +
 "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId,
 appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
@@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer {
 SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
 netWriteBuffer.flip();
 
-//handle ssl renegotiation
-if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
+if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
+wrapResult.getStatus() == Status.OK &&
+

[kafka] branch 3.2 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)

2022-03-29 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
 new ecff741  KAFKA-13418: Support key updates with TLS 1.3 (#11966)
ecff741 is described below

commit ecff741c6d1b382aaf1a11b7a27631a449441fb8
Author: Ismael Juma 
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

KAFKA-13418: Support key updates with TLS 1.3 (#11966)

Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
Update the read/write paths not to throw an exception in this case (kept the
exception in the `handshake` method).

With the default configuration, key updates happen after 2^37 bytes are encrypted.
There is a security property to adjust this configuration, but the change has to
be done before it is used for the first time and it cannot be changed after that.
As such, it is best done via a system test (filed KAFKA-13779).

To validate the change, I wrote a unit test that forces key updates and manually
ran a producer workload that produced more than 2^37 bytes. Both cases failed
without these changes and pass with them.

Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and is
hence included as a co-author of this change.

Reviewers: Rajini Sivaram 

Co-authored-by: Shylaja Kokoori
---
 .../kafka/common/network/SslTransportLayer.java| 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java  | 44 ++-
 .../kafka/common/network/Tls12SelectorTest.java| 72 +
 .../kafka/common/network/Tls13SelectorTest.java| 92 ++
 5 files changed, 180 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 893fd6a..844c2bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer {
 CLOSING
 }
 
+private static final String TLS13 = "TLSv1.3";
+
 private final String channelId;
 private final SSLEngine sslEngine;
 private final SelectionKey key;
@@ -449,7 +451,7 @@ public class SslTransportLayer implements TransportLayer {
 if (netWriteBuffer.hasRemaining())
 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
 else {
-state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY;
+state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
 SSLSession session = sslEngine.getSession();
 log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
@@ -585,10 +587,11 @@ public class SslTransportLayer implements TransportLayer {
 throw e;
 }
 netReadBuffer.compact();
-// handle ssl renegotiation.
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
 if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
 unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
-unwrapResult.getStatus() == Status.OK) {
+unwrapResult.getStatus() == Status.OK &&
+!sslEngine.getSession().getProtocol().equals(TLS13)) {
 log.error("Renegotiation requested, but it is not supported, channelId {}, " +
 "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId,
 appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
@@ -706,9 +709,12 @@ public class SslTransportLayer implements TransportLayer {
 SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
 netWriteBuffer.flip();
 
-//handle ssl renegotiation
-if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
+if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
+wrapResult.getStatus() == Status.OK &&
+

[kafka] branch trunk updated (f2aa0c4 -> 5aed178)

2022-03-29 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from f2aa0c4  MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)
 add 5aed178  KAFKA-13418: Support key updates with TLS 1.3 (#11966)

No new revisions were added by this update.

Summary of changes:
 .../kafka/common/network/SslTransportLayer.java| 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java  | 44 ++-
 .../kafka/common/network/Tls12SelectorTest.java| 72 +
 .../kafka/common/network/Tls13SelectorTest.java| 92 ++
 5 files changed, 180 insertions(+), 49 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
 create mode 100644 clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java


[kafka] branch trunk updated: MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)

2022-03-29 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f2aa0c4  MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)
f2aa0c4 is described below

commit f2aa0c439cb0045e6505d1bbdf8731a253ca
Author: David Jacot 
AuthorDate: Tue Mar 29 14:31:12 2022 +0200

MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)

This reverts commit d706d6cac4622153973d131417e809ee57c60de0.

Reviewers: Bruno Cadonna 
---
 core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 849646c..33d15ad1 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1389,6 +1389,7 @@ class SocketServerTest {
* buffered receive.
*/
   @Test
+  @Disabled // TODO: re-enable after KAFKA-13735 is fixed
   def remoteCloseWithoutBufferedReceives(): Unit = {
 verifyRemoteCloseWithBufferedReceives(numComplete = 0, hasIncomplete = false)
   }
@@ -1426,6 +1427,7 @@ class SocketServerTest {
* The channel must be closed after pending receives are processed.
*/
   @Test
+  @Disabled // TODO: re-enable after KAFKA-13736 is fixed
   def closingChannelWithBufferedReceives(): Unit = {
 verifyRemoteCloseWithBufferedReceives(numComplete = 3, hasIncomplete = false, makeClosing = true)
   }


[kafka] branch 3.2 updated: KAFKA-6718: Add documentation for KIP-708 (#11923)

2022-03-29 Thread cadonna
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
 new 3143e7f  KAFKA-6718: Add documentation for KIP-708 (#11923)
3143e7f is described below

commit 3143e7f13e9a1e03e9fd35d6cc528e6ec4a3f106
Author: Levani Kokhreidze 
AuthorDate: Tue Mar 29 15:08:51 2022 +0300

KAFKA-6718: Add documentation for KIP-708 (#11923)

Adds documentation for KIP-708: Rack awareness for Kafka Streams
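Per KIP-708, the tag keys listed in `rack.aware.assignment.tags` are matched against `client.tag.`-prefixed entries in each Streams client's configuration. A sketch of the intended setup, with illustrative tag names and values:

```properties
# Kafka Streams client running in zone eu-1 (illustrative values).
# Standby tasks are placed, best-effort, on clients whose tag values differ
# from those of the client running the active task.
rack.aware.assignment.tags=zone,cluster
client.tag.zone=eu-1
client.tag.cluster=k8s-cluster-a
```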

Co-authored-by: Bruno Cadonna 

Reviewers: Luke Chen , Bruno Cadonna 
---
 .../org/apache/kafka/common/config/ConfigDef.java  |  9 +++--
 .../apache/kafka/common/config/ConfigDefTest.java  |  5 +++
 docs/streams/architecture.html |  6 
 docs/streams/developer-guide/config-streams.html   | 42 ++
 4 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 8c91a25..9331f99 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common.config;
 
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
 
@@ -33,8 +31,10 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
 * This class is used for specifying the set of expected configurations. For each configuration, you can specify
@@ -1140,6 +1140,11 @@ public class ConfigDef {
 throw new ConfigException(name, value, "exceeds maximum list size of [" + maxSize + "].");
 }
 }
+
+@Override
+public String toString() {
+return "List containing maximum of " + maxSize + " elements";
+}
 }
 
 public static class ConfigKey {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 0e5af1f..76c20df 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -758,4 +758,9 @@ public class ConfigDefTest {
 "lst doc"));
 }
 
+@Test
+public void testListSizeValidatorToString() {
+assertEquals("List containing maximum of 5 elements", ListSize.atMostOfSize(5).toString());
+}
+
 }
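The point of the `toString` override above is that validator constraints become human-readable wherever a validator is printed, for example in config error messages or generated documentation. A standalone sketch of the same idea, using hypothetical stripped-down types rather than Kafka's actual `ConfigDef` classes:

```java
import java.util.List;

public class ListSizeDemo {
    // Stripped-down analogue of ConfigDef's ListSize validator (hypothetical
    // standalone type, not the actual Kafka class).
    static final class ListSize {
        private final int maxSize;

        private ListSize(int maxSize) {
            this.maxSize = maxSize;
        }

        static ListSize atMostOfSize(int maxSize) {
            return new ListSize(maxSize);
        }

        void ensureValid(String name, Object value) {
            if (value instanceof List && ((List<?>) value).size() > maxSize)
                throw new IllegalArgumentException(name + ": exceeds maximum list size of [" + maxSize + "].");
        }

        // Without this override, printed help/error text would show an
        // unhelpful default like ListSize@1b6d3586.
        @Override
        public String toString() {
            return "List containing maximum of " + maxSize + " elements";
        }
    }
}
```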
diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html
index a1773c5..e561231 100644
--- a/docs/streams/architecture.html
+++ b/docs/streams/architecture.html
@@ -161,6 +161,12 @@
 Starting in 2.6, Kafka Streams will guarantee that a task is only ever assigned to an instance with a fully caught-up local copy of the state, if such an instance exists. Standby tasks will increase the likelihood that a caught-up instance exists in the case of a failure.
 
+
+You can also configure standby replicas with rack awareness. When configured, Kafka Streams will attempt to
+distribute a standby task on a different "rack" than the active one, thus having a faster recovery time when the
+rack of the active tasks fails. See rack.aware.assignment.tags
+in the Kafka Streams Developer Guide section.
+
 
 
 Previous
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index dd9298d..0aee6b6 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -84,6 +84,7 @@ settings.put(... , ...);
   partition.grouper
   probing.rebalance.interval.ms
   processing.guarantee
+  rack.aware.assignment.tags
   replication.factor
   rocksdb.config.setter
   state.dir
@@ -383,6 +384,13 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);
 The amount of time in milliseconds to block waiting for input.
 100 milliseconds
   
+  rack.aware.assignment.tags
+Medium
+List of tag keys used to distribute standby replicas across Kafka Streams
+  clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
+  clients with different tag values.
+the empty list
+

[kafka] branch trunk updated (db2485c -> 35ae4f2)

2022-03-29 Thread cadonna
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from db2485c  KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
 add 35ae4f2  KAFKA-6718: Add documentation for KIP-708 (#11923)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/common/config/ConfigDef.java  |  9 +++--
 .../apache/kafka/common/config/ConfigDefTest.java  |  5 +++
 docs/streams/architecture.html |  6 
 docs/streams/developer-guide/config-streams.html   | 42 ++
 4 files changed, 60 insertions(+), 2 deletions(-)


[kafka] branch 3.2 updated: KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)

2022-03-29 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
 new f3eab7b  KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
f3eab7b is described below

commit f3eab7b827b96fc69b4679b0e11656e00af776b7
Author: bozhao12 <102274736+bozha...@users.noreply.github.com>
AuthorDate: Tue Mar 29 16:13:05 2022 +0800

KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)

When a replica selector is configured, the partition leader computes a preferred read replica for any fetch from the consumers. When the preferred read replica is not the leader, the leader returns the preferred read replica with `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the `ReplicaManager`. This causes the fetch to go into the fetch purgatory because the exit conditions are not met. It turns out that the delayed fetch is not completed until [...]

This patch fixes the issue by completing the fetch request immediately when a preferred read replica is defined.
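The resulting exit condition can be sketched as a pure predicate. This is a hypothetical standalone helper mirroring the patched Scala condition in `ReplicaManager`, not the actual Kafka code:

```java
public class FetchCompletion {
    // Mirrors the patched condition (hypothetical standalone helper):
    // respond immediately when any of the six conditions holds, including
    // the new preferred-read-replica case, instead of entering purgatory.
    public static boolean respondImmediately(long timeoutMs,
                                             boolean noPartitionsRequested,
                                             long bytesReadable,
                                             long fetchMinBytes,
                                             boolean errorReadingData,
                                             boolean hasDivergingEpoch,
                                             boolean hasPreferredReadReplica) {
        return timeoutMs <= 0
            || noPartitionsRequested
            || bytesReadable >= fetchMinBytes
            || errorReadingData
            || hasDivergingEpoch
            || hasPreferredReadReplica;   // new: skip the fetch purgatory
    }
}
```

Before the patch, a fetch that carried only a preferred read replica (empty records, no error) satisfied none of the conditions and sat in purgatory until timeout; the added flag makes it return at once.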

Reviewers: David Jacot 
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  8 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 76 --
 2 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42124aa..4b77a4a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1028,15 +1028,17 @@ class ReplicaManager(val config: KafkaConfig,
 var bytesReadable: Long = 0
 var errorReadingData = false
 var hasDivergingEpoch = false
+var hasPreferredReadReplica = false
 val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
 logReadResults.foreach { case (topicIdPartition, logReadResult) =>
   brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
   brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
-
   if (logReadResult.error != Errors.NONE)
 errorReadingData = true
   if (logReadResult.divergingEpoch.nonEmpty)
 hasDivergingEpoch = true
+  if (logReadResult.preferredReadReplica.nonEmpty)
+hasPreferredReadReplica = true
   bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
   logReadResultMap.put(topicIdPartition, logReadResult)
 }
@@ -1046,7 +1048,9 @@ class ReplicaManager(val config: KafkaConfig,
 //3) has enough data to respond
 //4) some error happens while reading data
 //5) we found a diverging epoch
-if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
+//6) has a preferred read replica
+if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData ||
+  hasDivergingEpoch || hasPreferredReadReplica) {
   val fetchPartitionData = logReadResults.map { case (tp, result) =>
 val isReassignmentFetch = isFromFollower && isAddingReplica(tp.topicPartition, replicaId)
 tp -> result.toFetchPartitionData(isReassignmentFetch)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index dd644c8..a17c70b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1265,7 +1265,7 @@ class ReplicaManagerTest {
 
   initializeLogAndTopicId(replicaManager, tp0, topicId)
 
-  // Make this replica the follower
+  // Make this replica the leader
 val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
 Seq(new LeaderAndIsrPartitionState()
   .setTopicName(topic)
@@ -1281,14 +1281,14 @@ class ReplicaManagerTest {
 Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
-  val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+  val metadata = new DefaultClientMetadata("rack-a", "client-id",
 InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
 
   val consumerResult = fetchAsConsumer(replicaManager, tidp0,
 new PartitionData(Uuid.ZERO_UUID, 0, 0, 10, Optional.empty()),
 clientMetadata = Some(metadata))
 
-  // Fetch from follower 

[kafka] branch trunk updated (19a6269 -> db2485c)

2022-03-29 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 19a6269  MINOR: Fix log4j entry in RepartitionTopics (#11958)
 add db2485c  KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)

No new revisions were added by this update.

Summary of changes:
 .../main/scala/kafka/server/ReplicaManager.scala   |  8 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala | 76 --
 2 files changed, 75 insertions(+), 9 deletions(-)


[kafka] branch trunk updated (2a27059 -> 19a6269)

2022-03-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 2a27059  MINOR: Improved display names for parameterized KRaft and ZK tests (#11957)
 add 19a6269  MINOR: Fix log4j entry in RepartitionTopics (#11958)

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/processor/internals/RepartitionTopics.java  | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)