[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846950#comment-17846950 ] Chia-Ping Tsai commented on KAFKA-16774: [~cadonna] thanks for confirming. Please feel free to submit your solution/patch for it :) > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16782) Some partition's segments are suddenly not deleted anymore
Roland Sommer created KAFKA-16782: - Summary: Some partition's segments are suddenly not deleted anymore Key: KAFKA-16782 URL: https://issues.apache.org/jira/browse/KAFKA-16782 Project: Kafka Issue Type: Bug Reporter: Roland Sommer I recently discovered an odd behaviour in one of our Kafka clusters (KRaft-based, v3.7.0): We have a topic for distributed log collection with 48 partitions. Retention is set to 84 hours, and we have the default {{cleanup.policy=delete}} in place. For all but two partitions this works as expected. In two partition directories there are files going back to January, and consuming the specific partitions yields data from January (showing that the files are not merely lying around, they are actually processed). Topic settings as per {{kafka-topics.sh --describe}}: {{Topic: syslog TopicId: AeJLnYPnQFOtMc0ZjpH7sw PartitionCount: 48 ReplicationFactor: 2 Configs: compression.type=snappy,cleanup.policy=delete,segment.bytes=1073741824,retention.ms=302400000,max.message.bytes=2097152}} Searching the cluster logs, there is no indicator of what could be the reason here (at least I have not spotted anything suspicious so far). Up to the time when deletion stopped, there are log entries showing the deletion of old log segments, but then that simply stopped. As far as I can see, there has not been any change on the cluster at that point. -- This message was sent by Atlassian Jira (v8.20.10#820010)
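For context when debugging reports like this, the effective topic configuration (including where each value comes from) can be cross-checked with the Admin API. A minimal sketch, assuming a reachable bootstrap server at localhost:9092 ("syslog" is the topic from this report):

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CheckRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: adjust for your cluster
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "syslog");
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(List.of(topic)).all().get();
            // For 84h retention, retention.ms should report 302400000; the source
            // shows whether the value is a topic override or a broker default.
            configs.get(topic).entries().forEach(e ->
                    System.out.println(e.name() + " = " + e.value() + " (" + e.source() + ")"));
        }
    }
}
{code}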
[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846940#comment-17846940 ] Bruno Cadonna commented on KAFKA-16774: --- [~chia7712] Thank you for your analysis! Without looking at your comment, I came to the same conclusion about the cause of the flakiness. I was puzzled at first because {{allTasks()}} should only be called by the stream thread, so I did not understand where the concurrency came from. Then I took a closer look at the test and saw that the stream thread is started in the test, which explains the concurrency. > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15541) Improved StateStore Iterator metrics for detecting leaks
[ https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15541: - Summary: Improved StateStore Iterator metrics for detecting leaks (was: RocksDB Iterator Metrics) > Improved StateStore Iterator metrics for detecting leaks > > > Key: KAFKA-15541 > URL: https://issues.apache.org/jira/browse/KAFKA-15541 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > Labels: kip, kip-required > > [KIP-989: RocksDB Iterator > Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics] > RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due > to [blocks being "pinned" > in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators]. > Pinned blocks can currently be tracked via the per-store > {{block-cache-pinned-usage}} metric. However, it's common [(and even > recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb] > to share the Block Cache among all stores in an application, to enable users > to globally bound the native memory used by RocksDB. This results in the > {{block-cache-pinned-usage}} metric reporting the same memory usage for every store > in the application, irrespective of which store is actually pinning blocks in > the block cache. > To aid users in finding leaked Iterators, as well as identifying the cause of > a high number of pinned blocks, we introduce two new metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
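For readers following the KIP: the leak pattern under discussion comes from iterators that are never closed. A minimal sketch, assuming a ReadOnlyKeyValueStore<String, Long> obtained via KafkaStreams#store(), showing the try-with-resources idiom that releases pinned blocks:

{code:java}
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IteratorHygiene {
    // KeyValueIterator extends Closeable; an unclosed iterator keeps blocks
    // pinned in the (possibly shared) block cache, which the proposed
    // per-store metrics are meant to surface.
    static long countEntries(final ReadOnlyKeyValueStore<String, Long> store) {
        long count = 0;
        try (KeyValueIterator<String, Long> iter = store.all()) {
            while (iter.hasNext()) {
                iter.next();
                count++;
            }
        }
        return count;
    }
}
{code}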
[jira] [Comment Edited] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846912#comment-17846912 ] Chia-Ping Tsai edited comment on KAFKA-16774 at 5/16/24 11:11 AM: -- [~mjsax] nice question. It seems to me that this is a flaky test. IIRC, `onPartitionsAssigned` should be executed by the same thread as the one calling `consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is called by the JUnit thread [1], and so it causes a race condition. We can fix it by returning a copy of `pendingTasksToInit` [2] with sync. That is similar to `allTasksPerId` [3]. WDYT? [0] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147 [1] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408 [2] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112 [3] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302 was (Author: chia7712): [~mjsax] nice question. It seems to me that this is a flaky test. IIRC, `onPartitionsAssigned` should be executed by the same thread as the one calling `consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is called by the JUnit thread [1], and so it causes a race condition. We can fix it by returning a copy of `pendingTasksToInit` [2]. That is similar to `allTasksPerId` [3]. WDYT? [0] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147 [1] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408 [2] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112 [3] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302 > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846912#comment-17846912 ] Chia-Ping Tsai commented on KAFKA-16774: [~mjsax] nice question. It seems to me that this is a flaky test. IIRC, `onPartitionsAssigned` should be executed by the same thread as the one calling `consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is called by the JUnit thread [1], and so it causes a race condition. We can fix it by returning a copy of `pendingTasksToInit` [2]. That is similar to `allTasksPerId` [3]. WDYT? [0] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147 [1] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408 [2] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112 [3] https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302 > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
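A minimal sketch of the fix idea discussed above: returning a synchronized defensive copy so the JUnit thread and the stream thread never iterate the live map concurrently. Class and field names only mirror the referenced Tasks code; this is not the actual patch:

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

class PendingTasks<T> {
    private final Map<String, T> pendingTasksToInit = new HashMap<>();

    synchronized void addPendingTaskToInit(final String id, final T task) {
        pendingTasksToInit.put(id, task);
    }

    // Handing out a snapshot avoids the ConcurrentModificationException in the
    // stack trace: callers stream over the copy while the map keeps mutating.
    synchronized Collection<T> pendingTasksToInit() {
        return new HashSet<>(pendingTasksToInit.values());
    }
}
{code}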
[jira] [Resolved] (KAFKA-16668) Enable to set tags by `ClusterTest`
[ https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16668. Fix Version/s: 3.8.0 Resolution: Fixed > Enable to set tags by `ClusterTest` > > > Key: KAFKA-16668 > URL: https://issues.apache.org/jira/browse/KAFKA-16668 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Johnny Hsu >Priority: Minor > Fix For: 3.8.0 > > > Currently, the display name can be customized only via `name` > (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42). > However, the "key" is hard-coded to "name=xxx". Also, it is impossible to set > additional "tags" for the display name. > https://github.com/apache/kafka/pull/15766 is an example where we want to add > "xxx=bbb" to the display name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
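A hypothetical sketch of the usage gap this ticket addresses; the tags attribute shown in the comment below is the shape being asked for, not necessarily the merged API:

{code:java}
import kafka.test.annotation.ClusterTest;

class ClusterTestDisplayNameExample {
    @ClusterTest(name = "happy-path")  // today: the display name is rendered as "name=happy-path"
    void supportedToday() { }

    // Desired (hypothetical) shape from the ticket: arbitrary key=value tags, e.g.
    // @ClusterTest(tags = {"quorum=kraft", "xxx=bbb"})
    void proposedShape() { }
}
{code}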
[jira] [Assigned] (KAFKA-16781) Expose advertised.listeners in controller node
[ https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16781: -- Assignee: TengYao Chi > Expose advertised.listeners in controller node > -- > > Key: KAFKA-16781 > URL: https://issues.apache.org/jira/browse/KAFKA-16781 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: TengYao Chi >Priority: Major > Labels: need-kip, newbie, newbie++ > > After > [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], > we allow clients to talk to the KRaft controller node directly. But unlike > broker nodes, we don't allow users to configure advertised.listeners for clients > to connect to. Without this config, the client cannot connect to the > controller node if the controller is sitting behind a NAT network while the > client is in an external network. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16781) Expose advertised.listeners in controller node
[ https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846890#comment-17846890 ] TengYao Chi commented on KAFKA-16781: - I will handle this issue :) > Expose advertised.listeners in controller node > -- > > Key: KAFKA-16781 > URL: https://issues.apache.org/jira/browse/KAFKA-16781 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Priority: Major > Labels: need-kip, newbie, newbie++ > > After > [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], > we allow clients to talk to the KRaft controller node directly. But unlike > broker nodes, we don't allow users to configure advertised.listeners for clients > to connect to. Without this config, the client cannot connect to the > controller node if the controller is sitting behind a NAT network while the > client is in an external network. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16539) Can't update specific broker configs in pre-migration mode
[ https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16539. Resolution: Fixed > Can't update specific broker configs in pre-migration mode > -- > > Key: KAFKA-16539 > URL: https://issues.apache.org/jira/browse/KAFKA-16539 > Project: Kafka > Issue Type: Bug > Components: config, kraft >Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > In migration mode, ZK brokers will have a forwarding manager configured. This > is used to forward requests to the KRaft controller once we get to that part > of the migration. However, prior to KRaft taking over as the controller > (known as pre-migration mode), the ZK brokers are still attempting to forward > IncrementalAlterConfigs to the controller. > This works fine for cluster-level configs (e.g., "--entity-type broker > --entity-default"), but it fails for specific broker configs (e.g., > "--entity-type broker --entity-id 1"). > This affects BROKER and BROKER_LOGGER config types. > To work around this bug, you can either disable migrations on the brokers > (assuming no migration has taken place), or proceed with the migration and > get to the point where KRaft is the controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
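To make the two cases concrete, here is a hedged Admin-API sketch; the kafka-configs.sh invocations above map onto these ConfigResource values, and the bootstrap server, config key, and value are placeholders:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class PreMigrationConfigDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
        try (Admin admin = Admin.create(props)) {
            // Cluster-level default ("--entity-type broker --entity-default"):
            // an empty resource name; this works even in pre-migration mode.
            ConfigResource allBrokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            admin.incrementalAlterConfigs(Map.of(allBrokers, List.of(op))).all().get();

            // Specific broker ("--entity-type broker --entity-id 1"): per this
            // ticket, this fails until KRaft takes over as controller.
            ConfigResource broker1 = new ConfigResource(ConfigResource.Type.BROKER, "1");
            admin.incrementalAlterConfigs(Map.of(broker1, List.of(op))).all().get();
        }
    }
}
{code}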
[jira] [Updated] (KAFKA-16539) Can't update specific broker configs in pre-migration mode
[ https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16539: --- Fix Version/s: (was: 3.6.3) > Can't update specific broker configs in pre-migration mode > -- > > Key: KAFKA-16539 > URL: https://issues.apache.org/jira/browse/KAFKA-16539 > Project: Kafka > Issue Type: Bug > Components: config, kraft >Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > In migration mode, ZK brokers will have a forwarding manager configured. This > is used to forward requests to the KRaft controller once we get to that part > of the migration. However, prior to KRaft taking over as the controller > (known as pre-migration mode), the ZK brokers are still attempting to forward > IncrementalAlterConfigs to the controller. > This works fine for cluster-level configs (e.g., "--entity-type broker > --entity-default"), but it fails for specific broker configs (e.g., > "--entity-type broker --entity-id 1"). > This affects BROKER and BROKER_LOGGER config types. > To work around this bug, you can either disable migrations on the brokers > (assuming no migration has taken place), or proceed with the migration and > get to the point where KRaft is the controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16781) Expose advertised.listeners in controller node
Luke Chen created KAFKA-16781: - Summary: Expose advertised.listeners in controller node Key: KAFKA-16781 URL: https://issues.apache.org/jira/browse/KAFKA-16781 Project: Kafka Issue Type: Improvement Reporter: Luke Chen After [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], we allow clients to talk to the KRaft controller node directly. But unlike broker nodes, we don't allow users to configure advertised.listeners for clients to connect to. Without this config, the client cannot connect to the controller node if the controller is sitting behind a NAT network while the client is in an external network. -- This message was sent by Atlassian Jira (v8.20.10#820010)
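A minimal configuration sketch of the gap being described; hostnames and ports are placeholders. Brokers can already advertise a public endpoint, while KRaft controllers only expose their bind address:

{noformat}
# Broker: supported today
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.public.example.com:9092

# KRaft controller: no advertised.listeners equivalent yet, which is
# what this ticket (and its future KIP) proposes to add
process.roles=controller
listeners=CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
{noformat}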
[jira] [Updated] (KAFKA-16781) Expose advertised.listeners in controller node
[ https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16781: -- Labels: need-kip newbie newbie++ (was: ) > Expose advertised.listeners in controller node > -- > > Key: KAFKA-16781 > URL: https://issues.apache.org/jira/browse/KAFKA-16781 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Priority: Major > Labels: need-kip, newbie, newbie++ > > After > [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], > we allow clients to talk to the KRaft controller node directly. But unlike > broker nodes, we don't allow users to configure advertised.listeners for clients > to connect to. Without this config, the client cannot connect to the > controller node if the controller is sitting behind a NAT network while the > client is in an external network. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846880#comment-17846880 ] Loïc Greffier commented on KAFKA-16448: --- [~mjsax] - You can assign it to me > Add Kafka Streams exception handler for exceptions occuring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina > Priority: Minor > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occuring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic
[ https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-16780: - Description: h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the non-txn topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only a few remote log segments to collect the aborted txns. was: h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the normal topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only a few remote log segments to collect the aborted txns. > Txn consumer exerts pressure on remote storage when reading non-txn topic > - > > Key: KAFKA-16780 > URL: https://issues.apache.org/jira/browse/KAFKA-16780 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major > > h3. 
Logic to read aborted txns: > # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads > a non-txn topic, then the broker has to > [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] > all the local log segments to collect the aborted transactions since there > won't be any entry in the transaction index. > # The same > [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] > is applied while reading from remote storage. In this case, when the FETCH > request is reading data from the first remote log segment, then it has to > fetch the transaction indexes of all the remaining remote-log segments, and > then the call lands to the local-log segments before responding to the FETCH > request which increases the time taken to serve the requests. > The [EoS Abort > Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] > design doc explains how the transaction index file filters out the aborted > transaction records. > The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation > level but read the non-txn topics. If the topic is enabled with the > transaction, then we expect the transaction to either commit/rollback within > 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search > only a few remote log segments to collect the aborted txns. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic
[ https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-16780: - Description: h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the normal topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only a few remote log segments to collect the aborted txns. was: h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the normal topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only few remote log segments to collect the aborted txns. > Txn consumer exerts pressure on remote storage when reading non-txn topic > - > > Key: KAFKA-16780 > URL: https://issues.apache.org/jira/browse/KAFKA-16780 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major > > h3. 
Logic to read aborted txns: > # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads > a non-txn topic, then the broker has to > [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] > all the local log segments to collect the aborted transactions since there > won't be any entry in the transaction index. > # The same > [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] > is applied while reading from remote storage. In this case, when the FETCH > request is reading data from the first remote log segment, then it has to > fetch the transaction indexes of all the remaining remote-log segments, and > then the call lands to the local-log segments before responding to the FETCH > request which increases the time taken to serve the requests. > The [EoS Abort > Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] > design doc explains how the transaction index file filters out the aborted > transaction records. > The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation > level but read the normal topics. If the topic is enabled with the > transaction, then we expect the transaction to either commit/rollback within > 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to > search only a few remote log segments to collect the aborted txns. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic
[ https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-16780: - Description: h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the normal topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only few remote log segments to collect the aborted txns. was: h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the normal topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only for one (or) two remote log segments. > Txn consumer exerts pressure on remote storage when reading non-txn topic > - > > Key: KAFKA-16780 > URL: https://issues.apache.org/jira/browse/KAFKA-16780 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major > > h3. 
Logic to read aborted txns: > # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads > a non-txn topic, then the broker has to > [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] > all the local log segments to collect the aborted transactions since there > won't be any entry in the transaction index. > # The same > [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] > is applied while reading from remote storage. In this case, when the FETCH > request is reading data from the first remote log segment, then it has to > fetch the transaction indexes of all the remaining remote-log segments, and > then the call lands to the local-log segments before responding to the FETCH > request which increases the time taken to serve the requests. > The [EoS Abort > Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] > design doc explains how the transaction index file filters out the aborted > transaction records. > The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation > level but read the normal topics. If the topic is enabled with the > transaction, then we expect the transaction to either commit/rollback within > 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search > only few remote log segments to collect the aborted txns. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic
Kamal Chandraprakash created KAFKA-16780: Summary: Txn consumer exerts pressure on remote storage when reading non-txn topic Key: KAFKA-16780 URL: https://issues.apache.org/jira/browse/KAFKA-16780 Project: Kafka Issue Type: Task Reporter: Kamal Chandraprakash h3. Logic to read aborted txns: # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a non-txn topic, then the broker has to [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394] all the local log segments to collect the aborted transactions since there won't be any entry in the transaction index. # The same [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436] is applied while reading from remote storage. In this case, when the FETCH request is reading data from the first remote log segment, then it has to fetch the transaction indexes of all the remaining remote-log segments, and then the call lands to the local-log segments before responding to the FETCH request which increases the time taken to serve the requests. The [EoS Abort Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc] design doc explains how the transaction index file filters out the aborted transaction records. The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation level but read the normal topics. If the topic is enabled with the transaction, then we expect the transaction to either commit/rollback within 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may have to search only for one (or) two remote log segments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
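For reference, a minimal consumer sketch that triggers the code path described above: a READ_COMMITTED consumer fetching from a topic that never had transactional producers. The bootstrap server and topic name are assumptions:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ReadCommittedOnNonTxnTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // With this isolation level the broker must collect aborted txns per
        // fetch; on a non-txn topic with tiered storage that means touching the
        // (empty) transaction index of every remaining remote segment.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("non-txn-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
{code}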
[jira] [Commented] (KAFKA-16771) First log directory printed twice when formatting storage
[ https://issues.apache.org/jira/browse/KAFKA-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846858#comment-17846858 ] Chia-Ping Tsai commented on KAFKA-16771: [~gongxuanzhang] I have assigned this ticket to you > First log directory printed twice when formatting storage > - > > Key: KAFKA-16771 > URL: https://issues.apache.org/jira/browse/KAFKA-16771 > Project: Kafka > Issue Type: Task > Components: tools >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Assignee: xuanzhang gong >Priority: Major > > If multiple log directories are set, when running bin/kafka-storage.sh > format, the first directory is printed twice. For example: > {noformat} > bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c > config/kraft/server.properties --release-version 3.6 > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY}) > Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. > Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. > Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2. > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
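Purely as an illustration of how this symptom typically arises (this is not the actual kafka-storage code): the first element gets logged once before a loop that then logs every element:

{code:java}
import java.util.List;

public class DuplicateFirstDirDemo {
    public static void main(String[] args) {
        List<String> dirs = List.of("/tmp/kraft-combined-logs", "/tmp/kraft-combined-logs2");
        // Suspect pattern: a stray log line for the first directory...
        System.out.println("Formatting " + dirs.get(0) + " with metadata.version 3.6-IV2.");
        // ...followed by the loop that logs all directories, duplicating the first.
        for (String dir : dirs) {
            System.out.println("Formatting " + dir + " with metadata.version 3.6-IV2.");
        }
    }
}
{code}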
[jira] [Assigned] (KAFKA-16771) First log directory printed twice when formatting storage
[ https://issues.apache.org/jira/browse/KAFKA-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16771: -- Assignee: xuanzhang gong > First log directory printed twice when formatting storage > - > > Key: KAFKA-16771 > URL: https://issues.apache.org/jira/browse/KAFKA-16771 > Project: Kafka > Issue Type: Task > Components: tools >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Assignee: xuanzhang gong >Priority: Major > > If multiple log directories are set, when running bin/kafka-storage.sh > format, the first directory is printed twice. For example: > {noformat} > bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c > config/kraft/server.properties --release-version 3.6 > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY}) > Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. > Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. > Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2. > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16779) Kafka retains logs past specified retention
[ https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846853#comment-17846853 ] Lin Siyuan commented on KAFKA-16779: I'm very sorry, Nicholas Feinberg. I misinterpreted the description. I've rolled it back. > Kafka retains logs past specified retention > --- > > Key: KAFKA-16779 > URL: https://issues.apache.org/jira/browse/KAFKA-16779 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Nicholas Feinberg >Priority: Major > Labels: expiration, retention > Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, > kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, > state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz > > > In a Kafka cluster with all topics set to four days of retention or longer > (345600000ms), most brokers seem to be retaining six days of data. > This is true even for topics which have high throughput (500MB/s, 50k msgs/s) > and thus are regularly rolling new log segments. We observe this unexpectedly > high retention both via disk usage statistics and by requesting the oldest > available messages from Kafka. > Some of these brokers crashed with an 'mmap failed' error (attached). When > those brokers started up again, they returned to the expected four days of > retention. > Manually restarting brokers also seems to cause them to return to four days > of retention. Demoting and promoting brokers only has this effect on a small > part of the data hosted on a broker. > These hosts had ~170GiB of free memory available. We saw no signs of pressure > on either system or JVM heap memory before or after they reported this error. > Committed memory seems to be around 10%, so this doesn't seem to be an > overcommit issue. > This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). > Prior to the upgrade, it was running on Kafka 2.4. > We last reduced retention for ops on May 7th, after which we restored > retention to our default of four days. This was the second time we've > temporarily reduced and restored retention since the upgrade. This problem > did not manifest the previous time we did so, nor did it manifest on our > other Kafka 3.7 clusters. > We are running on AWS > [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We > have 23 brokers, each with 24 disks. We're running in a JBOD configuration > (i.e. unraided). > Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, > we're still using Zookeeper. > Sample broker logs are attached. The 05-12 and 05-14 logs are from separate > hosts. Please let me know if I can provide any further information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846852#comment-17846852 ] Matej Sprysl commented on KAFKA-15242: -- Hi Matthias, I did not find the issue you mentioned, thanks for pointing it out! The other issue is a subset of this one, so I am not sure if adding a `MockFixedKeyProcessorContext` would fully solve this one. > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlatibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using a mock processor context to get the forwarded message doesn't work. > Also, there is no well-documented way to test FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro]; > the most important piece is the test file with runtime and compile-time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
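To make the testing gap concrete, here is a minimal FixedKeyProcessor (the class itself is illustrative): process() consumes a FixedKeyRecord, which has no public constructor, so a unit test has no supported way to hand-build an input record, while MockProcessorContext targets the regular Processor API:

{code:java}
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class UppercaseProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // A test would want to feed a FixedKeyRecord in and assert on what is
        // forwarded here, but there is no public way to construct one.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
{code}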
[jira] [Updated] (KAFKA-16779) Kafka retains logs past specified retention
[ https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lin Siyuan updated KAFKA-16779: --- Description: In a Kafka cluster with all topics set to four days of retention or longer (345600000ms), most brokers seem to be retaining six days of data. This is true even for topics which have high throughput (500MB/s, 50k msgs/s) and thus are regularly rolling new log segments. We observe this unexpectedly high retention both via disk usage statistics and by requesting the oldest available messages from Kafka. Some of these brokers crashed with an 'mmap failed' error (attached). When those brokers started up again, they returned to the expected four days of retention. Manually restarting brokers also seems to cause them to return to four days of retention. Demoting and promoting brokers only has this effect on a small part of the data hosted on a broker. These hosts had ~170GiB of free memory available. We saw no signs of pressure on either system or JVM heap memory before or after they reported this error. Committed memory seems to be around 10%, so this doesn't seem to be an overcommit issue. This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior to the upgrade, it was running on Kafka 2.4. We last reduced retention for ops on May 7th, after which we restored retention to our default of four days. This was the second time we've temporarily reduced and restored retention since the upgrade. This problem did not manifest the previous time we did so, nor did it manifest on our other Kafka 3.7 clusters. We are running on AWS [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We have 23 brokers, each with 24 disks. We're running in a JBOD configuration (i.e. unraided). Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're still using Zookeeper. Sample broker logs are attached. The 05-12 and 05-14 logs are from separate hosts. Please let me know if I can provide any further information. was: In a Kafka cluster with all topics set to four days of retention or longer (345600000ms), most brokers seem to be retaining six days of data. This is true even for topics with high throughput (500MB/s, 50k msgs/s), which therefore regularly roll new log segments. We observe this unexpectedly high retention both via disk usage statistics and by requesting the oldest available messages from Kafka. Some of these brokers crashed with an 'mmap failed' error (attached). When those brokers started up again, they returned to the expected four days of retention. Manually restarting brokers also seems to cause them to return to four days of retention. Demoting and promoting brokers only has this effect on a small part of the data hosted on a broker. These hosts had ~170GiB of free memory available. We saw no signs of pressure on either system or JVM heap memory before or after they reported this error. Committed memory seems to be around 10%, so this doesn't seem to be an overcommit issue. This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior to the upgrade, it was running on Kafka 2.4. We last reduced retention for ops on May 7th, after which we restored retention to our default of four days. This was the second time we've temporarily reduced and restored retention since the upgrade. This problem did not manifest the previous time we did so, nor did it manifest on our other Kafka 3.7 clusters. We are running on AWS [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We have 23 brokers, each with 24 disks. We're running in a JBOD configuration (i.e. unraided). Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're still using Zookeeper. Sample broker logs are attached. The 05-12 and 05-14 logs are from separate hosts. Please let me know if I can provide any further information. > Kafka retains logs past specified retention > --- > > Key: KAFKA-16779 > URL: https://issues.apache.org/jira/browse/KAFKA-16779 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Nicholas Feinberg >Priority: Major > Labels: expiration, retention > Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, > kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, > state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz > > > In a Kafka cluster with all topics set to four days of retention or longer > (345600000ms), most brokers seem to be retaining six days of data. 
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) > and thus are regularly rolling new log segments. We observe this unexpectedly > high retention both via disk usage statistics and by requesting the oldest > available messages from Kafka. > Some of these brokers crashed with an 'mmap failed' error (attached). When > those brokers started up again, they returned to the expected four days of > retention. > Manually restarting brokers also seems to cause them to return to four days > of retention. Demoting and promoting brokers only has this effect on a small > part of the data hosted on a broker. > These hosts had ~170GiB of free memory available. We saw no signs of pressure > on either system or JVM heap memory before or after they reported this error. > Committed memory seems to be around 10%, so this doesn't seem to be an > overcommit issue. > This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). > Prior to the upgrade, it was running on Kafka 2.4. > We last reduced retention for ops on May 7th, after which we restored > retention to our default of four days. This was the second time we've > temporarily reduced and restored retention since the upgrade. This problem > did not manifest the previous time we did so, nor did it manifest on our > other Kafka 3.7 clusters. > We are running on AWS > [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We > have 23 brokers, each with 24 disks. We're running in a JBOD configuration > (i.e. unraided). > Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, > we're still using Zookeeper. > Sample broker logs are attached. The 05-12 and 05-14 logs are from separate > hosts. Please let me know if I can provide any further information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16779) Kafka retains logs past specified retention
[ https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lin Siyuan updated KAFKA-16779: --- Description: In a Kafka cluster with all topics set to four days of retention or longer (345600000ms), most brokers seem to be retaining six days of data. This is true even for topics with high throughput (500MB/s, 50k msgs/s), which therefore regularly roll new log segments. We observe this unexpectedly high retention both via disk usage statistics and by requesting the oldest available messages from Kafka. Some of these brokers crashed with an 'mmap failed' error (attached). When those brokers started up again, they returned to the expected four days of retention. Manually restarting brokers also seems to cause them to return to four days of retention. Demoting and promoting brokers only has this effect on a small part of the data hosted on a broker. These hosts had ~170GiB of free memory available. We saw no signs of pressure on either system or JVM heap memory before or after they reported this error. Committed memory seems to be around 10%, so this doesn't seem to be an overcommit issue. This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior to the upgrade, it was running on Kafka 2.4. We last reduced retention for ops on May 7th, after which we restored retention to our default of four days. This was the second time we've temporarily reduced and restored retention since the upgrade. This problem did not manifest the previous time we did so, nor did it manifest on our other Kafka 3.7 clusters. We are running on AWS [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We have 23 brokers, each with 24 disks. We're running in a JBOD configuration (i.e. unraided). Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're still using Zookeeper. Sample broker logs are attached. The 05-12 and 05-14 logs are from separate hosts. Please let me know if I can provide any further information. was: In a Kafka cluster with all topics set to four days of retention or longer (345600000ms), most brokers seem to be retaining six days of data. This is true even for topics which have high throughput (500MB/s, 50k msgs/s) and thus are regularly rolling new log segments. We observe this unexpectedly high retention both via disk usage statistics and by requesting the oldest available messages from Kafka. Some of these brokers crashed with an 'mmap failed' error (attached). When those brokers started up again, they returned to the expected four days of retention. Manually restarting brokers also seems to cause them to return to four days of retention. Demoting and promoting brokers only has this effect on a small part of the data hosted on a broker. These hosts had ~170GiB of free memory available. We saw no signs of pressure on either system or JVM heap memory before or after they reported this error. Committed memory seems to be around 10%, so this doesn't seem to be an overcommit issue. This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior to the upgrade, it was running on Kafka 2.4. We last reduced retention for ops on May 7th, after which we restored retention to our default of four days. This was the second time we've temporarily reduced and restored retention since the upgrade. This problem did not manifest the previous time we did so, nor did it manifest on our other Kafka 3.7 clusters. We are running on AWS [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We have 23 brokers, each with 24 disks. We're running in a JBOD configuration (i.e. unraided). Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're still using Zookeeper. Sample broker logs are attached. The 05-12 and 05-14 logs are from separate hosts. Please let me know if I can provide any further information. 
> Kafka retains logs past specified retention > --- > > Key: KAFKA-16779 > URL: https://issues.apache.org/jira/browse/KAFKA-16779 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Nicholas Feinberg >Priority: Major > Labels: expiration, retention > Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, > kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, > state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz > > > In a Kafka cluster with all topics set to four days of retention or longer > (345600000ms), most brokers seem to be retaining six days of data. > This is true even for topics with high throughput (500MB/s, 50k msgs/s), which > therefore regularly roll new log segments. We observe this unexpectedly high > retention both via disk usage statistics and by requesting the oldest > available messages from Kafka. > Some of these brokers crashed with an 'mmap failed' error (attached). When > those brokers started up again, they returned to the expected four days of > retention. > Manually restarting brokers also seems to cause them to return to four days of > retention. Demoting and promoting brokers only has this effect on a small part > of the data hosted on a broker. > These hosts had ~170GiB of free memory available. We saw no signs of pressure > on either system or JVM heap memory before or after they reported this error. > Committed memory seems to be around 10%, so this doesn't seem to be an > overcommit issue. > This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). > Prior to the upgrade, it was running on Kafka 2.4. > We last reduced retention for ops on May 7th, after which we restored > retention to our default of four days. This was the second time we've > temporarily reduced and restored retention since the upgrade. This problem did > not manifest the previous time we did so, nor did it manifest on our other > Kafka 3.7 clusters. > We are running on AWS > [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We > have 23 brokers, each with 24 disks. We're running in a JBOD configuration > (i.e. unraided). > Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're > still using Zookeeper. > Sample broker logs are attached. The 05-12 and 05-14 logs are from separate > hosts. Please let me know if I can provide any further information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join
[ https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846821#comment-17846821 ] Matthias J. Sax commented on KAFKA-16333: - In general yes. But we should wait until a final decision is made on whether the release after 3.8 will be 3.9 or 4.0, so we should hold off a few more weeks. > Removed Deprecated methods KTable#join > -- > > Key: KAFKA-16333 > URL: https://issues.apache.org/jira/browse/KAFKA-16333 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > KTable#join() methods taking a `Named` parameter got deprecated in the 3.1 > release via https://issues.apache.org/jira/browse/KAFKA-13813 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods
[ https://issues.apache.org/jira/browse/KAFKA-16329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846820#comment-17846820 ] Matthias J. Sax commented on KAFKA-16329: - In general yes. But we should wait until a final decision is made on whether the release after 3.8 will be 3.9 or 4.0, so we should hold off a few more weeks. > Remove Deprecated Task/ThreadMetadata classes and related methods > - > > Key: KAFKA-16329 > URL: https://issues.apache.org/jira/browse/KAFKA-16329 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation] > > * > org.apache.kafka.streams.processor.TaskMetadata > * org.apache.kafka.streams.processor.ThreadMetadata > * org.apache.kafka.streams.KafkaStreams#localThreadsMetadata > * org.apache.kafka.streams.state.StreamsMetadata > * org.apache.kafka.streams.KafkaStreams#allMetadata > * org.apache.kafka.streams.KafkaStreams#allMetadataForStore > This is related to https://issues.apache.org/jira/browse/KAFKA-16330 and both > tickets should be worked on together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16330) Remove Deprecated methods/variables from TaskId
[ https://issues.apache.org/jira/browse/KAFKA-16330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846819#comment-17846819 ] Matthias J. Sax commented on KAFKA-16330: - In general, yes. But we should wait until a final decision is made on whether the release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks. > Remove Deprecated methods/variables from TaskId > --- > > Key: KAFKA-16330 > URL: https://issues.apache.org/jira/browse/KAFKA-16330 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Cf > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > > This ticket relates to https://issues.apache.org/jira/browse/KAFKA-16329 and > both should be worked on together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occurring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846818#comment-17846818 ] Matthias J. Sax commented on KAFKA-16448: - [~muralibasani] – Thanks for your interest. This ticket is already WIP, the corresponding KIP-1033 was accepted, and there is already a PR... [~Dabz] should we assign this ticket to you or Loic? (Not sure if he has a Jira account – if not, we should create one so we can assign the ticket to him.) > Add Kafka Streams exception handler for exceptions occurring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina > Priority: Minor > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occurring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846815#comment-17846815 ] Matthias J. Sax commented on KAFKA-16774: - Wondering if this is actually a flaky test, or if it exposes a real bug? > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
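For context on the stack trace above: TaskManager.allTasks streams over a HashMap's key set, and a HashMap spliterator throws ConcurrentModificationException when the map is structurally modified mid-traversal, which is the HashMap$KeySpliterator.forEachRemaining frame in the trace. A minimal, self-contained sketch of that mechanism (plain Java, not Kafka code; being a race, it may need a few runs to trigger):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CmeDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> tasks = new HashMap<>();
        for (int i = 0; i < 100_000; i++) tasks.put("task-" + i, i);

        // a second thread mutates the map while the main thread streams over it
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) tasks.put("extra-" + i, i);
        });
        writer.start();

        try {
            // same shape as TaskManager.allTasks: keySet().stream().collect(...)
            tasks.keySet().stream().collect(Collectors.toSet());
            System.out.println("no exception this run, try again");
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("reproduced: " + e);
        }
        writer.join();
    }
}
{code}

Whether real rebalance interleavings can drive the production code into this state (a genuine bug) or only the test's threading can (a flaky test) is exactly the open question.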
[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16774: Labels: flaky-test (was: ) > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16774: Component/s: streams unit tests > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846814#comment-17846814 ] Matthias J. Sax edited comment on KAFKA-15242 at 5/16/24 5:32 AM: -- There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP – ie, there is KIP-1027. I believe it would cover this ticket? Wondering if we can close this ticket as a duplicate? was (Author: mjsax): There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP – ie, there is KIP-1027. I believe it would cover this ticket? > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using a mock processor context to get the forwarded message doesn't work. > Also, there is no well-documented way to test FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro]; > the most important piece is the test file with runtime and compile-time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846814#comment-17846814 ] Matthias J. Sax commented on KAFKA-15242: - There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP – ie, there is KIP-1027. I believe it would cover this ticket? > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using a mock processor context to get the forwarded message doesn't work. > Also, there is no well-documented way to test FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro]; > the most important piece is the test file with runtime and compile-time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
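Until a mock context for FixedKeyProcessor exists, one workaround is to exercise the processor through TopologyTestDriver rather than a mock. A rough sketch under that assumption; UppercaseProcessor here is an illustrative stand-in for the processor under test, not code from the linked repro:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class FixedKeyProcessorDriverTest {
    // illustrative processor: uppercases the value, leaves the key fixed
    static class UppercaseProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;
        @Override public void init(FixedKeyProcessorContext<String, String> context) { this.context = context; }
        @Override public void process(FixedKeyRecord<String, String> record) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .processValues(UppercaseProcessor::new)
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fixed-key-driver-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in =
                driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k", "v");
            System.out.println(out.readKeyValue()); // forwarded record, no mock context needed
        }
    }
}
{code}

This sidesteps the mock-context gap entirely, at the cost of testing through a full topology instead of the processor in isolation.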
[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy
[ https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846787#comment-17846787 ] Lenin Joseph commented on KAFKA-16656: -- Hi [~ChrisEgerton], the issue seems to be due to aliases containing the replication.policy.separator. For example, with replication.policy.separator: "-" and a spec declaring clusters with alias: cluster-1 and alias: cluster-2, the separator character appears inside the alias names themselves. I removed the "-" from the aliases to resolve the issue. > Using a custom replication.policy.separator with DefaultReplicationPolicy > - > > Key: KAFKA-16656 > URL: https://issues.apache.org/jira/browse/KAFKA-16656 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.5.1 >Reporter: Lenin Joseph >Priority: Major > > Hi, > In the case of bidirectional replication using mm2, when we tried using a > custom replication.policy.separator (e.g. "-") with DefaultReplicationPolicy, > we saw cyclic replication of topics. Could you confirm whether it's mandatory > to use a custom ReplicationPolicy whenever we want to use a separator other > than "."? > Regards, > Lenin -- This message was sent by Atlassian Jira (v8.20.10#820010)
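The cycle is easy to see if DefaultReplicationPolicy derives a topic's source cluster by splitting the topic name on the separator, which the behaviour reported above suggests. A hedged sketch under that assumption (the class and config key are real; the exact parse output should be verified against the version in use):

{code:java}
import java.util.Map;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

public class SeparatorClashDemo {
    public static void main(String[] args) {
        DefaultReplicationPolicy policy = new DefaultReplicationPolicy();
        policy.configure(Map.of("replication.policy.separator", "-"));

        // remote name for topic "orders" replicated from the alias "cluster-1"
        String remote = policy.formatRemoteTopic("cluster-1", "orders");
        System.out.println(remote); // cluster-1-orders

        // parsing splits on "-", so the recovered source is not "cluster-1",
        // and the topic is not recognized as already replicated
        System.out.println(policy.topicSource(remote));
    }
}
{code}

With aliases that avoid the custom separator character (the fix applied above), the remote-topic names parse back unambiguously.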
[jira] [Commented] (KAFKA-16771) First log directory printed twice when formatting storage
[ https://issues.apache.org/jira/browse/KAFKA-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846781#comment-17846781 ] xuanzhang gong commented on KAFKA-16771: I will handle this issue. > First log directory printed twice when formatting storage > - > > Key: KAFKA-16771 > URL: https://issues.apache.org/jira/browse/KAFKA-16771 > Project: Kafka > Issue Type: Task > Components: tools >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Priority: Major > > If multiple log directories are set, when running bin/kafka-storage.sh > format, the first directory is printed twice. For example: > {noformat} > bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c > config/kraft/server.properties --release-version 3.6 > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY}) > Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. > Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. > Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2. > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16779) Kafka retains logs past specified retention
Nicholas Feinberg created KAFKA-16779: - Summary: Kafka retains logs past specified retention Key: KAFKA-16779 URL: https://issues.apache.org/jira/browse/KAFKA-16779 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Nicholas Feinberg Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz In a Kafka cluster with all topics set to four days of retention or longer (34560ms), most brokers seem to be retaining six days of data. This is true even for topics which have high throughput (500MB/s, 50k msgs/s) and thus are regularly rolling new log segments. We observe this unexpectedly high retention both via disk usage statistics and by requesting the oldest available messages from Kafka. Some of these brokers crashed with an 'mmap failed' error (attached). When those brokers started up again, they returned to the expected four days of retention. Manually restarting brokers also seems to cause them to return to four days of retention. Demoting and promoting brokers only has this effect on a small part of the data hosted on a broker. These hosts had ~170GiB of free memory available. We saw no signs of pressure on either system or JVM heap memory before or after they reported this error. Committed memory seems to be around 10%, so this doesn't seem to be an overcommit issue. This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior to the upgrade, it was running on Kafka 2.4. We last reduced retention for ops on May 7th, after which we restored retention to our default of four days. This was the second time we've temporarily reduced and restored retention since the upgrade. This problem did not manifest the previous time we did so, nor did it manifest on our other Kafka 3.7 clusters. We are running on AWS [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We have 23 brokers, each with 24 disks. We're running in a JBOD configuration (i.e. unraided). Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, we're still using Zookeeper. Sample broker logs are attached. The 05-12 and 05-14 logs are from separate hosts. Please let me know if I can provide any further information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
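Both observations in this report (effective retention config and the oldest still-available data) can be checked programmatically when triaging similar cases. A minimal sketch with the Java Admin client; the bootstrap address and topic name are placeholders:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // effective retention.ms as the broker sees it
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
            Config config = admin.describeConfigs(List.of(topic)).all().get().get(topic);
            System.out.println("retention.ms = " + config.get("retention.ms").value());

            // earliest offset still available in partition 0
            TopicPartition tp = new TopicPartition("my-topic", 0);
            ListOffsetsResult.ListOffsetsResultInfo earliest =
                admin.listOffsets(Map.of(tp, OffsetSpec.earliest())).partitionResult(tp).get();
            System.out.println("earliest available offset = " + earliest.offset());
        }
    }
}
{code}

Consuming the record at the earliest offset and comparing its timestamp against retention.ms then shows directly whether segments are overdue for deletion.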
[jira] [Created] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch from a revoked partition
Philip Nee created KAFKA-16778: -- Summary: AsyncKafkaConsumer fetcher might occasionally try to fetch from a revoked partition Key: KAFKA-16778 URL: https://issues.apache.org/jira/browse/KAFKA-16778 Project: Kafka Issue Type: Bug Reporter: Philip Nee {code:java} java.lang.IllegalStateException: No current assignment for partition output-topic-26 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369) at org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542) at org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411) at org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94) {code} The setup: 30 consumers consuming from a topic with 300 partitions. We can occasionally get an IllegalStateException from the consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
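The reported setup can be approximated with the new (KIP-848) consumer, though as a race it is not a deterministic reproduction; frequent rebalances across many members widen the revoked-partition window. Broker address and topic name below are illustrative:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AsyncConsumerChurn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "churn-test");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // selects AsyncKafkaConsumer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // in the reported setup, ~30 of these run concurrently against a 300-partition
        // topic, so membership churn keeps triggering partition revocations
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("output-topic")); // placeholder
            while (true) {
                consumer.poll(Duration.ofMillis(100)); // the IllegalStateException surfaces from here
            }
        }
    }
}
{code}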
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846763#comment-17846763 ] Justine Olshan commented on KAFKA-16692: Hey [~akaltsikis] yeah, those are the release notes for 3.6. We can probably edit the kafka-site repo to get the change to show up in real time, but updating via the kafka repo requires waiting for the next release for the site to update. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > While investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [h
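To illustrate the gating that should have applied: when the client holds ApiVersions data for a node, NodeApiVersions.latestUsableVersion clamps the request version to what both sides support. A small sketch of that clamping (test-style usage of the client internals, not the broker code path):

{code:java}
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.protocol.ApiKeys;

public class VersionClampDemo {
    public static void main(String[] args) {
        // pretend the peer is an older broker supporting only ADD_PARTITIONS_TO_TXN v0-v3
        NodeApiVersions peer =
            NodeApiVersions.create(ApiKeys.ADD_PARTITIONS_TO_TXN.id, (short) 0, (short) 3);

        // with this information present, the client picks v3 and never sends v4
        System.out.println(peer.latestUsableVersion(ApiKeys.ADD_PARTITIONS_TO_TXN));
    }
}
{code}

The hypothesis in the report is that with discoverBrokerVersions set to false, no NodeApiVersions entry exists for the node at all, so this clamping never runs.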
[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16692: --- Affects Version/s: 3.8 > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly one semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in we sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. > From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. 
> On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16692: --- Affects Version/s: 3.7.0 > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly one semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in we sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. > From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. 
> On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846751#comment-17846751 ] Angelos Kaltsikis commented on KAFKA-16692: --- Tbh I was thinking that we should add it here https://kafka.apache.org/36/documentation.html#upgrade_360_notable My main concern is that until your fix is released with 3.6.3, anyone upgrading from <= 3.5.2 who uses transactions risks facing the same issues we faced. It goes without saying, a big thanks a ton for the contribution and the fix. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > While investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsTo
[jira] [Assigned] (KAFKA-16626) Uuid to String for subscribed topic names in assignment spec
[ https://issues.apache.org/jira/browse/KAFKA-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim reassigned KAFKA-16626: Assignee: Jeff Kim (was: Ritika Reddy) > Uuid to String for subscribed topic names in assignment spec > > > Key: KAFKA-16626 > URL: https://issues.apache.org/jira/browse/KAFKA-16626 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: Jeff Kim >Priority: Major > > In creating the assignment spec from the existing consumer subscription > metadata, quite some time is spent in converting the String to a Uuid > Change from Uuid to String for the subscribed topics in assignment spec and > convert on the fly -- This message was sent by Atlassian Jira (v8.20.10#820010)
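For reference, the conversion this ticket targets is between topic-name Strings and org.apache.kafka.common.Uuid topic ids. A trivial round-trip sketch of the two directions (illustrative only; the coordinator's actual hot path is the per-topic, per-member mapping done while computing assignments):

{code:java}
import org.apache.kafka.common.Uuid;

public class UuidConversionSketch {
    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();

        // Uuid -> String: the base64 form used across Kafka APIs
        String encoded = topicId.toString();

        // String -> Uuid: the parse that "convert on the fly" defers until
        // an assignment actually needs the id
        Uuid decoded = Uuid.fromString(encoded);

        System.out.println(encoded + " round-trips: " + topicId.equals(decoded));
    }
}
{code}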
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16777: --- Description: If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage: {code:java} @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer = createConsumer(configOverrides = this.consumerConfig) consumer.assign(List(tp).asJava) // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known) TestUtils.tryUntilNoAssertionError() { assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO)) } } {code} Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown. There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests,. All fail even with the current patch in KAFKA-16637 so needs investigation. was: If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). 
We should add it as part of the fix, for better coverage: {code:java} @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer = createConsumer(configOverrides = this.consumerConfig) consumer.assign(List(tp).asJava) // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known) TestUtils.tryUntilNoAssertionError() { assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO)) } } {code} Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consu
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16777: -- Component/s: clients > New consumer should throw NoOffsetForPartitionException on continuous poll > zero if no reset strategy > > > Key: KAFKA-16777 > URL: https://issues.apache.org/jira/browse/KAFKA-16777 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > If the consumer does not define an offset reset strategy, a call to poll > should fail with NoOffsetForPartitionException. That works as expected on the > new consumer when polling with a timeout > 0 (existing integration test > [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), > but fails when polling continuously with ZERO timeout. > This can be easily reproduced with a new integration test like this (passes > for the legacy consumer but fails for the new consumer). We should add it as > part of the fix, for better coverage: > {code:java} > @ParameterizedTest(name = > TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) > @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) > def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, > groupProtocol: String): Unit = { > this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, > "none") > val consumer = createConsumer(configOverrides = this.consumerConfig) > consumer.assign(List(tp).asJava) > // continuous poll should eventually fail because there is no offset > reset strategy set (fail only when resetting positions after coordinator is > known) > TestUtils.tryUntilNoAssertionError() { > assertThrows(classOf[NoOffsetForPartitionException], () => > consumer.poll(Duration.ZERO)) > } > } > {code} > Also this is covered in the unit test > [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], > that is currently enabled only for the LegacyConsumer. After fixing this > issue we should be able to enable it for the new consumer too. > The issue seems to be around calling poll with ZERO timeout, that even when > continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, > so the updateFetchPositions never makes it to > [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], > where the exception is thrown. > > There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, > but filing this one to provide more context and point out the test failures > and suggested new tests,. All fail even with the current patch in KAFKA-16637 > so needs investigation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16777: --- Description: If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage: {code:java} @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer = createConsumer(configOverrides = this.consumerConfig) consumer.assign(List(tp).asJava) // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known) TestUtils.tryUntilNoAssertionError() { assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO)) } } {code} Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown. There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests,. All fail even with the current patch in KAFKA-16637 so needs investigation. was: If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). 
We should add it as part of the fix, for better coverage: {code:java} @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer = createConsumer(configOverrides = this.consumerConfig) consumer.assign(List(tp).asJava) // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known) TestUtils.tryUntilNoAssertionError() { assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO)) } } {code} Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/
[jira] [Created] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
Lianet Magrans created KAFKA-16777: -- Summary: New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy Key: KAFKA-16777 URL: https://issues.apache.org/jira/browse/KAFKA-16777 Project: Kafka Issue Type: Bug Components: consumer Reporter: Lianet Magrans If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage: {code:java} @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") val consumer = createConsumer(configOverrides = this.consumerConfig) consumer.assign(List(tp).asJava) // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known) TestUtils.tryUntilNoAssertionError() { assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO)) } } {code} Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown. -- This message was sent by Atlassian Jira (v8.20.10#820010)
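The Scala integration test above has a straightforward plain-Java analogue, convenient for reproducing outside the test harness. Broker address and topic are placeholders; per this ticket the new consumer never throws, while the legacy consumer eventually does:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.TopicPartition;

public class PollZeroNoResetRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-zero-repro");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); // no reset strategy
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(new TopicPartition("test-topic", 0))); // placeholder
            // some poll(ZERO) should eventually throw once the coordinator is known
            for (int i = 0; i < 10_000; i++) {
                try {
                    consumer.poll(Duration.ZERO);
                } catch (NoOffsetForPartitionException e) {
                    System.out.println("thrown as expected: " + e.getMessage());
                    return;
                }
            }
            System.out.println("never thrown: the behaviour this ticket reports");
        }
    }
}
{code}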
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846742#comment-17846742 ] Justine Olshan commented on KAFKA-16692: [~akaltsikis] Hmmm...I've included information in release notes about bugs like this, but I'm not sure if there is a place to update between releases. Good news though is that the PR is almost ready. Just running a final batch of tests. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > While investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in us sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apa
[jira] [Assigned] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16622: - Assignee: Edoardo Comar > MirrorMaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node kafka clusters (mirrormaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
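For reference, a minimal sketch of the slow consumer described in the reproduction steps above, assuming string keys and values; the bootstrap address, group id, and topic name are placeholders.

{code:java}
// Minimal sketch of the slow consumer from the reproduction above:
// ~50 records per poll, commit after each poll, 1 sec pause between polls.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("source-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.printf("polled %d records%n", records.count());
                consumer.commitSync();  // commit after each poll
                Thread.sleep(1000);     // pause 1 sec between polls
            }
        }
    }
}
{code}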
[jira] [Updated] (KAFKA-16776) Builds flaky-failing with leaked client-metrics-reaper thread
[ https://issues.apache.org/jira/browse/KAFKA-16776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16776: Description: I observed the assertion added by KAFKA-16477 failing in my build: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15469/6/tests] Reproduced here: {noformat} Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.TopicCommandIntegrationTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.AuthorizerIntegrationTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest<1s{noformat} The exceptions for these tests are as follows: {noformat} org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: `executor-client-metrics` ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896) at app//kafka.server.QuorumTestHarness.setUpClass(QuorumTestHarness.scala:474) ... Suppressed: org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @AfterAll: `executor-client-metrics` ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896) at app//kafka.server.QuorumTestHarness.tearDownClass(QuorumTestHarness.scala:482){noformat} But there is no additional context about who leaked this thread. 
was: I observed the assertion added by KAFKA-16477 failing in my build: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15469/6/tests] Reproduced here: {noformat} Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.TopicCommandIntegrationTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.AuthorizerIntegrationTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest<1s{noformat} The exceptions for these tests are as follows: {noformat} org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: `executor-client-metrics` ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896) at app//kafka.server.QuorumTestHarness.setUpClass(QuorumTestHarness.scala:474) ... Suppressed: org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @AfterAll: `executor-client-metrics` ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.utils.TestUtils$
[jira] [Created] (KAFKA-16776) Builds flaky-failing with leaked client-metrics-reaper thread
Greg Harris created KAFKA-16776: --- Summary: Builds flaky-failing with leaked client-metrics-reaper thread Key: KAFKA-16776 URL: https://issues.apache.org/jira/browse/KAFKA-16776 Project: Kafka Issue Type: Test Affects Versions: 3.8.0 Reporter: Greg Harris I observed the assertion added by KAFKA-16477 failing in my build: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15469/6/tests] Reproduced here: {noformat} Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.TopicCommandIntegrationTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.AuthorizerIntegrationTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest<1s Build / JDK 17 and Scala 2.13 / initializationError – org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest<1s{noformat} The exceptions for these tests are as follows: {noformat} org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: `executor-client-metrics` ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896) at app//kafka.server.QuorumTestHarness.setUpClass(QuorumTestHarness.scala:474) ... Suppressed: org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @AfterAll: `executor-client-metrics` ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896) at app//kafka.server.QuorumTestHarness.tearDownClass(QuorumTestHarness.scala:482){noformat} But there is no additional context about who leaked this thread. -- This message was sent by Atlassian Jira (v8.20.10#820010)
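For context, a rough Java rendition of what such a leaked-thread assertion does. The real check is kafka.utils.TestUtils#verifyNoUnexpectedThreads (Scala) and differs in detail; the thread-name prefix below is simply the one reported in this failure.

{code:java}
// Simplified sketch of a leaked-thread check, similar in spirit to
// kafka.utils.TestUtils#verifyNoUnexpectedThreads; illustrative only.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ThreadLeakCheck {
    // Return the names of live threads whose name starts with any of the
    // given "unexpected" prefixes.
    static List<String> unexpectedThreads(Set<String> unexpectedPrefixes) {
        return Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .filter(name -> unexpectedPrefixes.stream().anyMatch(name::startsWith))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // e.g. the client-metrics reaper thread reported above
        List<String> leaked = unexpectedThreads(Set.of("executor-client-metrics"));
        if (!leaked.isEmpty())
            throw new AssertionError("Found unexpected threads: " + leaked);
    }
}
{code}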
[jira] [Created] (KAFKA-16775) Fix flaky PlaintextAdminIntegrationTest#testCreateExistingTopicsThrowTopicExistsException
Chia-Ping Tsai created KAFKA-16775: -- Summary: Fix flaky PlaintextAdminIntegrationTest#testCreateExistingTopicsThrowTopicExistsException Key: KAFKA-16775 URL: https://issues.apache.org/jira/browse/KAFKA-16775 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai org.opentest4j.AssertionFailedError: timed out waiting for topics at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138) at app//kafka.api.BaseAdminIntegrationTest.waitForTopics(BaseAdminIntegrationTest.scala:236) at app//kafka.api.PlaintextAdminIntegrationTest.testCreateExistingTopicsThrowTopicExistsException(PlaintextAdminIntegrationTest.scala:140) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
Chia-Ping Tsai created KAFKA-16774: -- Summary: fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled Key: KAFKA-16774 URL: https://issues.apache.org/jira/browse/KAFKA-16774 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai java.util.ConcurrentModificationException at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) at org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) at org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) at org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) at org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16773) Fix flaky QuorumControllerTest#testDelayedConfigurationOperations
Chia-Ping Tsai created KAFKA-16773: -- Summary: Fix flaky QuorumControllerTest#testDelayedConfigurationOperations Key: KAFKA-16773 URL: https://issues.apache.org/jira/browse/KAFKA-16773 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:788) at org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1267) at org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:546) at org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:878) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:868) at org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than the current epoch 0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846705#comment-17846705 ] Igor Soarez commented on KAFKA-16760: - [~showuon] I spent a few hours looking into this but couldn't yet figure out why the test is failing. Regarding the logs you first mentioned ("Will await the new LeaderAndIsr state before resuming fetching."), I'm not sure this is normal. I've also not found an integration test covering alterLogDir for KRaft, so we need to add that! > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing alterLogDirRequest, it gets an error NONE result. But actually, the > alterLogDir never happened, with these errors: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: It's under KRaft mode. So the log with LeaderAndIsr is wrong. > This can be reproduced in this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] by running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16772) Introduce a type for kraft.version
José Armando García Sancio created KAFKA-16772: -- Summary: Introduce a type for kraft.version Key: KAFKA-16772 URL: https://issues.apache.org/jira/browse/KAFKA-16772 Project: Kafka Issue Type: Sub-task Components: kraft Reporter: José Armando García Sancio Instead of using a "short" to represent the kraft.version, introduce a class or enum to provide a better abstraction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
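A purely illustrative sketch of what such an abstraction could look like; the enum name, constants, and feature levels below are assumptions, not the committed design.

{code:java}
// Illustrative only: wraps the raw short feature level behind an enum so
// callers work with a typed value instead of a bare "short".
public enum KRaftVersion {
    KRAFT_VERSION_0((short) 0),
    KRAFT_VERSION_1((short) 1);

    private final short featureLevel;

    KRaftVersion(short featureLevel) {
        this.featureLevel = featureLevel;
    }

    public short featureLevel() {
        return featureLevel;
    }

    // Parse a raw level (e.g. from a record or RPC) into the typed value.
    public static KRaftVersion fromFeatureLevel(short level) {
        for (KRaftVersion version : values()) {
            if (version.featureLevel == level)
                return version;
        }
        throw new IllegalArgumentException("Unknown kraft.version level: " + level);
    }
}
{code}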
[jira] [Commented] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition
[ https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846684#comment-17846684 ] Greg Harris commented on KAFKA-16768: - [~muralibasani] Do you mean call Acceptor#close? That might cause an infinite loop (because Acceptor#close calls Processor#close which calls Processor#closeAll). I think if Processor#closeAll (where newConnections is drained) just called `join` on the acceptor thread first, it could get the same effect without the infinite loop. > SocketServer leaks accepted SocketChannel instances due to race condition > - > > Key: KAFKA-16768 > URL: https://issues.apache.org/jira/browse/KAFKA-16768 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.8.0 >Reporter: Greg Harris >Priority: Major > > The SocketServer has threads for Acceptors and Processors. These threads > communicate via Processor#accept/Processor#configureNewConnections and the > `newConnections` queue. > During shutdown, the Acceptor and Processors are each stopped by setting > shouldRun to false, and then shutdown proceeds asynchronously in all > instances together. This leads to a race condition where an Acceptor accepts > a SocketChannel and queues it to a Processor, but that Processor instance has > already started shutting down and has already drained the newConnections > queue. > KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely > different implementation but has the same flaw. > An example execution order that includes this leak: > 1. Acceptor#accept() is called, and a new SocketChannel is accepted. > 2. Acceptor#assignNewConnection() begins > 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor > and attached Processor instances > 4. Processor#run() checks the shouldRun variable, and exits the loop > 5. Processor#closeAll() executes, and drains the `newConnections` variable > 6. Processor#run() returns and the Processor thread terminates > 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the > SocketChannel to `newConnections` > 8. Acceptor#assignNewConnection() returns > 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the > Acceptor thread terminates. > 10. Acceptor#close() joins all of the terminated threads, and returns > At the end of this sequence, there are still open SocketChannel instances in > newConnections, which are then considered leaked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
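A schematic sketch of the ordering suggested above, with stand-in types rather than the real SocketServer/Acceptor/Processor classes: join the acceptor thread before draining newConnections, so the acceptor can no longer enqueue a freshly accepted channel after the drain.

{code:java}
// Stand-in sketch of the suggested shutdown ordering; not the actual
// kafka.network.SocketServer code.
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ProcessorSketch {
    private final Queue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();
    private final Thread acceptorThread;

    ProcessorSketch(Thread acceptorThread) {
        this.acceptorThread = acceptorThread;
    }

    // Called by the acceptor thread when it hands off a new connection.
    void accept(SocketChannel channel) {
        newConnections.add(channel);
    }

    void closeAll() throws InterruptedException {
        // Wait for the acceptor to terminate first, so nothing can be
        // enqueued after the drain below starts.
        acceptorThread.join();
        SocketChannel channel;
        while ((channel = newConnections.poll()) != null) {
            try {
                channel.close(); // close instead of leaking
            } catch (IOException ignored) {
                // best effort during shutdown
            }
        }
    }
}
{code}

With this ordering, step 7 of the sequence in the report cannot occur after step 5, because the drain only begins once the acceptor thread has terminated.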
[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occurring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846675#comment-17846675 ] Muralidhar Basani commented on KAFKA-16448: --- [~Dabz] can I try looking into this? > Add Kafka Streams exception handler for exceptions occurring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina > Priority: Minor > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occurring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16771) First log directory printed twice when formatting storage
Mickael Maison created KAFKA-16771: -- Summary: First log directory printed twice when formatting storage Key: KAFKA-16771 URL: https://issues.apache.org/jira/browse/KAFKA-16771 Project: Kafka Issue Type: Task Components: tools Affects Versions: 3.7.0 Reporter: Mickael Maison If multiple log directories are set, when running bin/kafka-storage.sh format, the first directory is printed twice. For example: {noformat} bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties --release-version 3.6 metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY}) Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2. Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2. {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16770) Coalesce records into bigger batches
David Jacot created KAFKA-16770: --- Summary: Coalesce records into bigger batches Key: KAFKA-16770 URL: https://issues.apache.org/jira/browse/KAFKA-16770 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16603) Data loss when Kafka Connect is sending data to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846662#comment-17846662 ] Chris Egerton commented on KAFKA-16603: --- [~dasarianil] In most cases, a single worker-global consumer is used to read offsets from the offsets topic. If a connector uses a separate offsets topic (which can happen if it is explicitly configured to use one via the [offsets.storage.topic|https://kafka.apache.org/documentation.html#sourceconnectorconfigs_offsets.storage.topic] property in its configuration, or if exactly-once support is enabled and the connector is configured to write to a different Kafka cluster than the one used by the Kafka Connect worker for its internal topics), then a new consumer is spun up for that connector, but that consumer will also be constantly polling the topic for changes, and reads to the end of that topic should not lead to a significant bump in load on the Kafka cluster. As far as communicating committed offsets to the {{SourceConnector}} instance goes, we cannot reuse state stored in the JVM to do this in most cases because Kafka Connect is intended to run in distributed mode with multiple workers, and there is no guarantee that the {{SourceConnector}} instance is running on the same worker as any of its tasks. Again, though, we could possibly add some hook to the {{SourceConnector}} class that lets implementations know when new offsets have been successfully committed by any of its tasks. Would that be sufficient? > Data loss when Kafka Connect is sending data to Kafka > -- > > Key: KAFKA-16603 > URL: https://issues.apache.org/jira/browse/KAFKA-16603 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.3.1 >Reporter: Anil Dasari >Priority: Major > > We are experiencing data loss when the Kafka Source connector fails to send > data to the Kafka topic and offset topic. > Kafka cluster and Kafka Connect details: > # Kafka Connect version, i.e. client: Confluent community version 7.3.1, i.e. > Kafka 3.3.1 > # Kafka version: 0.11.0 (server) > # Cluster size: 3 brokers > # Number of partitions in all topics = 3 > # Replication factor = 3 > # Min ISR set 2 > # Uses no transformations in Kafka connector > # Uses default error tolerance, i.e. None. > Our connector checkpoints the offsets info received in > SourceTask#commitRecord and resumes data processing from the persisted > checkpoint. > The data loss is noticed when the broker is unresponsive for a few minutes due to high > load and the Kafka connector is restarted. Also, the Kafka connector's graceful > shutdown failed. > Logs: > > {code:java} > [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group > coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) > Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator > 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due > to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be > attempted. > Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from > last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) > Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected. 
> Apr 22, 2024 @ 15:56:16.708 [Producer > clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 > disconnected. > Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 > disconnected. > Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator > 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due > to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be > attempted. > Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log > **) > Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, > groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was > unreachable for 3000ms. Revoking previous assignment Assignment{error=0, > leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', > leaderUrl='http://10.75.100.46:8083/', offset=4, > connectorIds=[d094a5d7bbb046b99d62398cb84d648c], > taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], > revokedTask
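The hook floated in the comment above might look something like the following. To be clear, no such callback exists on org.apache.kafka.connect.source.SourceConnector today; the interface name and signature here are invented purely to make the idea concrete.

{code:java}
// Hypothetical hook, sketched only to illustrate the idea discussed above;
// this API does not exist in Kafka Connect.
import java.util.Map;

public interface OffsetCommitListener {
    /**
     * Would be invoked by the framework after offsets from any of this
     * connector's tasks have been successfully committed to the offsets
     * topic. Because tasks may run on other workers, the framework (not
     * shared in-JVM state) would have to drive this callback.
     */
    void onOffsetsCommitted(Map<Map<String, Object>, Map<String, Object>> committedOffsets);
}
{code}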
[jira] [Commented] (KAFKA-16330) Remove Deprecated methods/variables from TaskId
[ https://issues.apache.org/jira/browse/KAFKA-16330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846661#comment-17846661 ] Muralidhar Basani commented on KAFKA-16330: --- I think 4.0.0 might well be the next release after 3.8.0. May I pick this up? > Remove Deprecated methods/variables from TaskId > --- > > Key: KAFKA-16330 > URL: https://issues.apache.org/jira/browse/KAFKA-16330 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Cf > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > > This ticket relates to https://issues.apache.org/jira/browse/KAFKA-16329 and > both should be worked on together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods
[ https://issues.apache.org/jira/browse/KAFKA-16329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846660#comment-17846660 ] Muralidhar Basani commented on KAFKA-16329: --- I think 4.0.0 might well be the next release after 3.8.0. May I pick this up? > Remove Deprecated Task/ThreadMetadata classes and related methods > - > > Key: KAFKA-16329 > URL: https://issues.apache.org/jira/browse/KAFKA-16329 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation] > > * > org.apache.kafka.streams.processor.TaskMetadata > * org.apache.kafka.streams.processor.ThreadMetadata > * org.apache.kafka.streams.KafkaStreams#localThreadMetadata > * org.apache.kafka.streams.state.StreamsMetadata > * org.apache.kafka.streams.KafkaStreams#allMetadata > * org.apache.kafka.streams.KafkaStreams#allMetadataForStore > This is related to https://issues.apache.org/jira/browse/KAFKA-16330 and both > tickets should be worked on together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16333) Remove Deprecated methods KTable#join
[ https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846658#comment-17846658 ] Muralidhar Basani commented on KAFKA-16333: --- I think 4.0.0 might well be the next release after 3.8.0. May I pick this up? > Remove Deprecated methods KTable#join > -- > > Key: KAFKA-16333 > URL: https://issues.apache.org/jira/browse/KAFKA-16333 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > KTable#join() methods taking a `Named` parameter got deprecated in the 3.1 > release via https://issues.apache.org/jira/browse/KAFKA-13813 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846632#comment-17846632 ] Matej Sprysl commented on KAFKA-15242: -- I would also like this fixed. Currently, the API provides no way to test FixedKeyProcessors other than creating a full test-driver topology. > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using a mock processor context to get the forwarded message doesn't work. > Also, there is no well-documented way of testing FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro] > but the most important piece is the test file with runtime and compile-time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
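For anyone hitting the same wall, here is a sketch of the full-test-driver workaround mentioned in the comment above, using the public TopologyTestDriver API; the topic names and the trivial uppercasing processor are placeholders.

{code:java}
// Sketch of the full-test-driver workaround for testing a FixedKeyProcessor.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class FixedKeyProcessorDriverTest {
    // Placeholder processor under test: uppercases the value, keeps the key.
    static class UppercaseProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(FixedKeyProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(FixedKeyRecord<String, String> record) {
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .processValues(UppercaseProcessor::new)
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fixed-key-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // unused by the driver
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in =
                driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k", "hello");
            System.out.println(out.readKeyValue()); // KeyValue(k, HELLO)
        }
    }
}
{code}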
[jira] [Commented] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition
[ https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846607#comment-17846607 ] Muralidhar Basani commented on KAFKA-16768: --- To fix this, when Processor#closeAll() is called, should we close all the acceptors in dataPlaneAcceptors, so that no new connections are accepted? > SocketServer leaks accepted SocketChannel instances due to race condition > - > > Key: KAFKA-16768 > URL: https://issues.apache.org/jira/browse/KAFKA-16768 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.8.0 >Reporter: Greg Harris >Priority: Major > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846592#comment-17846592 ] Angelos Kaltsikis edited comment on KAFKA-16692 at 5/15/24 11:36 AM: - Shall we explicitly state this in the documentation until your fix is released? I am happy to ship a PR about that :) was (Author: akaltsikis): Shall we explicitly state this in the documentation until your fix is released? > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major >
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846591#comment-17846591 ] Justine Olshan commented on KAFKA-16692: The delay in the fix comes from the fact that reproducing the issue in a system test is hard: while I can get the error to appear, it is hard to consistently make a test fail because of it. I assure you I'm making some progress and hope to have a fix by the end of the week :) > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major >
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846592#comment-17846592 ] Angelos Kaltsikis commented on KAFKA-16692: --- Shall we explicitly state this in the documentation until your fix is released? > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major >
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846590#comment-17846590 ] Justine Olshan commented on KAFKA-16692: Hi [~akaltsikis]. Yes, the bug is due to the verification requests during the upgrade. If you disable the verification, you won't hit the bug. However, the feature was designed to allow upgrades, and there seems to be a small bug in the logic to gate the requests during the upgrade. I'm working on fixing that. In the meantime, disabling the feature on upgrade is a workaround. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major >
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846577#comment-17846577 ] Angelos Kaltsikis commented on KAFKA-16692: --- It seems that adding `transaction.partition.verification.enable=false` to the 3.6.2 brokers makes the issue disappear. That means that when upgrading from 3.5.2 to 3.6.2, if the above config exists, there are no visible errors on the Kafka side or on the client side. The same is true when downgrading from 3.6.2 to 3.5.2. > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major >
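For reference, the workaround described above as a broker configuration line; per the comments in this thread, it is meant as a temporary measure during the rolling upgrade, to be removed (or set back to true) once all brokers run 3.6+.

{noformat}
# server.properties - temporary workaround while rolling from 3.5.x to 3.6.x;
# remove (or set back to true) once every broker runs 3.6+
transaction.partition.verification.enable=false
{noformat}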
[jira] [Comment Edited] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846572#comment-17846572 ] Angelos Kaltsikis edited comment on KAFKA-16692 at 5/15/24 10:03 AM: - We faced the same issue when we tried to upgrade from 3.5.2 to 3.6.2 a few days ago. This is reproducible every time I am trying to upgrade from 3.5.2->3.6.2 or vice-versa (reverting from 3.6.2 -> 3.5.2). We are using a lot of clients that use transactions, thus those transactions fail on the client side every time we do those operations. The interesting part is that as soon as the upgrade/downgrade roll finishes, the errors disappear and clients seem to work fine. Thus this is visible only during the rolling deployment for the upgrade. Let me know if I can help somehow was (Author: akaltsikis): We faced the same issue when we tried to upgrade from 3.5.2 to 3.6.2 a few days ago. This is reproducible every time I am trying to upgrade from 3.5.2->3.6.2 or vice-versa (reverting from 3.6.2 -> 3.5.2). We are using a lot of clients that use transactions, thus those transactions fail on the client side every time we do those operations. Let me know if I can help somehow > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major >
> First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. > From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible, as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests from being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846572#comment-17846572 ] Angelos Kaltsikis commented on KAFKA-16692: --- We faced the same issue when we tried to upgrade from 3.5.2 to 3.6.2 a few days ago. This is reproducible every time I try to upgrade from 3.5.2->3.6.2 or vice versa (reverting from 3.6.2 -> 3.5.2). We are using a lot of clients that use transactions, so those transactions fail on the client side every time we perform those operations. Let me know if I can help somehow > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and, digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible, as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests from being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId, potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPart
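To make the suspected failure mode concrete, here is a minimal sketch of the version negotiation under discussion. The helper below is hypothetical (it is not the actual NetworkClient or AddPartitionsToTxnManager code), but it illustrates why a missing NodeApiVersions entry could let the client fall back to its latest version, which a 3.5 broker then rejects as "not enabled":
{code:java}
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.protocol.ApiKeys;

// Hypothetical helper, for illustration only.
public final class VersionClampSketch {
    static short chooseAddPartitionsVersion(NodeApiVersions nodeVersions) {
        if (nodeVersions == null) {
            // No discovered API versions for this node: falling back to the
            // client's latest version (4 in 3.6) is exactly what a 3.5 broker
            // rejects with "version 4 which is not enabled".
            return ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion();
        }
        // Expected path: clamp to the intersection of client and broker ranges.
        return nodeVersions.latestUsableVersion(ApiKeys.ADD_PARTITIONS_TO_TXN);
    }
}
{code}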
[jira] [Comment Edited] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846552#comment-17846552 ] Igor Soarez edited comment on KAFKA-16760 at 5/15/24 8:57 AM: -- [~showuon] thanks for updating the branch, I can reproduce it now and will have a look. was (Author: soarez): [~showuon] thanks for updating the branch, I'll have a look at this. > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing an alterReplicaLogDirs request, the result is error NONE, but the > log dir change never actually happens, and these errors appear: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: this is under KRaft mode, so the log message mentioning LeaderAndIsr is > wrong. > This can be reproduced by checking out this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846552#comment-17846552 ] Igor Soarez commented on KAFKA-16760: - [~showuon] thanks for updating the branch, I'll have a look at this. > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing an alterReplicaLogDirs request, the result is error NONE, but the > log dir change never actually happens, and these errors appear: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: this is under KRaft mode, so the log message mentioning LeaderAndIsr is > wrong. > This can be reproduced by checking out this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
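For context, this is roughly how such a request is fired through the Admin client; a minimal sketch with made-up topic, broker id, and target path. The bug is that the future below completes successfully (error NONE) even though the replica move never takes place:
{code:java}
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartitionReplica;

public final class AlterLogDirSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> conf =
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(conf)) {
            // Ask broker 1 to move topicB-0 into another log directory.
            TopicPartitionReplica replica = new TopicPartitionReplica("topicB", 0, 1);
            admin.alterReplicaLogDirs(Map.of(replica, "/tmp/kafka-logs-2"))
                 .all()
                 .get(); // completes with no error, yet the move may not happen
        }
    }
}
{code}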
[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join
[ https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846544#comment-17846544 ] Karsten Stöckmann commented on KAFKA-16700: --- [~ayoubomari] Hm, according to the documentation, it is actually also about {{null}} foreign keys. {quote} * left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.{quote} Thus, this should not come into play as a potential issue here. (Also, as mentioned, foreign keys are never {{null}} as their source table column is not nullable.) > Kafka Streams: possible message loss on KTable-KTable FK Left Join > -- > > Key: KAFKA-16700 > URL: https://issues.apache.org/jira/browse/KAFKA-16700 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 > Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and > 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka > Operators >Reporter: Karsten Stöckmann >Priority: Major > Labels: dsl, joins, streams > > We are experiencing significant, yet intermittent / non-deterministic / > unexplainable message loss on a Kafka Streams topology while performing a > *KTable-KTable* {*}FK Left Join{*}. > Assume the following snippet: > {code:java} > streamsBuilder > .table( > folderTopicName, > Consumed.with( > folderKeySerde, > folderSerde)) > .leftJoin( > agencies, // KTable > Folder::agencyIdValue, > AggregateFolder::new, > TableJoined.as("folder-to-agency"), > Materialized > .as("folder-to-agency-materialized") > .withKeySerde(folderKeySerde) > .withValueSerde(aggregateFolderSerde)) > .leftJoin( > documents, > {code} > The setup is as follows: > A Debezium Connector for PostgreSQL streams database changes into various > Kafka topics. A series of Quarkus Kafka Streams applications then performs > aggregation operations on those topics to create index documents later to be > sent into an OpenSearch system. > When firing up the Kafka Streams infrastructure to work on initially > populated Kafka Topics (i.e. a snapshot of all relevant table data has been > streamed to Kafka), the KTable-KTable FK left join shown above seems to > produce message loss on the first of a series of FK left joins; the right > hand {{KTable}} is consumed from an aggregated > topic fed from another Kafka Streams topology / application. > On a (heavily reduced) test data set of 6828 messages in the > {{folderTopicName}} Topic, we observe the following results: > * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages > * {{{}folder-to-agency-subscription-response{}}}: *3048* messages > * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages > * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages. > Judging by the nature of a (FK) left join, I'd expect every message from the > left-hand topic to produce an aggregate even if no matching message is > found in the right-hand topic. > Message loss unpredictably varies across tests and seems not to be bound to > specific keys or messages. > It seems this can only be observed when initially firing up the Streams > infrastructure to process the message 'backlog' that had been snapshotted by > Debezium. A manual snapshot triggered later (i.e. Streams applications > already running) seems not to show this behaviour. 
Additionally, so far we have > observed this kind of message loss only when running multiple replicas of the > affected application. When carrying out the tests with only one replica, > everything seems to work as expected. We've tried to leverage > {{group.initial.rebalance.delay.ms}} in order to rule out possible > rebalancing issues, but to no avail. > Our Kafka configuration: > {code:yaml} > offsets.topic.replication.factor: 3 > transaction.state.log.replication.factor: 3 > transaction.state.log.min.isr: 2 > default.replication.factor: 3 > min.insync.replicas: 2 > message.max.bytes: "20971520" > {code} > Our Kafka Streams application configuration: > {code:yaml} > kafka-streams.num.stream.threads: 5 > kafka-streams.num.standby.replicas: 1 > kafka-streams.auto.offset.reset: earliest > kafka
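For readers less familiar with the API in question, here is a self-contained sketch of a KTable-KTable FK left join, using String placeholders instead of the reporter's domain types and made-up topic names. It shows the semantics quoted in the comment above: a null foreign key no longer drops the left record, and the ValueJoiner is invoked with null for the right value:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public final class FkLeftJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> folders = builder.table("folder-topic");
        KTable<String, String> agencies = builder.table("agency-topic");
        KTable<String, String> joined = folders.leftJoin(
            agencies,
            folderValue -> folderValue.isEmpty() ? null : folderValue, // FK extractor; null is allowed
            (folderValue, agencyValue) ->                              // right value is null when the
                folderValue + "|" + agencyValue);                      // FK is null or unmatched
        joined.toStream().to("joined-topic");
    }
}
{code}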
[jira] [Created] (KAFKA-16769) Delete deprecated add.source.alias.to.metrics configuration
Mickael Maison created KAFKA-16769: -- Summary: Delete deprecated add.source.alias.to.metrics configuration Key: KAFKA-16769 URL: https://issues.apache.org/jira/browse/KAFKA-16769 Project: Kafka Issue Type: Task Components: mirrormaker Reporter: Mickael Maison Assignee: Mickael Maison Fix For: 4.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846522#comment-17846522 ] Laymain commented on KAFKA-16361: - Thanks [~ableegoldman] and [~flashmouse], I am glad to learn that the fix has been merged. As a workaround it is also possible to use a different assignor, though in our case foregoing rack awareness would incur significant additional costs. > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon, the error is deterministic and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (we did not try 3.5.1, but it would > likely also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > 
topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], > offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1],
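A sketch of the workaround mentioned in the comment above, assuming either trade-off is acceptable in a given deployment: switch to an assignor that does not use the constrained rack-aware path, or leave client.rack unset so the sticky assignors skip the rack-aware logic entirely:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class AssignorWorkaroundSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Option 1: use a different assignor than the affected sticky ones.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // Option 2: keep the sticky assignor but do not set client.rack
        // (ConsumerConfig.CLIENT_RACK_CONFIG), which disables the rack-aware
        // code path at the cost of rack locality.
        return props;
    }
}
{code}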
[jira] (KAFKA-16616) refactor mergeWith in MetadataSnapshot
[ https://issues.apache.org/jira/browse/KAFKA-16616 ] Cao Manh Dat deleted comment on KAFKA-16616: -- was (Author: caomanhdat): Hi [~alyssahuang], can I work on this item? > refactor mergeWith in MetadataSnapshot > -- > > Key: KAFKA-16616 > URL: https://issues.apache.org/jira/browse/KAFKA-16616 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Alyssa Huang >Priority: Minor > > Right now we keep track of topic ids and partition metadata to add/update > separately in mergeWith (e.g. two maps passed as arguments). This means we > iterate over topic metadata twice, which could be costly when we're dealing > with a large number of updates. > `updatePartitionLeadership`, which calls `mergeWith`, does something similar > (it generates the map of topic ids to update in a loop separate from the list > of partition metadata to update) and should be refactored as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16616) refactor mergeWith in MetadataSnapshot
[ https://issues.apache.org/jira/browse/KAFKA-16616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846515#comment-17846515 ] Cao Manh Dat commented on KAFKA-16616: -- Hi [~alyssahuang], can I work on this item? > refactor mergeWith in MetadataSnapshot > -- > > Key: KAFKA-16616 > URL: https://issues.apache.org/jira/browse/KAFKA-16616 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Alyssa Huang >Priority: Minor > > Right now we keep track of topic ids and partition metadata to add/update > separately in mergeWith (e.g. two maps passed as arguments). This means we > iterate over topic metadata twice, which could be costly when we're dealing > with a large number of updates. > `updatePartitionLeadership`, which calls `mergeWith`, does something similar > (it generates the map of topic ids to update in a loop separate from the list > of partition metadata to update) and should be refactored as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
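A rough illustration of the proposed refactor; all names below (TopicUpdate, mergeSinglePass) are made up and this is not the MetadataSnapshot API. It only shows the single-pass idea: derive both views from one iteration over the updates instead of building two maps in separate loops:
{code:java}
import java.util.Map;

// Hypothetical sketch, for illustration only.
public final class SinglePassMergeSketch {
    record TopicUpdate(String topic, String topicId, Object partitionMetadata) {}

    static void mergeSinglePass(Iterable<TopicUpdate> updates,
                                Map<String, String> topicIds,
                                Map<String, Object> partitionMetadata) {
        // One pass over the updates maintains both views at once, instead of
        // a topic-id loop plus a separate partition-metadata loop.
        for (TopicUpdate u : updates) {
            topicIds.put(u.topic(), u.topicId());
            partitionMetadata.put(u.topic(), u.partitionMetadata());
        }
    }
}
{code}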
[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error
[ https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846508#comment-17846508 ] Luke Chen commented on KAFKA-16760: --- [~soarez], I'm really sorry, I can't believe I didn't commit my change to the branch yesterday. I just updated the branch, and it reliably fails in my env within 3 runs. Please give it another try. I'd like to know if this is the expected behavior, or if it is a bug. > alterReplicaLogDirs failed even if responded with none error > > > Key: KAFKA-16760 > URL: https://issues.apache.org/jira/browse/KAFKA-16760 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When firing an alterReplicaLogDirs request, the result is error NONE, but the > log dir change never actually happens, and these errors appear: > {code:java} > [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 has an older epoch (0) than the current leader. Will await the new > LeaderAndIsr state before resuming fetching. > (kafka.server.ReplicaAlterLogDirsThread:66) > [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition > topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70) > {code} > Note: this is under KRaft mode, so the log message mentioning LeaderAndIsr is > wrong. > This can be reproduced by checking out this > [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running > this test: > {code:java} > ./gradlew cleanTest storage:test --tests > org.apache.kafka.tiered.storage.integration.AlterLogDirTest > {code} > The complete logs can be found here: > https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14
[ https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16763: -- Assignee: 黃竣陽 (was: Chia-Ping Tsai) > Upgrade to scala 2.12.19 and scala 2.13.14 > -- > > Key: KAFKA-16763 > URL: https://issues.apache.org/jira/browse/KAFKA-16763 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: 黃竣陽 >Priority: Minor > > scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19) > > scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16671) Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured
[ https://issues.apache.org/jira/browse/KAFKA-16671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16671. Fix Version/s: 3.8.0 Resolution: Fixed > Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured > > > Key: KAFKA-16671 > URL: https://issues.apache.org/jira/browse/KAFKA-16671 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > Fix For: 3.8.0 > > > Looped 1000 times locally, and all passed. Let's enable the test to see what > happens in our CI -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846483#comment-17846483 ] appchemist commented on KAFKA-16764: Hi [~lianetm], I would like to take this issue. > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example. The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > This is probably what makes > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fail for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
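A minimal repro sketch of the behavior in the description (broker address and group id are placeholders): subscribing to a name containing a space and then polling should surface InvalidTopicException from metadata, which the legacy consumer does and the new consumer currently does not:
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class InvalidTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "invalid-topic-repro");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("invalid topic")); // the space makes the name invalid
            // Legacy consumer: poll throws InvalidTopicException once metadata arrives.
            // New (Async) consumer: currently returns empty records instead.
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
{code}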
[jira] [Commented] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition
[ https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846475#comment-17846475 ] Greg Harris commented on KAFKA-16765: - This is also a bug in EchoServer: [https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java#L76-L81] and ServerShutdownTest: [https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala#L274-L275] except those don't require a race condition to happen. > NioEchoServer leaks accepted SocketChannel instances due to race condition > -- > > Key: KAFKA-16765 > URL: https://issues.apache.org/jira/browse/KAFKA-16765 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 3.8.0 >Reporter: Greg Harris >Priority: Minor > > The NioEchoServer has an AcceptorThread that calls accept() to open new > SocketChannel instances and insert them into the `newChannels` List, and a > main thread that drains the `newChannels` List and moves them to the > `socketChannels` List. > During shutdown, the serverSocketChannel is closed, which causes both threads > to exit their while loops. It is possible for the NioEchoServer main thread > to sense the serverSocketChannel close and terminate before the Acceptor > thread does, and for the Acceptor thread to put a SocketChannel in > `newChannels` before terminating. This instance is never closed by either > thread, because it is never moved to `socketChannels`. > A precise execution order that has this leak is: > 1. NioEchoServer thread locks `newChannels`. > 2. Acceptor thread accept() completes, and the SocketChannel is created > 3. Acceptor thread blocks waiting for the `newChannels` lock > 4. NioEchoServer thread releases the `newChannels` lock and does some > processing > 5. NioEchoServer#close() is called, which closes the serverSocketChannel > 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then > terminates > 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel > to `newChannels`. > 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates. > 9. NioEchoServer#close() stops blocking now that both other threads have > terminated. > The end result is that the leaked socket is left open in the `newChannels` > list at the end of close(), which is incorrect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
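One possible shape of a fix, as a hypothetical sketch rather than the committed patch: after both threads have been joined at the end of NioEchoServer#close(), drain whatever is left in newChannels and close it, so a channel accepted during the shutdown race is not leaked:
{code:java}
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.List;

// Hypothetical fix sketch; assumes newChannels is the server's guarded list.
public final class DrainSketch {
    static void drainAndClose(List<SocketChannel> newChannels) throws IOException {
        synchronized (newChannels) {
            for (SocketChannel channel : newChannels) {
                channel.close(); // close any channel that was never promoted
            }
            newChannels.clear();
        }
    }
}
{code}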
[jira] [Updated] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition
[ https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16768: Description: The SocketServer has threads for Acceptors and Processors. These threads communicate via Processor#accept/Processor#configureNewConnections and the `newConnections` queue. During shutdown, the Acceptor and Processors are each stopped by setting shouldRun to false, and then shutdown proceeds asynchronously in all instances together. This leads to a race condition where an Acceptor accepts a SocketChannel and queues it to a Processor, but that Processor instance has already started shutting down and has already drained the newConnections queue. KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely different implementation but has the same flaw. An example execution order that includes this leak: 1. Acceptor#accept() is called, and a new SocketChannel is accepted. 2. Acceptor#assignNewConnection() begins 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor and attached Processor instances 4. Processor#run() checks the shouldRun variable, and exits the loop 5. Processor#closeAll() executes, and drains the `newConnections` variable 6. Processor#run() returns and the Processor thread terminates 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the SocketChannel to `newConnections` 8. Acceptor#assignNewConnection() returns 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the Acceptor thread terminates. 10. Acceptor#close() joins all of the terminated threads, and returns At the end of this sequence, there are still open SocketChannel instances in newConnections, which are then considered leaked. was: The SocketServer has threads for Acceptors and Processors. These threads communicate via Processor#accept/Processor#configureNewConnections and the `newConnections` queue. During shutdown, the Acceptor and Processors are each stopped by setting shouldRun to false, and then shutdown proceeds asynchronously in all instances together. This leads to a race condition where an Acceptor accepts a SocketChannel and queues it to a Processor, but that Processor instance has already started shutting down and has already drained the newConnections queue. > SocketServer leaks accepted SocketChannel instances due to race condition > - > > Key: KAFKA-16768 > URL: https://issues.apache.org/jira/browse/KAFKA-16768 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.8.0 >Reporter: Greg Harris >Priority: Major > > The SocketServer has threads for Acceptors and Processors. These threads > communicate via Processor#accept/Processor#configureNewConnections and the > `newConnections` queue. > During shutdown, the Acceptor and Processors are each stopped by setting > shouldRun to false, and then shutdown proceeds asynchronously in all > instances together. This leads to a race condition where an Acceptor accepts > a SocketChannel and queues it to a Processor, but that Processor instance has > already started shutting down and has already drained the newConnections > queue. > KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely > different implementation but has the same flaw. > An example execution order that includes this leak: > 1. Acceptor#accept() is called, and a new SocketChannel is accepted. > 2. Acceptor#assignNewConnection() begins > 3. 
Acceptor#close() is called, which sets shouldRun to false in the Acceptor > and attached Processor instances > 4. Processor#run() checks the shouldRun variable, and exits the loop > 5. Processor#closeAll() executes, and drains the `newConnections` variable > 6. Processor#run() returns and the Processor thread terminates > 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the > SocketChannel to `newConnections` > 8. Acceptor#assignNewConnection() returns > 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the > Acceptor thread terminates. > 10. Acceptor#close() joins all of the terminated threads, and returns > At the end of this sequence, there are still open SocketChannel instances in > newConnections, which are then considered leaked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition
Greg Harris created KAFKA-16768: --- Summary: SocketServer leaks accepted SocketChannel instances due to race condition Key: KAFKA-16768 URL: https://issues.apache.org/jira/browse/KAFKA-16768 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.8.0 Reporter: Greg Harris The SocketServer has threads for Acceptors and Processors. These threads communicate via Processor#accept/Processor#configureNewConnections and the `newConnections` queue. During shutdown, the Acceptor and Processors are each stopped by setting shouldRun to false, and then shutdown proceeds asynchronously in all instances together. This leads to a race condition where an Acceptor accepts a SocketChannel and queues it to a Processor, but that Processor instance has already started shutting down and has already drained the newConnections queue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14
[ https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846471#comment-17846471 ] 黃竣陽 commented on KAFKA-16763: - I will handle this issue. > Upgrade to scala 2.12.19 and scala 2.13.14 > -- > > Key: KAFKA-16763 > URL: https://issues.apache.org/jira/browse/KAFKA-16763 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19) > > scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846467#comment-17846467 ] A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM: -- Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in KAFKA-15170. Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1, but 3.8 is just a day from KIP freeze, which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness, which will effectively make the assignor just skip over the buggy code, or 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will, though was (Author: ableegoldman): Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in [KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]. Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1, but 3.8 is just a day from KIP freeze, which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness, which will effectively make the assignor just skip over the buggy code, or 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will, though > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon, the error is deterministic and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (we did not try 3.5.1, but it would > likely also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assig
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846467#comment-17846467 ] A. Sophie Blee-Goldman commented on KAFKA-16361: Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in [KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]. Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1, but 3.8 is just a day from KIP freeze, which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness, which will effectively make the assignor just skip over the buggy code, or 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will, though > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon, the error is deterministic and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (we did not try 3.5.1, but it would > likely also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partit
[jira] [Updated] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15170: --- Fix Version/s: 3.8.0 3.7.1 > CooperativeStickyAssignor cannot adjust assignment correctly > > > Key: KAFKA-15170 > URL: https://issues.apache.org/jira/browse/KAFKA-15170 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the > assignment when all consumers in the group subscribe to the same topic list, > but it fails to add all partitions that moved to another owner to > ``partitionsWithMultiplePreviousOwners``. > > The reason is that assignOwnedPartitions does not add partitions whose rack > mismatches their previous owner to allRevokedPartitions, and only partitions > in that list are added to partitionsWithMultiplePreviousOwners. > > In a cooperative rebalance, partitions that have changed owner must be > removed from the final assignment, or they will lead to incorrect consume > behavior. I have already raised a PR, please take a look, thanks: > > [https://github.com/apache/kafka/pull/13965] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-15170. Resolution: Fixed > CooperativeStickyAssignor cannot adjust assignment correctly > > > Key: KAFKA-15170 > URL: https://issues.apache.org/jira/browse/KAFKA-15170 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the > assignment when all consumers in the group subscribe to the same topic list, > but it fails to add all partitions that moved to another owner to > ``partitionsWithMultiplePreviousOwners``. > > The reason is that assignOwnedPartitions does not add partitions whose rack > mismatches their previous owner to allRevokedPartitions, and only partitions > in that list are added to partitionsWithMultiplePreviousOwners. > > In a cooperative rebalance, partitions that have changed owner must be > removed from the final assignment, or they will lead to incorrect consume > behavior. I have already raised a PR, please take a look, thanks: > > [https://github.com/apache/kafka/pull/13965] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16277: --- Fix Version/s: 3.7.1 > CooperativeStickyAssignor does not spread topics evenly among consumer group > > > Key: KAFKA-16277 > URL: https://issues.apache.org/jira/browse/KAFKA-16277 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Cameron Redpath >Assignee: Cameron Redpath >Priority: Major > Fix For: 3.8.0, 3.7.1 > > Attachments: image-2024-02-19-13-00-28-306.png > > > Consider the following scenario: > `topic-1`: 12 partitions > `topic-2`: 12 partitions > > Of note, `topic-1` gets approximately 10 times more messages through it than > `topic-2`. > > Both of these topics are consumed by a single application, single consumer > group, which scales under load. Each member of the consumer group subscribes > to both topics. The `partition.assignment.strategy` being used is > `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The > application may start with one consumer. It consumes all partitions from both > topics. > > The problem begins when the application scales up to two consumers. What is > seen is that all partitions from `topic-1` go to one consumer, and all > partitions from `topic-2` go to the other consumer. In the case with one > topic receiving more messages than the other, this results in a very > imbalanced group where one consumer is receiving 10x the traffic of the other > due to partition assignment. > > This is the issue being seen in our cluster at the moment. See this graph of > the number of messages being processed by each consumer as the group scales > from one to four consumers: > !image-2024-02-19-13-00-28-306.png|width=537,height=612! > Things to note from this graphic: > * With two consumers, the partitions for a topic all go to a single consumer > each > * With three consumers, the partitions for a topic are split between two > consumers each > * With four consumers, the partitions for a topic are split between three > consumers each > * The total number of messages being processed by each consumer in the group > is very imbalanced throughout the entire period > > With regard to the number of _partitions_ being assigned to each consumer, > the group is balanced. However, the assignment appears to be biased so that > partitions from the same topic go to the same consumer. In our scenario, this > leads to very undesirable partition assignment. > > I question if the behaviour of the assignor should be revised, so that each > topic has its partitions maximally spread across all available members of the > consumer group. In the above scenario, this would result in much more even > distribution of load. The behaviour would then be: > * With two consumers, 6 partitions from each topic go to each consumer > * With three consumers, 4 partitions from each topic go to each consumer > * With four consumers, 3 partitions from each topic go to each consumer > > Of note, we only saw this behaviour after migrating to the > `CooperativeStickyAssignor`. It was not an issue with the default partition > assignment strategy. > > It is possible this may be intended behaviour. In which case, what is the > preferred workaround for our scenario? Our current workaround if we decide to > go ahead with the update to `CooperativeStickyAssignor` may be to limit our > consumers so they only subscribe to one topic, and have two consumer threads > per instance of the application. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
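A sketch of the workaround described at the end of the issue above; group naming and broker address are illustrative, and whether the two threads share one group or use one group per topic is an application choice. Each consumer thread subscribes to a single topic, so each topic's partitions are spread independently:
{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class PerTopicConsumerSketch {
    // Called once per consumer thread, e.g. consumerFor("topic-1") and
    // consumerFor("topic-2") in the same application instance.
    static KafkaConsumer<byte[], byte[]> consumerFor(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app-" + topic);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(topic));
        return consumer;
    }
}
{code}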
[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16764: -- Fix Version/s: 3.8.0 > New consumer should throw InvalidTopicException on poll when invalid topic in > metadata > -- > > Key: KAFKA-16764 > URL: https://issues.apache.org/jira/browse/KAFKA-16764 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > A call to consumer.poll should throw InvalidTopicException if an invalid > topic is discovered in metadata. This can be easily reproduced by calling > subscribe("invalid topic") and then poll, for example. The new consumer does > not throw the expected InvalidTopicException like the LegacyKafkaConsumer > does. > The legacy consumer achieves this by checking for metadata exceptions on > every iteration of the ConsumerNetworkClient (see > [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]) > This is probably what makes > [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] > fail for the new consumer. Once this bug is fixed, we should be able to > enable that test for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16767) KRaft should track HWM outside the log layer
José Armando García Sancio created KAFKA-16767: -- Summary: KRaft should track HWM outside the log layer Key: KAFKA-16767 URL: https://issues.apache.org/jira/browse/KAFKA-16767 Project: Kafka Issue Type: Improvement Components: kraft Reporter: José Armando García Sancio The current implementation of KRaft tracks the HWM using the log layer implementation. The log layer has an invariant where the HWM <= LEO. This means that the log layer always sets the HWM to the minimum of HWM and LEO. This has the side effect of the local KRaft replica reporting an HWM that is much smaller than the leader's HWM when the replica starts with an empty log, e.g. a new broker or the kafka-metadata-shell. -- This message was sent by Atlassian Jira (v8.20.10#820010)
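A tiny worked example of the clamping described above, with made-up values:
{code:java}
public final class HwmClampSketch {
    public static void main(String[] args) {
        long leaderHighWatermark = 1_234L; // HWM learned from the leader
        long localLogEndOffset = 0L;       // empty local log, e.g. a brand-new broker
        // The log layer enforces HWM <= LEO, so the locally reported HWM is clamped:
        long reportedHwm = Math.min(leaderHighWatermark, localLogEndOffset);
        System.out.println(reportedHwm);   // 0, far below the leader's 1234
    }
}
{code}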
[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846446#comment-17846446 ] Lianet Magrans commented on KAFKA-16637: Hey [~chickenchickenlove], sorry I had missed your last question. The new group rebalance protocol from KIP-848 is supported in KRaft mode only. > AsyncKafkaConsumer removes offset fetch responses from cache too aggressively > - > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not work well. > You can check my setup below. > > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck in here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
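For anyone reproducing this, a minimal sketch of the client-side opt-in for KIP-848; it assumes a KRaft cluster with the new group coordinator protocol enabled on the broker side, and the broker address and group id are placeholders:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class Kip848ConsumerConfigSketch {
    static Properties props() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip848-group");
        // Opt in to the KIP-848 rebalance protocol; the default is "classic".
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        return props;
    }
}
{code}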
[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
[ https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16766: --- Description: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. The same situation exists for beginningOffsets and endOffsets. All 3 funcs show the same timeout message in the LegacyConsumer (defined [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), but do not have a clear message in the Async, so we should fix all 3. With the fix, we should write tests for each func, like the ones defined for the Legacy Consumer ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]). Note that we would need different tests, added to AsyncKafkaConsumerTest, given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the current tests do not account for that when matching requests/responses. was: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. After the fix, we should be able to write a test like the [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246] that exists for the legacy consumer. Note that we would need a different test given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses. 
> New consumer offsetsForTimes timeout exception does not have the proper
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.7.0
> Reporter: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw an org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException".
> We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagating it with a message specific to offsetsForTimes.
> The same situation exists for beginningOffsets and endOffsets. All three functions show the same timeout message in the LegacyConsumer (defined [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), but do not have a clear message in the async consumer, so we should fix all three.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
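A minimal, self-contained sketch of the wrapping the ticket describes; the helper name and surrounding class are hypothetical, not the actual AsyncKafkaConsumer internals. It translates the raw java.util.concurrent.TimeoutException from a pending result into Kafka's API-level TimeoutException with the legacy-style message:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;

public final class TimeoutMessageSketch {
    // Hypothetical helper illustrating the proposed fix: wait on the
    // pending offsets-for-times result and, on timeout, rethrow with the
    // message shape the legacy consumer uses.
    public static <T> T getWithApiTimeoutMessage(CompletableFuture<T> result, long timeoutMs) {
        try {
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            // Same message the legacy consumer reports for this API.
            throw new org.apache.kafka.common.errors.TimeoutException(
                "Failed to get offsets by times in " + timeoutMs + "ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptException(e);
        } catch (ExecutionException e) {
            throw new KafkaException(e.getCause());
        }
    }
}
{code}
The same wrapper shape would apply to beginningOffsets and endOffsets, only with their respective messages.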
[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
Lianet Magrans created KAFKA-16766:
--
Summary: New consumer offsetsForTimes timeout exception does not have the proper message
Key: KAFKA-16766
URL: https://issues.apache.org/jira/browse/KAFKA-16766
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Fix For: 3.8.0

If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw an org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms".
To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagating it with a message specific to offsetsForTimes.
After the fix, we should be able to write a test like the [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246] that exists for the legacy consumer. Note that we would need a different test given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition
Greg Harris created KAFKA-16765:
---
Summary: NioEchoServer leaks accepted SocketChannel instances due to race condition
Key: KAFKA-16765
URL: https://issues.apache.org/jira/browse/KAFKA-16765
Project: Kafka
Issue Type: Bug
Components: core, unit tests
Affects Versions: 3.8.0
Reporter: Greg Harris

The NioEchoServer has an AcceptorThread that calls accept() to open new SocketChannel instances and insert them into the `newChannels` List, and a main thread that drains the `newChannels` List and moves them to the `socketChannels` List.
During shutdown, the serverSocketChannel is closed, which causes both threads to exit their while loops. It is possible for the NioEchoServer main thread to sense the serverSocketChannel close and terminate before the Acceptor thread does, and for the Acceptor thread to put a SocketChannel in `newChannels` before terminating. This instance is never closed by either thread, because it is never moved to `socketChannels`.
A precise execution order that has this leak is:
1. NioEchoServer thread locks `newChannels`.
2. Acceptor thread accept() completes, and the SocketChannel is created.
3. Acceptor thread blocks waiting for the `newChannels` lock.
4. NioEchoServer thread releases the `newChannels` lock and does some processing.
5. NioEchoServer#close() is called, which closes the serverSocketChannel.
6. NioEchoServer thread checks serverSocketChannel.isOpen() and then terminates.
7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel to `newChannels`.
8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
9. NioEchoServer#close() stops blocking now that both other threads have terminated.
The end result is that the leaked socket is left open in the `newChannels` list at the end of close(), which is incorrect.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
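A hedged sketch of one way the race could be closed; the class below is illustrative, not the real NioEchoServer (its run() loop that drains `newChannels` into `socketChannels` is omitted). The idea: once both threads have terminated, nothing else can touch `newChannels`, so close() can safely drain and close whatever the acceptor left behind:
{code:java}
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class EchoServerSketch extends Thread {
    private final ServerSocketChannel serverSocketChannel;
    private final Thread acceptorThread;
    private final List<SocketChannel> newChannels = new ArrayList<>();
    private final List<SocketChannel> socketChannels = new ArrayList<>();

    EchoServerSketch(ServerSocketChannel server, Thread acceptor) {
        this.serverSocketChannel = server;
        this.acceptorThread = acceptor;
    }

    public void close() throws Exception {
        serverSocketChannel.close(); // both loops now observe !isOpen()
        acceptorThread.join();       // after this, no thread can add to newChannels
        join();                      // wait for the main (server) thread too
        synchronized (newChannels) { // drain stragglers the main loop never saw
            for (SocketChannel channel : newChannels)
                channel.close();
            newChannels.clear();
        }
        for (SocketChannel channel : socketChannels)
            channel.close();
        socketChannels.clear();
    }
}
{code}
The key ordering constraint is that the drain happens only after both join() calls return, i.e. exactly at the point in the timeline (step 9 above) where the leak was previously visible.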
[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
[ https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16764:
---
Description:
A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does.
The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).
This is probably what makes the [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] test fail for the new consumer. Once this bug is fixed, we should be able to enable that test for the new consumer.

was:
A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does.
The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).

> New consumer should throw InvalidTopicException on poll when invalid topic in
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.7.0
> Reporter: Lianet Magrans
> Priority: Blocker
> Labels: kip-848-client-support
>
> A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does.
> The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).
> This is probably what makes the [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956] test fail for the new consumer. Once this bug is fixed, we should be able to enable that test for the new consumer.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
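For intuition, a minimal sketch of the legacy-style check; the helper name and the invalid-topics parameter are hypothetical stand-ins for whatever the metadata snapshot reports (the real legacy code lives in ConsumerNetworkClient at the link above). The point is simply that each poll iteration surfaces metadata errors so poll() fails fast:
{code:java}
import java.util.Set;
import org.apache.kafka.common.errors.InvalidTopicException;

public final class MetadataErrorCheckSketch {
    // Hypothetical check run once per poll iteration: if metadata has
    // recorded any invalid subscribed topics, fail the current poll()
    // with the exception the application expects.
    static void maybeThrowInvalidTopics(Set<String> invalidTopics) {
        if (invalidTopics != null && !invalidTopics.isEmpty()) {
            throw new InvalidTopicException("Invalid topics: " + invalidTopics);
        }
    }
}
{code}
The fix for the new consumer would presumably perform an equivalent check somewhere on the AsyncKafkaConsumer's poll path.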
[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
Lianet Magrans created KAFKA-16764:
--
Summary: New consumer should throw InvalidTopicException on poll when invalid topic in metadata
Key: KAFKA-16764
URL: https://issues.apache.org/jira/browse/KAFKA-16764
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans

A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does.
The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315]).
-- This message was sent by Atlassian Jira (v8.20.10#820010)