[kafka] branch 3.0 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.0 by this push:
     new edfd769 KAFKA-13418: Support key updates with TLS 1.3 (#11966)
edfd769 is described below

commit edfd769f426e5baaf94c379e23624ec82e3e80bb
Author: Ismael Juma
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

    KAFKA-13418: Support key updates with TLS 1.3 (#11966)

    Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
    Update the read/write paths not to throw an exception in this case (kept the exception
    in the `handshake` method).

    With the default configuration, key updates happen after 2^37 bytes are encrypted.
    There is a security property to adjust this configuration, but the change has to be
    done before it is used for the first time and it cannot be changed after that. As such,
    it is best done via a system test (filed KAFKA-13779).

    To validate the change, I wrote a unit test that forces key updates and manually ran
    a producer workload that produced more than 2^37 bytes. Both cases failed without these
    changes and pass with them.

    Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and hence
    included them as a co-author of this change.

Reviewers: Rajini Sivaram Co-authored-by: Shylaja Kokoori --- .../kafka/common/network/SslTransportLayer.java| 16 ++-- .../apache/kafka/common/network/SelectorTest.java | 5 -- .../kafka/common/network/SslSelectorTest.java | 44 ++- .../kafka/common/network/Tls12SelectorTest.java| 72 + .../kafka/common/network/Tls13SelectorTest.java| 92 ++ 5 files changed, 180 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index b9879ad..d276e99 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer { CLOSING } +private static final String TLS13 = "TLSv1.3"; + private final String channelId; private final SSLEngine sslEngine; private final SelectionKey key; @@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer { if (netWriteBuffer.hasRemaining()) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); else { -state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY; +state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY; key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); SSLSession session = sslEngine.getSession(); log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'", @@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer { throw e; } netReadBuffer.compact(); -// handle ssl renegotiation. 
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED && -unwrapResult.getStatus() == Status.OK) { +unwrapResult.getStatus() == Status.OK && +!sslEngine.getSession().getProtocol().equals(TLS13)) { log.error("Renegotiation requested, but it is not supported, channelId {}, " + "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus()); @@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer { SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); netWriteBuffer.flip(); -//handle ssl renegotiation -if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) +// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed +if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && +wrapResult.getStatus() == Status.OK && +
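The condition added to the read path in the diff above can be restated as a standalone predicate. What follows is a minimal sketch, not the code from `SslTransportLayer`: the class name `RenegotiationCheckSketch` and the method `rejectOnRead` are hypothetical, and the negotiated protocol is passed in as a string instead of being read from `sslEngine.getSession().getProtocol()`.

```java
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;

// Hypothetical helper that mirrors the read-path check shown in the diff.
final class RenegotiationCheckSketch {
    // Same protocol name as the TLS13 constant introduced by the commit.
    private static final String TLS13 = "TLSv1.3";

    // Returns true when the unwrap result should be treated as an unsupported
    // renegotiation: a handshake is in progress, the unwrap itself succeeded,
    // and the session is not TLS 1.3 (where this signals a key update).
    static boolean rejectOnRead(HandshakeStatus handshakeStatus, Status status, String protocol) {
        return handshakeStatus != HandshakeStatus.NOT_HANDSHAKING
            && handshakeStatus != HandshakeStatus.FINISHED
            && status == Status.OK
            && !TLS13.equals(protocol);
    }

    public static void main(String[] args) {
        // TLS 1.2 renegotiation attempt: rejected.
        System.out.println(rejectOnRead(HandshakeStatus.NEED_WRAP, Status.OK, "TLSv1.2"));
        // TLS 1.3 key update: allowed.
        System.out.println(rejectOnRead(HandshakeStatus.NEED_WRAP, Status.OK, "TLSv1.3"));
    }
}
```

As far as the truncated hunk shows, the write path applies the same protocol exclusion to the `wrap` result, without the `FINISHED` clause.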
[kafka] branch 3.1 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.1 by this push:
     new e714cb5 KAFKA-13418: Support key updates with TLS 1.3 (#11966)
e714cb5 is described below

commit e714cb5dbb92bf3905550daca0053a18ec53426d
Author: Ismael Juma
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

    KAFKA-13418: Support key updates with TLS 1.3 (#11966)

    Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
    Update the read/write paths not to throw an exception in this case (kept the exception
    in the `handshake` method).

    With the default configuration, key updates happen after 2^37 bytes are encrypted.
    There is a security property to adjust this configuration, but the change has to be
    done before it is used for the first time and it cannot be changed after that. As such,
    it is best done via a system test (filed KAFKA-13779).

    To validate the change, I wrote a unit test that forces key updates and manually ran
    a producer workload that produced more than 2^37 bytes. Both cases failed without these
    changes and pass with them.

    Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and hence
    included them as a co-author of this change.

Reviewers: Rajini Sivaram Co-authored-by: Shylaja Kokoori --- .../kafka/common/network/SslTransportLayer.java| 16 ++-- .../apache/kafka/common/network/SelectorTest.java | 5 -- .../kafka/common/network/SslSelectorTest.java | 44 ++- .../kafka/common/network/Tls12SelectorTest.java| 72 + .../kafka/common/network/Tls13SelectorTest.java| 92 ++ 5 files changed, 180 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index b9879ad..d276e99 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer { CLOSING } +private static final String TLS13 = "TLSv1.3"; + private final String channelId; private final SSLEngine sslEngine; private final SelectionKey key; @@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer { if (netWriteBuffer.hasRemaining()) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); else { -state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY; +state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY; key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); SSLSession session = sslEngine.getSession(); log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'", @@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer { throw e; } netReadBuffer.compact(); -// handle ssl renegotiation. 
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED && -unwrapResult.getStatus() == Status.OK) { +unwrapResult.getStatus() == Status.OK && +!sslEngine.getSession().getProtocol().equals(TLS13)) { log.error("Renegotiation requested, but it is not supported, channelId {}, " + "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus()); @@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer { SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); netWriteBuffer.flip(); -//handle ssl renegotiation -if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) +// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed +if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && +wrapResult.getStatus() == Status.OK && +
[kafka] branch 3.2 updated: KAFKA-13418: Support key updates with TLS 1.3 (#11966)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.2 by this push:
     new ecff741 KAFKA-13418: Support key updates with TLS 1.3 (#11966)
ecff741 is described below

commit ecff741c6d1b382aaf1a11b7a27631a449441fb8
Author: Ismael Juma
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

    KAFKA-13418: Support key updates with TLS 1.3 (#11966)

    Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
    Update the read/write paths not to throw an exception in this case (kept the exception
    in the `handshake` method).

    With the default configuration, key updates happen after 2^37 bytes are encrypted.
    There is a security property to adjust this configuration, but the change has to be
    done before it is used for the first time and it cannot be changed after that. As such,
    it is best done via a system test (filed KAFKA-13779).

    To validate the change, I wrote a unit test that forces key updates and manually ran
    a producer workload that produced more than 2^37 bytes. Both cases failed without these
    changes and pass with them.

    Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and hence
    included them as a co-author of this change.

Reviewers: Rajini Sivaram Co-authored-by: Shylaja Kokoori --- .../kafka/common/network/SslTransportLayer.java| 16 ++-- .../apache/kafka/common/network/SelectorTest.java | 5 -- .../kafka/common/network/SslSelectorTest.java | 44 ++- .../kafka/common/network/Tls12SelectorTest.java| 72 + .../kafka/common/network/Tls13SelectorTest.java| 92 ++ 5 files changed, 180 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 893fd6a..844c2bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer { CLOSING } +private static final String TLS13 = "TLSv1.3"; + private final String channelId; private final SSLEngine sslEngine; private final SelectionKey key; @@ -449,7 +451,7 @@ public class SslTransportLayer implements TransportLayer { if (netWriteBuffer.hasRemaining()) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); else { -state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY; +state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY; key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); SSLSession session = sslEngine.getSession(); log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'", @@ -585,10 +587,11 @@ public class SslTransportLayer implements TransportLayer { throw e; } netReadBuffer.compact(); -// handle ssl renegotiation. 
+// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED && -unwrapResult.getStatus() == Status.OK) { +unwrapResult.getStatus() == Status.OK && +!sslEngine.getSession().getProtocol().equals(TLS13)) { log.error("Renegotiation requested, but it is not supported, channelId {}, " + "appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus()); @@ -706,9 +709,12 @@ public class SslTransportLayer implements TransportLayer { SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); netWriteBuffer.flip(); -//handle ssl renegotiation -if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) +// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed +if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && +wrapResult.getStatus() == Status.OK && +
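The commit message for KAFKA-13418 gives the default key-update threshold as 2^37 encrypted bytes and mentions a security property that must be set before first use. The sketch below works out that threshold and shows one way such a property could be set early in a test. The property name `jdk.tls.keyLimits` and the value syntax are assumptions on my part (the commit message does not name the property), and `KeyLimitSketch` and its methods are hypothetical names used only for illustration.

```java
import java.security.Security;

// Hypothetical helper; not part of the Kafka commit.
final class KeyLimitSketch {
    // Assumption: the JDK security property governing TLS 1.3 key-update limits.
    static final String KEY_LIMITS_PROPERTY = "jdk.tls.keyLimits";

    // Default threshold quoted in the commit message: 2^37 bytes.
    static long defaultKeyUpdateThresholdBytes() {
        return 1L << 37;
    }

    // Builds a property value of the assumed form "<cipher> KeyUpdate 2^<exponent>".
    static String keyLimitsValue(int exponent) {
        return "AES/GCM/NoPadding KeyUpdate 2^" + exponent;
    }

    public static void main(String[] args) {
        long bytes = defaultKeyUpdateThresholdBytes();
        // 2^37 bytes is 2^7 GiB.
        System.out.println(bytes + " bytes = " + (bytes >> 30) + " GiB"); // 137438953472 bytes = 128 GiB

        // Lower the limit so key updates happen after far fewer bytes.
        // Per the commit message, this has to happen before the property is first used.
        Security.setProperty(KEY_LIMITS_PROPERTY, keyLimitsValue(20));
        System.out.println(Security.getProperty(KEY_LIMITS_PROPERTY));
    }
}
```

At 128 GiB per key update, a short unit test would never reach the default limit, which is consistent with the commit message describing a test that forces key updates and a manual producer run exceeding 2^37 bytes.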
[kafka] branch trunk updated (f2aa0c4 -> 5aed178)
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.

    from f2aa0c4 MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)
     add 5aed178 KAFKA-13418: Support key updates with TLS 1.3 (#11966)

No new revisions were added by this update.

Summary of changes:
 .../kafka/common/network/SslTransportLayer.java    | 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java      | 44 ++-
 .../kafka/common/network/Tls12SelectorTest.java    | 72 +
 .../kafka/common/network/Tls13SelectorTest.java    | 92 ++
 5 files changed, 180 insertions(+), 49 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
 create mode 100644 clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
[kafka] branch trunk updated: MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new f2aa0c4 MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)
f2aa0c4 is described below

commit f2aa0c439cb0045e6505d1bbdf8731a253ca
Author: David Jacot
AuthorDate: Tue Mar 29 14:31:12 2022 +0200

    MINOR: Disable SocketServerTest.closingChannelWithBufferedReceives and SocketServerTest.remoteCloseWithoutBufferedReceives (#11960)

    This reverts commit d706d6cac4622153973d131417e809ee57c60de0.

    Reviewers: Bruno Cadonna
---
 core/src/test/scala/unit/kafka/network/SocketServerTest.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 849646c..33d15ad1 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1389,6 +1389,7 @@ class SocketServerTest {
    * buffered receive.
    */
   @Test
+  @Disabled // TODO: re-enabled until KAFKA-13735 is fixed
   def remoteCloseWithoutBufferedReceives(): Unit = {
     verifyRemoteCloseWithBufferedReceives(numComplete = 0, hasIncomplete = false)
   }
@@ -1426,6 +1427,7 @@ class SocketServerTest {
    * The channel must be closed after pending receives are processed.
    */
   @Test
+  @Disabled // TODO: re-enable after KAFKA-13736 is fixed
   def closingChannelWithBufferedReceives(): Unit = {
     verifyRemoteCloseWithBufferedReceives(numComplete = 3, hasIncomplete = false, makeClosing = true)
   }
[kafka] branch 3.2 updated: KAFKA-6718: Add documentation for KIP-708 (#11923)
This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.2 by this push: new 3143e7f KAFKA-6718: Add documentation for KIP-708 (#11923) 3143e7f is described below commit 3143e7f13e9a1e03e9fd35d6cc528e6ec4a3f106 Author: Levani Kokhreidze AuthorDate: Tue Mar 29 15:08:51 2022 +0300 KAFKA-6718: Add documentation for KIP-708 (#11923) Adds documentation for KIP-708: Rack awareness for Kafka Streams Co-authored-by: Bruno Cadonna Reviewers: Luke Chen , Bruno Cadonna --- .../org/apache/kafka/common/config/ConfigDef.java | 9 +++-- .../apache/kafka/common/config/ConfigDefTest.java | 5 +++ docs/streams/architecture.html | 6 docs/streams/developer-guide/config-streams.html | 42 ++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 8c91a25..9331f99 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.common.config; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Utils; @@ -33,8 +31,10 @@ import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * This class is used for specifying the set of expected configurations. 
For each configuration, you can specify @@ -1140,6 +1140,11 @@ public class ConfigDef { throw new ConfigException(name, value, "exceeds maximum list size of [" + maxSize + "]."); } } + +@Override +public String toString() { +return "List containing maximum of " + maxSize + " elements"; +} } public static class ConfigKey { diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 0e5af1f..76c20df 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -758,4 +758,9 @@ public class ConfigDefTest { "lst doc")); } +@Test +public void testListSizeValidatorToString() { +assertEquals("List containing maximum of 5 elements", ListSize.atMostOfSize(5).toString()); +} + } diff --git a/docs/streams/architecture.html b/docs/streams/architecture.html index a1773c5..e561231 100644 --- a/docs/streams/architecture.html +++ b/docs/streams/architecture.html @@ -161,6 +161,12 @@ Starting in 2.6, Kafka Streams will guarantee that a task is only ever assigned to an instance with a fully caught-up local copy of the state, if such an instance exists. Standby tasks will increase the likelihood that a caught-up instance exists in the case of a failure. + +You can also configure standby replicas with rack awareness. When configured, Kafka Streams will attempt to +distribute a standby task on a different "rack" than the active one, thus having a faster recovery time when the +rack of the active tasks fails. See rack.aware.assignment.tags +in the Kafka Streams Developer Guide section. + Previous diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index dd9298d..0aee6b6 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -84,6 +84,7 @@ settings.put(... 
, ...); partition.grouper probing.rebalance.interval.ms processing.guarantee + rack.aware.assignment.tags replication.factor rocksdb.config.setter state.dir @@ -383,6 +384,13 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1); The amount of time in milliseconds to block waiting for input. 100 milliseconds + rack.aware.assignment.tags +Medium +List of tag keys used to distribute standby replicas across Kafka Streams + clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over + clients with different tag values. +the empty list +
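The documentation added by this commit describes `rack.aware.assignment.tags` as a list of tag keys used to spread standby replicas across clients. A minimal sketch of how such settings might be assembled is shown below, using plain `java.util.Properties` and string keys so that it runs without Kafka on the classpath. The `client.tag.` prefix for per-client tag values comes from my reading of KIP-708 and is not shown in the excerpt above, and the tag name `zone` and its value are made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper; builds settings only and does not start Kafka Streams.
final class RackAwareStandbyConfigSketch {
    static Properties build() {
        Properties settings = new Properties();
        // Standby replicas must be enabled for rack-aware distribution to matter.
        settings.put("num.standby.replicas", "1");
        // Tag keys used to distribute standby tasks (documented in this commit).
        settings.put("rack.aware.assignment.tags", "zone");
        // Assumption: each client declares its own value for every tag key
        // under the "client.tag." prefix; "zone-a" is an invented value.
        settings.put("client.tag.zone", "zone-a");
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = build();
        System.out.println(settings.getProperty("rack.aware.assignment.tags"));
        System.out.println(settings.getProperty("client.tag.zone"));
    }
}
```

With a configuration along these lines, an instance tagged `zone-a` would, on a best-effort basis, have the standbys for its active tasks placed on instances carrying a different `zone` value.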
[kafka] branch trunk updated (db2485c -> 35ae4f2)
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.

    from db2485c KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
     add 35ae4f2 KAFKA-6718: Add documentation for KIP-708 (#11923)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/common/config/ConfigDef.java  |  9 +++--
 .../apache/kafka/common/config/ConfigDefTest.java  |  5 +++
 docs/streams/architecture.html                     |  6
 docs/streams/developer-guide/config-streams.html   | 42 ++
 4 files changed, 60 insertions(+), 2 deletions(-)
[kafka] branch 3.2 updated: KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.2 by this push:
     new f3eab7b KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)
f3eab7b is described below

commit f3eab7b827b96fc69b4679b0e11656e00af776b7
Author: bozhao12 <102274736+bozha...@users.noreply.github.com>
AuthorDate: Tue Mar 29 16:13:05 2022 +0800

    KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)

    When a replica selector is configured, the partition leader computes a preferred read
    replica for any fetch from the consumers. When the preferred read replica is not the
    leader, the leader returns the preferred read replica with
    `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the
    `ReplicaManager`. This causes the fetch to go into the fetch purgatory because the exit
    conditions are not met. It turns out that the delayed fetch is not completed until [...]

    This patch fixes the issue by completing the fetch request immediately when a preferred
    read replica is defined.

Reviewers: David Jacot --- .../main/scala/kafka/server/ReplicaManager.scala | 8 ++- .../unit/kafka/server/ReplicaManagerTest.scala | 76 -- 2 files changed, 75 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 42124aa..4b77a4a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1028,15 +1028,17 @@ class ReplicaManager(val config: KafkaConfig, var bytesReadable: Long = 0 var errorReadingData = false var hasDivergingEpoch = false +var hasPreferredReadReplica = false val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult] logReadResults.foreach { case (topicIdPartition, logReadResult) => brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() - if (logReadResult.error != Errors.NONE) errorReadingData = true if (logReadResult.divergingEpoch.nonEmpty) hasDivergingEpoch = true + if (logReadResult.preferredReadReplica.nonEmpty) +hasPreferredReadReplica = true bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes logReadResultMap.put(topicIdPartition, logReadResult) } @@ -1046,7 +1048,9 @@ class ReplicaManager(val config: KafkaConfig, //3) has enough data to respond //4) some error happens while reading data //5) we found a diverging epoch -if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) { +//6) has a preferred read replica +if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || + hasDivergingEpoch || hasPreferredReadReplica) { val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = isFromFollower && isAddingReplica(tp.topicPartition, replicaId) tp -> result.toFetchPartitionData(isReassignmentFetch) diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index dd644c8..a17c70b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -1265,7 +1265,7 @@ class ReplicaManagerTest { initializeLogAndTopicId(replicaManager, tp0, topicId) - // Make this replica the follower + // Make this replica the leader val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, Seq(new LeaderAndIsrPartitionState() .setTopicName(topic) @@ -1281,14 +1281,14 @@ class ReplicaManagerTest { Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) - val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + val metadata = new DefaultClientMetadata("rack-a", "client-id", InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") val consumerResult = fetchAsConsumer(replicaManager, tidp0, new PartitionData(Uuid.ZERO_UUID, 0, 0, 10, Optional.empty()), clientMetadata = Some(metadata)) - // Fetch from follower
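The six conditions under which the leader answers a fetch immediately, as listed in the `ReplicaManager` hunk above, can be written as one boolean predicate. This is a sketch in Java for illustration only: the actual code is Scala, and the class `FetchCompletionSketch`, the method `respondImmediately`, and the flattened parameter list are hypothetical.

```java
// Hypothetical restatement of the immediate-response condition from the diff.
final class FetchCompletionSketch {
    static boolean respondImmediately(long timeoutMs,
                                      boolean fetchInfosEmpty,
                                      long bytesReadable,
                                      int fetchMinBytes,
                                      boolean errorReadingData,
                                      boolean hasDivergingEpoch,
                                      boolean hasPreferredReadReplica) {
        return timeoutMs <= 0                   // 1) the fetch does not want to wait
            || fetchInfosEmpty                  // 2) nothing was requested
            || bytesReadable >= fetchMinBytes   // 3) enough data to respond
            || errorReadingData                 // 4) an error occurred while reading
            || hasDivergingEpoch                // 5) a diverging epoch was found
            || hasPreferredReadReplica;         // 6) new: a preferred read replica is set
    }

    public static void main(String[] args) {
        // No data yet, but the leader points the consumer at another replica:
        // before the patch this waited in purgatory, now it returns at once.
        System.out.println(respondImmediately(500, false, 0, 1, false, false, true));
        // Same request without a preferred read replica still waits.
        System.out.println(respondImmediately(500, false, 0, 1, false, false, false));
    }
}
```

The sixth clause matters because the leader returns empty records alongside the preferred read replica, so the byte-count condition alone could never be satisfied for such a fetch.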
[kafka] branch trunk updated (19a6269 -> db2485c)
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.

    from 19a6269 MINOR: Fix log4j entry in RepartitionTopics (#11958)
     add db2485c KAFKA-13767; Fetch from consumers should return immediately when preferred read replica is defined by the leader (#11942)

No new revisions were added by this update.

Summary of changes:
 .../main/scala/kafka/server/ReplicaManager.scala   |  8 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 76 --
 2 files changed, 75 insertions(+), 9 deletions(-)
[kafka] branch trunk updated (2a27059 -> 19a6269)
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.

    from 2a27059 MINOR: Improved display names for parameterized KRaft and ZK tests (#11957)
     add 19a6269 MINOR: Fix log4j entry in RepartitionTopics (#11958)

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/processor/internals/RepartitionTopics.java | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)