[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846950#comment-17846950
 ] 

Chia-Ping Tsai commented on KAFKA-16774:


[~cadonna] thanks for your confirmation. Please feel free to submit your 
solution/patch for it :)

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16782) Some partition's segments are suddenly not deleted anymore

2024-05-16 Thread Roland Sommer (Jira)
Roland Sommer created KAFKA-16782:
-

 Summary: Some partition's segments are suddenly not deleted anymore
 Key: KAFKA-16782
 URL: https://issues.apache.org/jira/browse/KAFKA-16782
 Project: Kafka
  Issue Type: Bug
Reporter: Roland Sommer


I recently discovered an odd behaviour in one of our Kafka clusters 
(KRaft-based, v3.7.0):

We have a topic for distributed log collection with 48 partitions. Retention is 
set to 84 hours, and we have the default {{cleanup.policy=delete}} in place. For 
all but two partitions this works as expected. In two partition directories 
there are files going back to January, and consuming those specific partitions 
yields data from January (showing that the files are not just lying around; 
they are actually processed).

Topic settings as per {{kafka-topics.sh --describe}}:

{{Topic: syslog TopicId: AeJLnYPnQFOtMc0ZjpH7sw PartitionCount: 48 
ReplicationFactor: 2 Configs: 
compression.type=snappy,cleanup.policy=delete,segment.bytes=1073741824,retention.ms=302400000,max.message.bytes=2097152}}

Searching the cluster logs, there is no indicator of what the reason could be 
(at least I have not spotted anything suspicious so far). Up to the time when 
deletion stopped, there are log entries showing the deletion of old log 
segments, but then those entries simply stop. As far as I can see, there was no 
change on the cluster at that point.
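
One way to confirm what the broker still serves, beyond disk statistics, is to 
seek an affected partition to its beginning and inspect the first record's 
timestamp. A minimal sketch (the broker address and partition number are 
placeholders; the topic name is taken from the description above):

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Sketch: read the oldest available record of one affected partition and print
// its timestamp, to verify the broker really serves data older than the 84h
// retention window.
public class OldestRecordCheck {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition tp = new TopicPartition("syslog", 0); // placeholder partition
            consumer.assign(List.of(tp));
            final Map<TopicPartition, Long> begin = consumer.beginningOffsets(List.of(tp));
            consumer.seek(tp, begin.get(tp));
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (final ConsumerRecord<byte[], byte[]> r : records) {
                System.out.printf("oldest offset=%d timestamp=%d%n", r.offset(), r.timestamp());
                break;
            }
        }
    }
}
{code}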



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846940#comment-17846940
 ] 

Bruno Cadonna commented on KAFKA-16774:
---

[~chia7712] Thank you for your analysis! Without looking at your comment, I 
came to the same conclusion about the cause of the flakiness. I was puzzled at 
first because {{allTasks()}} should only be called by the stream thread, so I 
did not understand where the concurrency comes from. Then I took a closer look 
at the test and saw that the stream thread is started in the test, which 
explains the concurrency.

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15541) Improved StateStore Iterator metrics for detecting leaks

2024-05-16 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15541:
-
Summary: Improved StateStore Iterator metrics for detecting leaks  (was: 
RocksDB Iterator Metrics)

> Improved StateStore Iterator metrics for detecting leaks
> 
>
> Key: KAFKA-15541
> URL: https://issues.apache.org/jira/browse/KAFKA-15541
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip, kip-required
>
> [KIP-989: RocksDB Iterator 
> Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics]
> RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due 
> to [blocks being "pinned" 
> in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators].
>  Pinned blocks can currently be tracked via the per-store 
> {{block-cache-pinned-usage}} metric. However, it's common [(and even 
> recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb]
>  to share the Block Cache among all stores in an application, to enable users 
> to globally bound native memory used by RocksDB. This results in the 
> {{block-cache-pinned-usage}} reporting the same memory usage for every store 
> in the application, irrespective of which store is actually pinning blocks in 
> the block cache.
> To aid users in finding leaked Iterators, as well as identifying the cause of 
> a high number of pinned blocks, we introduce two new metrics.
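
For background, the leak these metrics are meant to catch comes from iterators 
that are never closed. The idiomatic pattern is try-with-resources, since 
{{KeyValueIterator}} is {{Closeable}} (a generic illustration, not code from 
the KIP):

{code:java}
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Example of the usage pattern whose violation pins RocksDB blocks:
// try-with-resources guarantees close() even if iteration throws, so the
// pinned blocks are released.
class IteratorUsage {
    static long countEntries(final ReadOnlyKeyValueStore<String, Long> store) {
        long count = 0;
        try (KeyValueIterator<String, Long> iter = store.all()) {
            while (iter.hasNext()) {
                iter.next();
                count++;
            }
        } // iter.close() runs here; forgetting it leaks native resources
        return count;
    }
}
{code}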



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846912#comment-17846912
 ] 

Chia-Ping Tsai edited comment on KAFKA-16774 at 5/16/24 11:11 AM:
--

[~mjsax] nice question. It seems to me that this is a flaky failure. IIRC, 
`onPartitionsAssigned` should be executed by the same thread as the one calling 
`consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is 
called by the JUnit thread [1], which causes a race condition.

We can fix it by returning a copy of `pendingTasksToInit` [2], created under 
synchronization. That is similar to `allTasksPerId` [3]. WDYT?

[0] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147

[1] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408

 [2] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112

[3] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302
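
A minimal sketch of the proposed shape (illustrative only; the real field and 
methods live in Tasks.java, linked above as [2] and [3]):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Sketch, not the actual Tasks.java code: hand out a copy of the pending-init
// set, created while holding the monitor, so a caller on another thread (here,
// the JUnit thread driving the rebalance listener) never iterates the live
// HashSet that the stream thread is mutating.
class TasksSketch<T> {
    private final Set<T> pendingTasksToInit = new HashSet<>();

    synchronized Set<T> pendingTasksToInit() {
        return new HashSet<>(pendingTasksToInit); // defensive copy, like allTasksPerId()
    }

    synchronized void addPendingTasksToInit(final Set<T> tasks) {
        pendingTasksToInit.addAll(tasks);
    }
}
{code}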


was (Author: chia7712):
[~mjsax] nice question. It seems to me that this is a flaky failure. IIRC, 
`onPartitionsAssigned` should be executed by the same thread as the one calling 
`consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is 
called by the JUnit thread [1], which causes a race condition.

We can fix it by returning a copy of `pendingTasksToInit` [2]. That is similar 
to `allTasksPerId` [3]. WDYT?

[0] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147

[1] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408

 [2] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112

[3] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846912#comment-17846912
 ] 

Chia-Ping Tsai commented on KAFKA-16774:


[~mjsax] nice question. It seems to me that this is a flaky failure. IIRC, 
`onPartitionsAssigned` should be executed by the same thread as the one calling 
`consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is 
called by the JUnit thread [1], which causes a race condition.

We can fix it by returning a copy of `pendingTasksToInit` [2]. That is similar 
to `allTasksPerId` [3]. WDYT?

[0] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147

[1] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408

 [2] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112

[3] 
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16668) Enable to set tags by `ClusterTest`

2024-05-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16668.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Enable to set tags by `ClusterTest` 
> 
>
> Key: KAFKA-16668
> URL: https://issues.apache.org/jira/browse/KAFKA-16668
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Minor
> Fix For: 3.8.0
>
>
> Currently, the display name can be customized only by `name` 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/annotation/ClusterTest.java#L42).
>  However, the "key" is hard-coded to "name=xxx". Also, it is impossible to 
> set more "tags" for the display name. 
> https://github.com/apache/kafka/pull/15766 is an example where we want to add 
> "xxx=bbb" to the display name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16781) Expose advertised.listeners in controller node

2024-05-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16781:
--

Assignee: TengYao Chi

> Expose advertised.listeners in controller node
> --
>
> Key: KAFKA-16781
> URL: https://issues.apache.org/jira/browse/KAFKA-16781
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: TengYao Chi
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> After 
> [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
>  we allow clients to talk to the KRaft controller node directly. But unlike 
> broker nodes, we don't allow users to configure advertised.listeners for 
> clients to connect to. Without this config, the client cannot connect to the 
> controller node if the controller is sitting behind a NAT network while the 
> client is in the external network.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16781) Expose advertised.listeners in controller node

2024-05-16 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846890#comment-17846890
 ] 

TengYao Chi commented on KAFKA-16781:
-

I will handle this issue :)

> Expose advertised.listeners in controller node
> --
>
> Key: KAFKA-16781
> URL: https://issues.apache.org/jira/browse/KAFKA-16781
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> After 
> [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
>  we allow clients to talk to the KRaft controller node directly. But unlike 
> broker nodes, we don't allow users to configure advertised.listeners for 
> clients to connect to. Without this config, the client cannot connect to the 
> controller node if the controller is sitting behind a NAT network while the 
> client is in the external network.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16539) Can't update specific broker configs in pre-migration mode

2024-05-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16539.

Resolution: Fixed

> Can't update specific broker configs in pre-migration mode
> --
>
> Key: KAFKA-16539
> URL: https://issues.apache.org/jira/browse/KAFKA-16539
> Project: Kafka
>  Issue Type: Bug
>  Components: config, kraft
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> In migration mode, ZK brokers will have a forwarding manager configured. This 
> is used to forward requests to the KRaft controller once we get to that part 
> of the migration. However, prior to KRaft taking over as the controller 
> (known as pre-migration mode), the ZK brokers are still attempting to forward 
> IncrementalAlterConfigs to the controller.
> This works fine for cluster-level configs (e.g., "--entity-type broker 
> --entity-default"), but it fails for specific broker configs (e.g., 
> "--entity-type broker --entity-id 1").
> This affects BROKER and BROKER_LOGGER config types.
> To work around this bug, you can either disable migrations on the brokers 
> (assuming no migration has taken place), or proceed with the migration and 
> get to the point where KRaft is the controller.
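
For reference, the failing request shape can be reproduced with the public 
Admin API (a sketch; the broker id and config key are arbitrary examples, and 
the equivalent kafka-configs.sh invocation hits the same path):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch: a per-broker IncrementalAlterConfigs request, the shape that fails
// in pre-migration mode. Per the description above, targeting the cluster
// default (empty resource name "") instead of a concrete broker id works.
public class PerBrokerConfigUpdate {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            final ConfigResource broker1 = new ConfigResource(ConfigResource.Type.BROKER, "1");
            final AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(broker1, List.of(op))).all().get();
        }
    }
}
{code}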



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16539) Can't update specific broker configs in pre-migration mode

2024-05-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16539:
---
Fix Version/s: (was: 3.6.3)

> Can't update specific broker configs in pre-migration mode
> --
>
> Key: KAFKA-16539
> URL: https://issues.apache.org/jira/browse/KAFKA-16539
> Project: Kafka
>  Issue Type: Bug
>  Components: config, kraft
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> In migration mode, ZK brokers will have a forwarding manager configured. This 
> is used to forward requests to the KRaft controller once we get to that part 
> of the migration. However, prior to KRaft taking over as the controller 
> (known as pre-migration mode), the ZK brokers are still attempting to forward 
> IncrementalAlterConfigs to the controller.
> This works fine for cluster-level configs (e.g., "--entity-type broker 
> --entity-default"), but it fails for specific broker configs (e.g., 
> "--entity-type broker --entity-id 1").
> This affects BROKER and BROKER_LOGGER config types.
> To work around this bug, you can either disable migrations on the brokers 
> (assuming no migration has taken place), or proceed with the migration and 
> get to the point where KRaft is the controller.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16781) Expose advertised.listeners in controller node

2024-05-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16781:
-

 Summary: Expose advertised.listeners in controller node
 Key: KAFKA-16781
 URL: https://issues.apache.org/jira/browse/KAFKA-16781
 Project: Kafka
  Issue Type: Improvement
Reporter: Luke Chen


After 
[KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
 we allow clients to talk to the KRaft controller node directly. But unlike 
broker nodes, we don't allow users to configure advertised.listeners for 
clients to connect to. Without this config, the client cannot connect to the 
controller node if the controller is sitting behind a NAT network while the 
client is in the external network.
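
For comparison, a broker can already advertise a NAT-friendly endpoint. The 
controller block below is hypothetical: advertised.listeners is exactly what 
this ticket (via a future KIP) asks to support on controllers.

{noformat}
# Broker side: supported today
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.public.example.com:9092

# Controller side (hypothetical): advertised.listeners is NOT currently
# honored for controllers, which is the gap described above
listeners=CONTROLLER://0.0.0.0:9093
advertised.listeners=CONTROLLER://controller1.public.example.com:9093
{noformat}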



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16781) Expose advertised.listeners in controller node

2024-05-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16781:
--
Labels: need-kip newbie newbie++  (was: )

> Expose advertised.listeners in controller node
> --
>
> Key: KAFKA-16781
> URL: https://issues.apache.org/jira/browse/KAFKA-16781
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> After 
> [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
>  we allow clients to talk to the KRaft controller node directly. But unlike 
> broker node, we don't allow users to config advertised.listeners for clients 
> to connect to. Without this config, the client cannot connect to the 
> controller node if the controller is sitting behind NAT network while the 
> client is in the external network.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-16 Thread Loïc Greffier (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846880#comment-17846880
 ] 

Loïc Greffier commented on KAFKA-16448:
---

[~mjsax] - You can assign it to me

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>    Priority: Minor
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16780:
-
Description: 
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, it has to fetch the 
transaction indexes of all the remaining remote-log segments, and the call then 
lands on the local-log segments before responding to the FETCH request, which 
increases the time taken to serve the request.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue arises when consumers are enabled with the {{READ_COMMITTED}} 
isolation level but read non-txn topics. If the topic has transactions enabled, 
then we expect each transaction to either commit or roll back within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), so we would likely have to 
search only a few remote log segments to collect the aborted txns.

  was:
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only a few remote log segments to collect the aborted txns.


> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads 
> a non-txn topic, then the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request is reading data from the first remote log segment, it has to fetch 
> the transaction indexes of all the remaining remote-log segments, and the 
> call then lands on the local-log segments before responding to the FETCH 
> request, which increases the time taken to serve the request.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue arises when consumers are enabled with the {{READ_COMMITTED}} 
> isolation level but read non-txn topics. If the topic has transactions 
> enabled, then we expect each transaction to either commit or roll back within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), so we would likely 
> have to search only a few remote log segments to collect the aborted txns.

[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16780:
-
Description: 
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only a few remote log segments to collect the aborted txns.

  was:
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only few remote log segments to collect the aborted txns.


> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads 
> a non-txn topic, then the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request is reading data from the first remote log segment, then it has to 
> fetch the transaction indexes of all the remaining remote-log segments, and 
> then the call lands to the local-log segments before responding to the FETCH 
> request which increases the time taken to serve the requests.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
> level but read the normal topics. If the topic is enabled with the 
> transaction, then we expect the transaction to either commit/rollback within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may 
> have to search only a few remote log segments to collect the aborted txns.

[jira] [Updated] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16780:
-
Description: 
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only few remote log segments to collect the aborted txns.

  was:
h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only for one (or) two remote log segments.


> Txn consumer exerts pressure on remote storage when reading non-txn topic
> -
>
> Key: KAFKA-16780
> URL: https://issues.apache.org/jira/browse/KAFKA-16780
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> h3. Logic to read aborted txns:
>  # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads 
> a non-txn topic, then the broker has to 
> [traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
>  all the local log segments to collect the aborted transactions since there 
> won't be any entry in the transaction index.
>  # The same 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
>  is applied while reading from remote storage. In this case, when the FETCH 
> request is reading data from the first remote log segment, then it has to 
> fetch the transaction indexes of all the remaining remote-log segments, and 
> then the call lands to the local-log segments before responding to the FETCH 
> request which increases the time taken to serve the requests.
> The [EoS Abort 
> Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
>  design doc explains how the transaction index file filters out the aborted 
> transaction records.
> The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
> level but read the normal topics. If the topic is enabled with the 
> transaction, then we expect the transaction to either commit/rollback within 
> 15 minutes (default transaction.max.timeout.ms = 15 mins), possibly we may 
> have to search only few remote log segments to collect the aborted txns.

[jira] [Created] (KAFKA-16780) Txn consumer exerts pressure on remote storage when reading non-txn topic

2024-05-16 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16780:


 Summary: Txn consumer exerts pressure on remote storage when 
reading non-txn topic
 Key: KAFKA-16780
 URL: https://issues.apache.org/jira/browse/KAFKA-16780
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash


h3. Logic to read aborted txns:
 # When the consumer enables isolation_level as {{READ_COMMITTED}} and reads a 
non-txn topic, then the broker has to 
[traverse|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LocalLog.scala#L394]
 all the local log segments to collect the aborted transactions since there 
won't be any entry in the transaction index.
 # The same 
[logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1436]
 is applied while reading from remote storage. In this case, when the FETCH 
request is reading data from the first remote log segment, then it has to fetch 
the transaction indexes of all the remaining remote-log segments, and then the 
call lands to the local-log segments before responding to the FETCH request 
which increases the time taken to serve the requests.

The [EoS Abort 
Index|https://docs.google.com/document/d/1Rlqizmk7QCDe8qAnVW5e5X8rGvn6m2DCR3JR2yqwVjc]
 design doc explains how the transaction index file filters out the aborted 
transaction records.

The issue is when consumers are enabled with the {{READ_COMMITTED}} isolation 
level but read the normal topics. If the topic is enabled with the transaction, 
then we expect the transaction to either commit/rollback within 15 minutes 
(default transaction.max.timeout.ms = 15 mins), possibly we may have to search 
only for one (or) two remote log segments.
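
The consumer side of this scenario is plain client configuration (a sketch; the 
broker address and topic name are placeholders):

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: a READ_COMMITTED consumer. Each FETCH it issues makes the broker
// collect aborted transactions for the fetched range; on a non-txn topic with
// no transaction-index entries, that means the segment traversal described
// above, including remote-log segments when tiered storage is enabled.
public class ReadCommittedConsumer {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-group");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("non-txn-topic")); // placeholder topic
            consumer.poll(Duration.ofSeconds(1)); // each fetch pays the abort-lookup cost
        }
    }
}
{code}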



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16771) First log directory printed twice when formatting storage

2024-05-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846858#comment-17846858
 ] 

Chia-Ping Tsai commented on KAFKA-16771:


[~gongxuanzhang] I have assigned this ticket to you

> First log directory printed twice when formatting storage
> -
>
> Key: KAFKA-16771
> URL: https://issues.apache.org/jira/browse/KAFKA-16771
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Assignee: xuanzhang gong
>Priority: Major
>
> If multiple log directories are set, when running bin/kafka-storage.sh 
> format, the first directory is printed twice. For example:
> {noformat}
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
> config/kraft/server.properties --release-version 3.6
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
> Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
> Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
> Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16771) First log directory printed twice when formatting storage

2024-05-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16771:
--

Assignee: xuanzhang gong

> First log directory printed twice when formatting storage
> -
>
> Key: KAFKA-16771
> URL: https://issues.apache.org/jira/browse/KAFKA-16771
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Assignee: xuanzhang gong
>Priority: Major
>
> If multiple log directories are set, when running bin/kafka-storage.sh 
> format, the first directory is printed twice. For example:
> {noformat}
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
> config/kraft/server.properties --release-version 3.6
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
> Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
> Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
> Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-16 Thread Lin Siyuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846853#comment-17846853
 ] 

Lin Siyuan commented on KAFKA-16779:


I'm very sorry, Nicholas Feinberg. I misinterpreted the description. I've 
rolled it back.

> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Nicholas Feinberg
>Priority: Major
>  Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
> kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
> state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer 
> (345600000ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
> and thus are regularly rolling new log segments. We observe this unexpectedly 
> high retention both via disk usage statistics and by requesting the oldest 
> available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When 
> those brokers started up again, they returned to the expected four days of 
> retention.
> Manually restarting brokers also seems to cause them to return to four days 
> of retention. Demoting and promoting brokers only has this effect on a small 
> part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure 
> on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an 
> overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). 
> Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored 
> retention to our default of four days. This was the second time we've 
> temporarily reduced and restored retention since the upgrade. This problem 
> did not manifest the previous time we did so, nor did it manifest on our 
> other Kafka 3.7 clusters.
> We are running on AWS 
> [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
> have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
> (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
> we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
> hosts. Please let me know if I can provide any further information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-16 Thread Matej Sprysl (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846852#comment-17846852
 ] 

Matej Sprysl commented on KAFKA-15242:
--

Hi Matthias,

I had not found the issue you mentioned, thanks for pointing it out!

The other issue is a subset of this one; I am not sure whether adding a 
`MockFixedKeyProcessorContext` would fully solve this one.

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  
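
For context, the kind of component the reporter is trying to unit-test looks 
like this (a minimal, illustrative FixedKeyProcessor, not code from the linked 
repo):

{code:java}
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Minimal FixedKeyProcessor: uppercases values while the key stays fixed by
// construction. Verifying process() requires a mock context that can capture
// forward() calls, which is the testing gap this ticket describes.
public class UppercaseProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
{code}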



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-16 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-16779:
---
Description: 
In a Kafka cluster with all topics set to four days of retention or longer 
(345600000ms), most brokers seem to be retaining six days of data.

This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
and thus are regularly rolling new log segments. We observe this unexpectedly 
high retention both via disk usage statistics and by requesting the oldest 
available messages from Kafka.

Some of these brokers crashed with an 'mmap failed' error (attached). When 
those brokers started up again, they returned to the expected four days of 
retention.

Manually restarting brokers also seems to cause them to return to four days of 
retention. Demoting and promoting brokers only has this effect on a small part 
of the data hosted on a broker.

These hosts had ~170GiB of free memory available. We saw no signs of pressure 
on either system or JVM heap memory before or after they reported this error. 
Committed memory seems to be around 10%, so this doesn't seem to be an 
overcommit issue.

This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior 
to the upgrade, it was running on Kafka 2.4.

We last reduced retention for ops on May 7th, after which we restored retention 
to our default of four days. This was the second time we've temporarily reduced 
and restored retention since the upgrade. This problem did not manifest the 
previous time we did so, nor did it manifest on our other Kafka 3.7 clusters.

We are running on AWS 
[d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
(i.e. unraided).

Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
we're still using Zookeeper.

Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
hosts. Please let me know if I can provide any further information.

  was:
In a Kafka cluster with all topics set to four days of retention or longer 
(345600000ms), most brokers seem to be retaining six days of data.

This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
and thus are regularly rolling new log segments. We observe this unexpectedly 
high retention both via disk usage statistics and by requesting the oldest 
available messages from Kafka.

Some of these brokers crashed with an 'mmap failed' error (attached). When 
those brokers started up again, they returned to the expected four days of 
retention.

Manually restarting brokers also seems to cause them to return to four days of 
retention. Demoting and promoting brokers only has this effect on a small part 
of the data hosted on a broker.

These hosts had ~170GiB of free memory available. We saw no signs of pressure 
on either system or JVM heap memory before or after they reported this error. 
Committed memory seems to be around 10%, so this doesn't seem to be an 
overcommit issue.

This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior 
to the upgrade, it was running on Kafka 2.4.

We last reduced retention for ops on May 7th, after which we restored retention 
to our default of four days. This was the second time we've temporarily reduced 
and restored retention since the upgrade. This problem did not manifest the 
previous time we did so, nor did it manifest on our other Kafka 3.7 clusters.

We are running on AWS 
[d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
(i.e. unraided).

Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
we're still using Zookeeper.

Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
hosts. Please let me know if I can provide any further information.


> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Nicholas Feinberg
>Priority: Major
>  Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
> kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
> state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer 
> (345600000ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
> and thus are regularly rolling new log segments. We observe this unexpectedly 
> high retention both via disk usage statistics and by requesting the oldest 
> available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When 
> those brokers started up again, they returned to the expected four days of 
> retention.
> Manually restarting brokers also seems to cause them to return to four days 
> of retention. Demoting and promoting brokers only has this effect on a small 
> part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure 
> on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an 
> overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). 
> Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored 
> retention to our default of four days. This was the second time we've 
> temporarily reduced and restored retention since the upgrade. This problem 
> did not manifest the previous time we did so, nor did it manifest on our 
> other Kafka 3.7 clusters.
> We are running on AWS 
> [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
> have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
> (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
> we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
> hosts. Please let me know if I can provide any further information.

[jira] [Updated] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-16 Thread Lin Siyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lin Siyuan updated KAFKA-16779:
---
Description: 
In a Kafka cluster with all topics set to four days of retention or longer 
(345600000ms), most brokers seem to be retaining six days of data.

This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
and thus are regularly rolling new log segments. We observe this unexpectedly 
high retention both via disk usage statistics and by requesting the oldest 
available messages from Kafka.

Some of these brokers crashed with an 'mmap failed' error (attached). When 
those brokers started up again, they returned to the expected four days of 
retention.

Manually restarting brokers also seems to cause them to return to four days of 
retention. Demoting and promoting brokers only has this effect on a small part 
of the data hosted on a broker.

These hosts had ~170GiB of free memory available. We saw no signs of pressure 
on either system or JVM heap memory before or after they reported this error. 
Committed memory seems to be around 10%, so this doesn't seem to be an 
overcommit issue.

This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior 
to the upgrade, it was running on Kafka 2.4.

We last reduced retention for ops on May 7th, after which we restored retention 
to our default of four days. This was the second time we've temporarily reduced 
and restored retention since the upgrade. This problem did not manifest the 
previous time we did so, nor did it manifest on our other Kafka 3.7 clusters.

We are running on AWS 
[d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
(i.e. unraided).

Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
we're still using Zookeeper.

Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
hosts. Please let me know if I can provide any further information.

  was:
In a Kafka cluster with all topics set to four days of retention or longer 
(345600000ms), most brokers seem to be retaining six days of data.

This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
and thus are regularly rolling new log segments. We observe this unexpectedly 
high retention both via disk usage statistics and by requesting the oldest 
available messages from Kafka.

Some of these brokers crashed with an 'mmap failed' error (attached). When 
those brokers started up again, they returned to the expected four days of 
retention.

Manually restarting brokers also seems to cause them to return to four days of 
retention. Demoting and promoting brokers only has this effect on a small part 
of the data hosted on a broker.

These hosts had ~170GiB of free memory available. We saw no signs of pressure 
on either system or JVM heap memory before or after they reported this error. 
Committed memory seems to be around 10%, so this doesn't seem to be an 
overcommit issue.

This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior 
to the upgrade, it was running on Kafka 2.4.

We last reduced retention for ops on May 7th, after which we restored retention 
to our default of four days. This was the second time we've temporarily reduced 
and restored retention since the upgrade. This problem did not manifest the 
previous time we did so, nor did it manifest on our other Kafka 3.7 clusters.

We are running on AWS 
[d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
(i.e. unraided).

Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
we're still using Zookeeper.

Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
hosts. Please let me know if I can provide any further information.


> Kafka retains logs past specified retention
> ---
>
> Key: KAFKA-16779
> URL: https://issues.apache.org/jira/browse/KAFKA-16779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Nicholas Feinberg
>Priority: Major
>  Labels: expiration, retention
> Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
> kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
> state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz
>
>
> In a Kafka cluster with all topics set to four days of retention or longer 
> (345600000ms), most brokers seem to be retaining six days of data.
> This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
> and thus are regularly rolling new log segments. We observe this unexpectedly 
> high retention both via disk usage statistics and by requesting the oldest 
> available messages from Kafka.
> Some of these brokers crashed with an 'mmap failed' error (attached). When 
> those brokers started up again, they returned to the expected four days of 
> retention.
> Manually restarting brokers also seems to cause them to return to four days 
> of retention. Demoting and promoting brokers only has this effect on a small 
> part of the data hosted on a broker.
> These hosts had ~170GiB of free memory available. We saw no signs of pressure 
> on either system or JVM heap memory before or after they reported this error. 
> Committed memory seems to be around 10%, so this doesn't seem to be an 
> overcommit issue.
> This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). 
> Prior to the upgrade, it was running on Kafka 2.4.
> We last reduced retention for ops on May 7th, after which we restored 
> retention to our default of four days. This was the second time we've 
> temporarily reduced and restored retention since the upgrade. This problem 
> did not manifest the previous time we did so, nor did it manifest on our 
> other Kafka 3.7 clusters.
> We are running on AWS 
> [d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
> have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
> (i.e. unraided).
> Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
> we're still using Zookeeper.
> Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
> hosts. Please let me know if I can provide any further information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846821#comment-17846821
 ] 

Matthias J. Sax commented on KAFKA-16333:
-

In general yes. But we should wait until a final decision is made on whether the 
release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks.

> Removed Deprecated methods KTable#join
> --
>
> Key: KAFKA-16333
> URL: https://issues.apache.org/jira/browse/KAFKA-16333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> KTable#join() methods taking a `Named` parameter got deprecated in 3.1 
> release via https://issues.apache.org/jira/browse/KAFKA-13813 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846820#comment-17846820
 ] 

Matthias J. Sax commented on KAFKA-16329:
-

In general yes. But we should wait until a final decision is made on whether the 
release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks.

> Remove Deprecated Task/ThreadMetadata classes and related methods
> -
>
> Key: KAFKA-16329
> URL: https://issues.apache.org/jira/browse/KAFKA-16329
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation]
>  
>  * org.apache.kafka.streams.processor.TaskMetadata
>  * org.apache.kafka.streams.processor.ThreadMetadata
>  * org.apache.kafka.streams.KafkaStreams#localThreadsMetadata
>  * org.apache.kafka.streams.state.StreamsMetadata
>  * org.apache.kafka.streams.KafkaStreams#allMetadata
>  * org.apache.kafka.streams.KafkaStreams#allMetadataForStore
> This is related to https://issues.apache.org/jira/browse/KAFKA-16330 and both 
> tickets should be worked on together.
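
For anyone picking this up, a hedged sketch of the migration path these 
removals imply. The replacement method names follow KIP-744 and are 
assumptions to verify against the current javadocs:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.ThreadMetadata;

// Hedged migration sketch for the deprecated API listed above.
public class Kip744Migration {
    static void printMetadata(KafkaStreams streams) {
        // Before (deprecated): streams.localThreadsMetadata()
        for (ThreadMetadata thread : streams.metadataForLocalThreads()) {
            System.out.println(thread.threadName());
        }
        // Before (deprecated): streams.allMetadata()
        for (StreamsMetadata client : streams.metadataForAllStreamsClients()) {
            System.out.println(client.hostInfo());
        }
        // Before (deprecated): streams.allMetadataForStore("my-store")
        streams.streamsMetadataForStore("my-store")
            .forEach(m -> System.out.println(m.host() + ":" + m.port()));
    }
}
{code}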



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16330) Remove Deprecated methods/variables from TaskId

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846819#comment-17846819
 ] 

Matthias J. Sax commented on KAFKA-16330:
-

In general yes. But we should wait until a final decision is made on whether the 
release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks.

> Remove Deprecated methods/variables from TaskId
> ---
>
> Key: KAFKA-16330
> URL: https://issues.apache.org/jira/browse/KAFKA-16330
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Cf 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  
> This ticket relates to https://issues.apache.org/jira/browse/KAFKA-16329 and 
> both should be worked on together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846818#comment-17846818
 ] 

Matthias J. Sax commented on KAFKA-16448:
-

[~muralibasani] – Thanks for your interest. This ticket is already WIP: the 
corresponding KIP-1033 was accepted, and there is already a PR... [~Dabz] 
should we assign this ticket to you or Loic (not sure if he has a Jira account 
– if not, we should create one so we can assign the ticket to him)?

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>    Priority: Minor
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846815#comment-17846815
 ] 

Matthias J. Sax commented on KAFKA-16774:
-

Wondering if this is actually a flaky test, or if it exposes a real bug? 
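
Either way, the failure mode in the trace is easy to picture. A minimal sketch 
in plain Java (not the Kafka code) of a stream over a HashMap racing a 
concurrent writer, the same shape as TaskManager.allTasks() during a rebalance:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (not Kafka code) of the race in the stack trace: one
// thread streams over a HashMap while another thread mutates it. HashMap's
// fail-fast iterator throws ConcurrentModificationException, but only if the
// put() lands mid-iteration -- which is exactly why the test is flaky.
public class CmeSketch {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> tasks = new HashMap<>();
        for (int i = 0; i < 1_000_000; i++) {
            tasks.put("task-" + i, "running");
        }

        Thread mutator = new Thread(() -> tasks.put("new-task", "created"));
        mutator.start();

        // Mirrors TaskManager.allTasks() streaming over the task collection.
        long running = tasks.values().stream().filter("running"::equals).count();

        mutator.join();
        System.out.println(running);
    }
}
{code}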

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16774:

Labels: flaky-test  (was: )

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16774:

Component/s: streams
 unit tests

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846814#comment-17846814
 ] 

Matthias J. Sax edited comment on KAFKA-15242 at 5/16/24 5:32 AM:
--

There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP 
– ie, there is KIP-1027.

I believe it would cover this ticket? Wondering if we can close this one as a 
duplicate?


was (Author: mjsax):
There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP 
– ie, there is KIP-1027.

I believe it would cover this ticket?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using a mock processor context to get the forwarded message doesn't work.
> There is also no well-documented way of testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  
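
Until that lands, a hedged workaround sketch: exercising a FixedKeyProcessor 
end-to-end through TopologyTestDriver instead of MockProcessorContext. Topic 
names and the processor body are illustrative, not from the reporter's repo:

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Drives a FixedKeyProcessor through a real (test) topology, so forwarded
// records can be observed on the output topic.
public class FixedKeyProcessorDriverDemo {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
            .processValues(() -> new FixedKeyProcessor<String, String, String>() {
                private FixedKeyProcessorContext<String, String> context;

                @Override
                public void init(final FixedKeyProcessorContext<String, String> ctx) {
                    this.context = ctx;
                }

                @Override
                public void process(final FixedKeyRecord<String, String> record) {
                    // Forward with the same (fixed) key, transformed value.
                    context.forward(record.withValue(record.value().toUpperCase()));
                }
            })
            .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fixed-key-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "in", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "out", Serdes.String().deserializer(), Serdes.String().deserializer());
            in.pipeInput("k", "v");
            System.out.println(out.readKeyValue()); // KeyValue(k, V)
        }
    }
}
{code}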



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846814#comment-17846814
 ] 

Matthias J. Sax commented on KAFKA-15242:
-

There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP 
– ie, there is KIP-1027.

I believe it would cover this ticket?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using a mock processor context to get the forwarded message doesn't work.
> There is also no well-documented way of testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-15 Thread Lenin Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846787#comment-17846787
 ] 

Lenin Joseph commented on KAFKA-16656:
--

Hi [~ChrisEgerton], the issue seems to be due to aliases containing the 
replication.policy.separator.

ex:
replication.policy.separator: "-"

spec:
  clusters:
  - alias: cluster-1
  - alias: cluster-2

I removed the "-" from the aliases to resolve the issue.
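
For illustration, a small sketch of why an alias containing the separator 
breaks DefaultReplicationPolicy-style naming (assumed, simplified logic; not 
the actual MirrorMaker 2 code):

{code:java}
// Illustration of why an alias containing the separator is ambiguous.
// Assumed, simplified logic -- not the actual MirrorMaker 2 code.
public class SeparatorDemo {
    static final String SEPARATOR = "-";

    static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + SEPARATOR + topic;     // "cluster-1-orders"
    }

    static String parsedSourceAlias(String remoteTopic) {
        // Recovering the source alias by splitting on the separator...
        return remoteTopic.split(SEPARATOR)[0];     // "cluster", not "cluster-1"
    }

    public static void main(String[] args) {
        String remote = remoteTopic("cluster-1", "orders");
        System.out.println(remote);                    // cluster-1-orders
        System.out.println(parsedSourceAlias(remote)); // cluster
        // The recovered alias no longer matches "cluster-1", so the topic is
        // mistaken for a local topic and replicated back: a cycle.
    }
}
{code}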

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator (ex: "-") with DefaultReplicationPolicy, 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than a "."?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16771) First log directory printed twice when formatting storage

2024-05-15 Thread xuanzhang gong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846781#comment-17846781
 ] 

xuanzhang gong commented on KAFKA-16771:


I will handle this issue.

> First log directory printed twice when formatting storage
> -
>
> Key: KAFKA-16771
> URL: https://issues.apache.org/jira/browse/KAFKA-16771
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Major
>
> If multiple log directories are set, when running bin/kafka-storage.sh 
> format, the first directory is printed twice. For example:
> {noformat}
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
> config/kraft/server.properties --release-version 3.6
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
> Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
> Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
> Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16779) Kafka retains logs past specified retention

2024-05-15 Thread Nicholas Feinberg (Jira)
Nicholas Feinberg created KAFKA-16779:
-

 Summary: Kafka retains logs past specified retention
 Key: KAFKA-16779
 URL: https://issues.apache.org/jira/browse/KAFKA-16779
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Nicholas Feinberg
 Attachments: OOM.txt, kafka-20240512.log.gz, kafka-20240514.log.gz, 
kafka-ooms.png, server.log.2024-05-12.gz, server.log.2024-05-14.gz, 
state-change.log.2024-05-12.gz, state-change.log.2024-05-14.gz

In a Kafka cluster with all topics set to four days of retention or longer 
(345600000ms), most brokers seem to be retaining six days of data.

This is true even for topics which have high throughput (500MB/s, 50k msgs/s) 
and thus are regularly rolling new log segments. We observe this unexpectedly 
high retention both via disk usage statistics and by requesting the oldest 
available messages from Kafka.

Some of these brokers crashed with an 'mmap failed' error (attached). When 
those brokers started up again, they returned to the expected four days of 
retention.

Manually restarting brokers also seems to cause them to return to four days of 
retention. Demoting and promoting brokers only has this effect on a small part 
of the data hosted on a broker.

These hosts had ~170GiB of free memory available. We saw no signs of pressure 
on either system or JVM heap memory before or after they reported this error. 
Committed memory seems to be around 10%, so this doesn't seem to be an 
overcommit issue.

This Kafka cluster was upgraded to Kafka 3.7 two weeks ago (April 29th). Prior 
to the upgrade, it was running on Kafka 2.4.

We last reduced retention for ops on May 7th, after which we restored retention 
to our default of four days. This was the second time we've temporarily reduced 
and restored retention since the upgrade. This problem did not manifest the 
previous time we did so, nor did it manifest on our other Kafka 3.7 clusters.

We are running on AWS 
[d3en.12xlarge|https://instances.vantage.sh/aws/ec2/d3en.12xlarge] hosts. We 
have 23 brokers, each with 24 disks. We're running in a JBOD configuration 
(i.e. unraided).

Since this cluster was upgraded from Kafka 2.4 and since we're using JBOD, 
we're still using Zookeeper.

Sample broker logs are attached. The 05-12 and 05-14 logs are from separate 
hosts. Please let me know if I can provide any further information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition

2024-05-15 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16778:
--

 Summary: AsyncKafkaConsumer fetcher might occasionally try to 
fetch to a revoked partition
 Key: KAFKA-16778
 URL: https://issues.apache.org/jira/browse/KAFKA-16778
 Project: Kafka
  Issue Type: Bug
Reporter: Philip Nee


 
{code:java}
java.lang.IllegalStateException: No current assignment for partition 
output-topic-26
    at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
    at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542)
    at 
org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411)
    at 
org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143)
    at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94)
 {code}
The setup is: 30 consumers consuming from a 300-partition topic.

We can occasionally get an IllegalStateException from the consumer.
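
A hedged repro sketch of that setup (the broker address is a placeholder; 
"output-topic" is taken from the stack trace; GROUP_PROTOCOL_CONFIG="consumer" 
is assumed to select the new AsyncKafkaConsumer):

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Many consumers in one group polling in tight loops, so that rebalances
// overlap in-flight fetches and the revoked-partition race can surface.
public class RevokedFetchRepro {
    public static void main(String[] args) {
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                Properties p = new Properties();
                p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                p.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
                // Assumed to select the new AsyncKafkaConsumer (KIP-848).
                p.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
                p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {
                    c.subscribe(List.of("output-topic"));
                    while (true) {
                        c.poll(Duration.ofMillis(100)); // exception surfaces here
                    }
                }
            }).start();
        }
    }
}
{code}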

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846763#comment-17846763
 ] 

Justine Olshan commented on KAFKA-16692:


Hey [~akaltsikis] yeah, those are the release notes for 3.6. We can probably 
edit the kafka-site repo to get the change to show up in real time, but 
updating via the kafka repo requires waiting for the next release for the site 
to update.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [h
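
To make the suspected gap concrete, a toy sketch of what happens if the 
version clamp is skipped for a node whose ApiVersions were never discovered 
(assumed behavior, not the actual NetworkClient code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the suspected gap -- assumed behavior, not the actual
// NetworkClient code: with no ApiVersions entry for a node, the sender falls
// back to its own latest request version instead of clamping to the peer's.
public class VersionClampSketch {
    public static void main(String[] args) {
        short ourLatestAddPartitionsToTxn = 4;
        Map<Integer, Short> apiVersionsByNode = new HashMap<>(); // node 1043 never discovered

        Short peerMax = apiVersionsByNode.get(1043);
        short versionToSend = (peerMax == null)
            ? ourLatestAddPartitionsToTxn // v4 reaches a 3.5.2 broker
            : (short) Math.min(ourLatestAddPartitionsToTxn, peerMax);

        // 4 -> InvalidRequestException on the old broker, as in the logs above.
        System.out.println(versionToSend);
    }
}
{code}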

[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16692:
---
Affects Version/s: 3.8

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16692:
---
Affects Version/s: 3.7.0

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Angelos Kaltsikis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846751#comment-17846751
 ] 

Angelos Kaltsikis commented on KAFKA-16692:
---

Tbh I was thinking that we should add it here: 
https://kafka.apache.org/36/documentation.html#upgrade_360_notable

My main concern is that until your fix is released with 3.6.3, anyone upgrading 
from <= 3.5.2 who uses transactions risks facing the same issues we faced.

It goes without saying, a big thanks for the contribution and the fix.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsTo

[jira] [Assigned] (KAFKA-16626) Uuid to String for subscribed topic names in assignment spec

2024-05-15 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim reassigned KAFKA-16626:


Assignee: Jeff Kim  (was: Ritika Reddy)

> Uuid to String for subscribed topic names in assignment spec
> 
>
> Key: KAFKA-16626
> URL: https://issues.apache.org/jira/browse/KAFKA-16626
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Jeff Kim
>Priority: Major
>
> In creating the assignment spec from the existing consumer subscription 
> metadata, quite some time is spent converting Strings to Uuids.
> Change from Uuid to String for the subscribed topic names in the assignment 
> spec and convert on the fly.
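
A small sketch of the "convert on the fly" idea (assumed shape, not the 
coordinator's actual code): keep subscribed topic names as Strings and memoize 
the name-to-id lookup so the conversion cost is paid once per topic:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Assumed shape, not the coordinator's actual code: topic names stay Strings,
// and each name is resolved to its id lazily and memoized, so the conversion
// is paid once per topic rather than once per member.
public class LazyTopicIdResolver {
    private final Map<String, String> nameToId = new ConcurrentHashMap<>();
    private final Function<String, String> metadataLookup;

    LazyTopicIdResolver(Function<String, String> metadataLookup) {
        this.metadataLookup = metadataLookup;
    }

    String idFor(String topicName) {
        return nameToId.computeIfAbsent(topicName, metadataLookup);
    }

    public static void main(String[] args) {
        LazyTopicIdResolver r = new LazyTopicIdResolver(name -> "id-of-" + name);
        System.out.println(r.idFor("orders"));
        System.out.println(r.idFor("orders")); // cached, no second lookup
    }
}
{code}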



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout: even when called 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
but filing this one to provide more context and point out the test failures and 
suggested new tests. All fail even with the current patch in KAFKA-16637, so 
this needs investigation. 

  was:
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consu

[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16777:
--
Component/s: clients

> New consumer should throw NoOffsetForPartitionException on continuous poll 
> zero if no reset strategy
> 
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> If the consumer does not define an offset reset strategy, a call to poll 
> should fail with NoOffsetForPartitionException. That works as expected on the 
> new consumer when polling with a timeout > 0 (existing integration test 
> [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
>  but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes 
> for the legacy consumer but fails for the new consumer). We should add it as 
> part of the fix, for better coverage:
> {code:java}
>   @ParameterizedTest(name = 
> TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
>   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
>   def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
> groupProtocol: String): Unit = {
> this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
> "none")
> val consumer = createConsumer(configOverrides = this.consumerConfig)
> consumer.assign(List(tp).asJava)
> // continuous poll should eventually fail because there is no offset 
> reset strategy set (fail only when resetting positions after coordinator is 
> known)
> TestUtils.tryUntilNoAssertionError() {
>   assertThrows(classOf[NoOffsetForPartitionException], () => 
> consumer.poll(Duration.ZERO))
> }
>   }
> {code}
> Also this is covered in the unit test 
> [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
>  that is currently enabled only for the LegacyConsumer. After fixing this 
> issue we should be able to enable it for the new consumer too.
> The issue seems to be around calling poll with ZERO timeout: even when called 
> continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, 
> so updateFetchPositions never makes it to 
> [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
>  where the exception is thrown.
>  
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
> but filing this one to provide more context and point out the test failures 
> and suggested new tests. All fail even with the current patch in KAFKA-16637, 
> so this needs investigation. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16777:
---
Description: 
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:
{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}
Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown.

 

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, 
but filing this one to provide more context and point out the test failures and 
suggested new tests. All fail even with the current patch in KAFKA-16637, so 
this needs investigation. 

  was:
If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout. 

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:

{code:java}
  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, 
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)

consumer.assign(List(tp).asJava)

// continuous poll should eventually fail because there is no offset reset 
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
  assertThrows(classOf[NoOffsetForPartitionException], () => 
consumer.poll(Duration.ZERO))
}
  }
{code}

Also this is covered in the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 that is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too. 

The issue seems to be around calling poll with ZERO timeout, that even when 
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so 
the  updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/

[jira] [Created] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy

2024-05-15 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16777:
--

 Summary: New consumer should throw NoOffsetForPartitionException 
on continuous poll zero if no reset strategy
 Key: KAFKA-16777
 URL: https://issues.apache.org/jira/browse/KAFKA-16777
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Lianet Magrans


If the consumer does not define an offset reset strategy, a call to poll should 
fail with NoOffsetForPartitionException. That works as expected on the new 
consumer when polling with a timeout > 0 (existing integration test 
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
 but fails when polling continuously with ZERO timeout. 

This can be easily reproduced with a new integration test like this (passes for 
the legacy consumer but fails for the new consumer). We should add it as part 
of the fix, for better coverage:

{code:java}
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer = createConsumer(configOverrides = this.consumerConfig)

    consumer.assign(List(tp).asJava)

    // continuous poll should eventually fail because there is no offset reset
    // strategy set (fail only when resetting positions after the coordinator is known)
    TestUtils.tryUntilNoAssertionError() {
      assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
    }
  }
{code}

This is also covered by the unit test 
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
 which is currently enabled only for the LegacyConsumer. After fixing this issue 
we should be able to enable it for the new consumer too. 

The issue seems to be related to calling poll with ZERO timeout: even when 
polling continuously, the consumer is not able to complete 
initWithCommittedOffsetsIfNeeded, so updateFetchPositions never makes it to 
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
 where the exception is thrown. 
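
For anyone wanting to reproduce this outside the test harness, here is a minimal 
stand-alone sketch using a plain Java consumer (the broker address, group id, and 
topic name are placeholders); with the legacy consumer it surfaces the exception 
quickly, while with the new consumer the loop may spin forever:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollZeroRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-zero-repro");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No committed offsets and no reset strategy: poll() is expected to
        // throw NoOffsetForPartitionException once positions get initialized.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("test-topic", 0)));
            while (true) {
                consumer.poll(Duration.ZERO); // with the bug, this never throws
            }
        } catch (NoOffsetForPartitionException e) {
            System.out.println("Got expected exception: " + e.getMessage());
        }
    }
}
{code}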




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846742#comment-17846742
 ] 

Justine Olshan commented on KAFKA-16692:


[~akaltsikis] Hmmm... I've included information in release notes about bugs like 
this, but I'm not sure if there is a place to update between releases. The good 
news, though, is that the PR is almost ready. Just running a final batch of tests.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, so we potentially skip the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apa

[jira] [Assigned] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-05-15 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-16622:
-

Assignee: Edoardo Comar

> MirrorMaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup:
> Tested with a standalone single-process MirrorMaker2 mirroring between two 
> single-node Kafka clusters (MirrorMaker config attached) with quick refresh 
> intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10):
> 1. create a single topic in the source cluster
> 2. produce data to it (e.g. 1 records)
> 3. start a slow consumer, e.g. fetching 50 records/poll and pausing 1 sec 
> between polls, committing after each poll
> 4. watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>   --from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (i.e. its consumer group lag gets down to 0).
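
For context, a minimal sketch of the kind of standalone MM2 configuration 
described above (cluster aliases, bootstrap addresses, and the topic pattern 
are placeholders; the attached properties file is the authoritative repro config):

{noformat}
# connect-mirror-maker.properties (sketch), run with:
#   bin/connect-mirror-maker.sh connect-mirror-maker.properties
clusters = source, target
source.bootstrap.servers = localhost:9092
target.bootstrap.servers = localhost:9192

source->target.enabled = true
source->target.topics = .*

# quick refresh intervals and a small offset.lag.max, as in the repro
emit.checkpoints.interval.seconds = 5
refresh.groups.interval.seconds = 5
sync.group.offsets.interval.seconds = 5
offset.lag.max = 10
{noformat}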



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16776) Builds flaky-failing with leaked client-metrics-reaper thread

2024-05-15 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16776:

Description: 
I observed the assertion added by KAFKA-16477 failing in my build:

[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15469/6/tests]
 Reproduced here:
{noformat}
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.TopicCommandIntegrationTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.consumer.group.AuthorizerIntegrationTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest<1s{noformat}
The exceptions for these tests are as follows:
{noformat}
org.opentest4j.AssertionFailedError: Found 1 unexpected threads during 
@BeforeAll: `executor-client-metrics` ==> expected:  but was: 
    at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
    at 
app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896)
    at 
app//kafka.server.QuorumTestHarness.setUpClass(QuorumTestHarness.scala:474)
...
    Suppressed: org.opentest4j.AssertionFailedError: Found 1 unexpected threads 
during @AfterAll: `executor-client-metrics` ==> expected:  but was: 

        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
        at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
        at 
app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896)
        at 
app//kafka.server.QuorumTestHarness.tearDownClass(QuorumTestHarness.scala:482){noformat}
But there is no additional context about who leaked this thread.


[jira] [Created] (KAFKA-16776) Builds flaky-failing with leaked client-metrics-reaper thread

2024-05-15 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16776:
---

 Summary: Builds flaky-failing with leaked client-metrics-reaper 
thread
 Key: KAFKA-16776
 URL: https://issues.apache.org/jira/browse/KAFKA-16776
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.8.0
Reporter: Greg Harris


I observed the assertion added by KAFKA-16477 failing in my build:

[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15469/6/tests]
 Reproduced here:
{noformat}
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.TopicCommandIntegrationTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.consumer.group.AuthorizerIntegrationTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest<1s
Build / JDK 17 and Scala 2.13 / initializationError – 
org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest<1s{noformat}
The exceptions for these tests are as follows:
{noformat}
org.opentest4j.AssertionFailedError: Found 1 unexpected threads during 
@BeforeAll: `executor-client-metrics` ==> expected:  but was: 
    at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
    at 
app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896)
    at 
app//kafka.server.QuorumTestHarness.setUpClass(QuorumTestHarness.scala:474)
...
    Suppressed: org.opentest4j.AssertionFailedError: Found 1 unexpected threads 
during @AfterAll: `executor-client-metrics` ==> expected:  but was: 

        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
        at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
        at 
app//kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1896)
        at 
app//kafka.server.QuorumTestHarness.tearDownClass(QuorumTestHarness.scala:482){noformat}
But there is no additional context about who leaked this thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16775) Fix flaky PlaintextAdminIntegrationTest#testCreateExistingTopicsThrowTopicExistsException

2024-05-15 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16775:
--

 Summary: Fix flaky 
PlaintextAdminIntegrationTest#testCreateExistingTopicsThrowTopicExistsException
 Key: KAFKA-16775
 URL: https://issues.apache.org/jira/browse/KAFKA-16775
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.opentest4j.AssertionFailedError: timed out waiting for topics
 at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
 at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138)
 at 
app//kafka.api.BaseAdminIntegrationTest.waitForTopics(BaseAdminIntegrationTest.scala:236)
 at 
app//kafka.api.PlaintextAdminIntegrationTest.testCreateExistingTopicsThrowTopicExistsException(PlaintextAdminIntegrationTest.scala:140)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16774:
--

 Summary: fix flaky 
StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
 Key: KAFKA-16774
 URL: https://issues.apache.org/jira/browse/KAFKA-16774
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


java.util.ConcurrentModificationException
 at 
java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
 at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
 at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
 at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
 at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
 at 
org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
 at 
org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
 at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16773) Fix flaky QuorumControllerTest#testDelayedConfigurationOperations

2024-05-15 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16773:
--

 Summary: Fix flaky 
QuorumControllerTest#testDelayedConfigurationOperations
 Key: KAFKA-16773
 URL: https://issues.apache.org/jira/browse/KAFKA-16773
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0
 at org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:788)
 at 
org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1267)
 at 
org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:546)
 at 
org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
 at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:878)
 at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:868)
 at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
 at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
 at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
 at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
 at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 
which is larger than the current epoch 0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-15 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846705#comment-17846705
 ] 

Igor Soarez commented on KAFKA-16760:
-

[~showuon] I spent a few hours looking into this but couldn't yet figure out why 
the test is failing. 

Regarding the log line you first mentioned ("Will await the new LeaderAndIsr 
state before resuming fetching."): I'm not sure this is normal. I've also not 
found an integration test covering alterLogDir for KRaft, so we need to add that!

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing alterLogDirRequest, it gets an error NONE result. But actually, 
> the alterLogDir never happened, as shown by these errors:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: it's under KRaft mode, so the log mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16772) Introduce a type for kraft.version

2024-05-15 Thread Jira
José Armando García Sancio created KAFKA-16772:
--

 Summary: Introduce a type for kraft.version
 Key: KAFKA-16772
 URL: https://issues.apache.org/jira/browse/KAFKA-16772
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: José Armando García Sancio


Instead of using a "short" to represent the kraft.version introduce a class or 
enum to provide a better abstraction.
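
A minimal sketch of what such an abstraction could look like (the name, 
constants, and methods here are illustrative assumptions, not the final API):

{code:java}
// Illustrative sketch only; names and version semantics are assumptions.
public enum KRaftVersion {
    KRAFT_VERSION_0((short) 0),
    KRAFT_VERSION_1((short) 1);

    private final short featureLevel;

    KRaftVersion(short featureLevel) {
        this.featureLevel = featureLevel;
    }

    // The raw short used on the wire and in feature records.
    public short featureLevel() {
        return featureLevel;
    }

    public static KRaftVersion fromFeatureLevel(short level) {
        for (KRaftVersion version : values()) {
            if (version.featureLevel == level) {
                return version;
            }
        }
        throw new IllegalArgumentException("Unknown kraft.version feature level: " + level);
    }
}
{code}

Compared to passing a bare short around, this gives call sites exhaustiveness 
checking and a single place to validate unknown levels.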



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition

2024-05-15 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846684#comment-17846684
 ] 

Greg Harris commented on KAFKA-16768:
-

[~muralibasani] Do you mean call Acceptor#close? That might cause an infinite 
loop (because Acceptor#close calls Processor#close which calls 
Processor#closeAll).
I think the Processor closeAll (where newConnections is drained) could call just 
the acceptor thread's `join` and get the same effect, but without the infinite 
loop.
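
To illustrate the ordering idea with simplified, hypothetical types (not the 
real SocketServer internals): the drain must happen strictly after the acceptor 
thread can no longer enqueue, which `join` guarantees.

{code:java}
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;

// Simplified sketch, not the actual SocketServer code: closeAll() joins the
// acceptor thread first, so nothing can be added to newConnections after the
// drain below, and no accepted channel is leaked.
final class ProcessorShutdownSketch {
    private final BlockingQueue<SocketChannel> newConnections;
    private final Thread acceptorThread;

    ProcessorShutdownSketch(BlockingQueue<SocketChannel> newConnections, Thread acceptorThread) {
        this.newConnections = newConnections;
        this.acceptorThread = acceptorThread;
    }

    void closeAll() throws InterruptedException {
        // 1. Wait until the acceptor has terminated; after join() returns it
        //    can no longer call newConnections.add(...).
        acceptorThread.join();
        // 2. Drain and close everything that was queued, including channels
        //    enqueued while shutdown was in progress.
        SocketChannel channel;
        while ((channel = newConnections.poll()) != null) {
            try {
                channel.close();
            } catch (IOException e) {
                // best effort during shutdown
            }
        }
    }
}
{code}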

> SocketServer leaks accepted SocketChannel instances due to race condition
> -
>
> Key: KAFKA-16768
> URL: https://issues.apache.org/jira/browse/KAFKA-16768
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Major
>
> The SocketServer has threads for Acceptors and Processors. These threads 
> communicate via Processor#accept/Processor#configureNewConnections and the 
> `newConnections` queue.
> During shutdown, the Acceptor and Processors are each stopped by setting 
> shouldRun to false, and then shutdown proceeds asynchronously in all 
> instances together. This leads to a race condition where an Acceptor accepts 
> a SocketChannel and queues it to a Processor, but that Processor instance has 
> already started shutting down and has already drained the newConnections 
> queue.
> KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely 
> different implementation but has the same flaw.
> An example execution order that includes this leak:
> 1. Acceptor#accept() is called, and a new SocketChannel is accepted.
> 2. Acceptor#assignNewConnection() begins
> 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor 
> and attached Processor instances
> 4. Processor#run() checks the shouldRun variable, and exits the loop
> 5. Processor#closeAll() executes, and drains the `newConnections` variable
> 6. Processor#run() returns and the Processor thread terminates
> 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the 
> SocketChannel to `newConnections`
> 8. Acceptor#assignNewConnection() returns
> 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the 
> Acceptor thread terminates.
> 10. Acceptor#close() joins all of the terminated threads, and returns
> At the end of this sequence, there are still open SocketChannel instances in 
> newConnections, which are then considered leaked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-15 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846675#comment-17846675
 ] 

Muralidhar Basani commented on KAFKA-16448:
---

[~Dabz] can I try looking into this?

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>    Priority: Minor
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16771) First log directory printed twice when formatting storage

2024-05-15 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16771:
--

 Summary: First log directory printed twice when formatting storage
 Key: KAFKA-16771
 URL: https://issues.apache.org/jira/browse/KAFKA-16771
 Project: Kafka
  Issue Type: Task
  Components: tools
Affects Versions: 3.7.0
Reporter: Mickael Maison


If multiple log directories are set, when running bin/kafka-storage.sh format, 
the first directory is printed twice. For example:

{noformat}
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c 
config/kraft/server.properties --release-version 3.6
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2.
{noformat}






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16770) Coalesce records into bigger batches

2024-05-15 Thread David Jacot (Jira)
David Jacot created KAFKA-16770:
---

 Summary: Coalesce records into bigger batches
 Key: KAFKA-16770
 URL: https://issues.apache.org/jira/browse/KAFKA-16770
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-15 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846662#comment-17846662
 ] 

Chris Egerton commented on KAFKA-16603:
---

[~dasarianil] In most cases, a single worker-global consumer is used to read 
offsets from the offsets topic. If a connector uses a separate offsets topic 
(which can happen if it is explicitly configured to use one via the 
[offsets.storage.topic|https://kafka.apache.org/documentation.html#sourceconnectorconfigs_offsets.storage.topic]
 property in its configuration, or if exactly-once support is enabled and the 
connector is configured to write to a different Kafka cluster than the one used 
by the Kafka Connect worker for its internal topics), then a new consumer is 
spun up for that connector. However, that consumer is also constantly polling 
the topic for changes, so reads to the end of that topic should not lead to a 
significant bump in load on the Kafka cluster.

 

As far as communicating committed offsets to the {{SourceConnector}} instance 
goes, we cannot reuse state stored in the JVM to do this in most cases because 
Kafka Connect is intended to run in distributed mode with multiple workers, and 
there is no guarantee that the {{SourceConnector}} instance is running on the 
same worker as any of its tasks.

Again, though, we could possibly add some hook to the {{SourceConnector}} class 
that lets implementations know when new offsets have been successfully 
committed by any of its tasks. Would that be sufficient?
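
A rough sketch of the kind of hook being suggested (a hypothetical API, not 
something that exists in Connect today):

{code:java}
import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical extension point, for illustration only: the framework would
// invoke this after any of the connector's tasks successfully commits source
// offsets, regardless of which worker that task runs on.
public abstract class OffsetAwareSourceConnector extends SourceConnector {

    /**
     * Called after source offsets have been committed. Keys are source
     * partitions, values are the committed source offsets.
     */
    public void offsetsCommitted(Map<Map<String, ?>, Map<String, ?>> committedOffsets) {
        // default no-op; implementations may override to checkpoint externally
    }
}
{code}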

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTask

[jira] [Commented] (KAFKA-16330) Remove Deprecated methods/variables from TaskId

2024-05-15 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846661#comment-17846661
 ] 

Muralidhar Basani commented on KAFKA-16330:
---

I think 4.0.0 might very soon be the next release after 3.8.0. May I pick 
this up?

> Remove Deprecated methods/variables from TaskId
> ---
>
> Key: KAFKA-16330
> URL: https://issues.apache.org/jira/browse/KAFKA-16330
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Cf 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  
> This ticket relates to https://issues.apache.org/jira/browse/KAFKA-16329 and 
> both should be worked on together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods

2024-05-15 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846660#comment-17846660
 ] 

Muralidhar Basani commented on KAFKA-16329:
---

I think 4.0.0 might very soon be the next release after 3.8.0. May I pick 
this up?

> Remove Deprecated Task/ThreadMetadata classes and related methods
> -
>
> Key: KAFKA-16329
> URL: https://issues.apache.org/jira/browse/KAFKA-16329
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation]
>  
>  * 
> org.apache.kafka.streams.processor.TaskMetadata
>  * org.apache.kafka.streams.processor.ThreadMetadata
>  * org.apache.kafka.streams.KafkaStreams#localThreadsMetadata
>  * org.apache.kafka.streams.state.StreamsMetadata
>  * org.apache.kafka.streams.KafkaStreams#allMetadata
>  * org.apache.kafka.streams.KafkaStreams#allMetadataForStore
> This is related to https://issues.apache.org/jira/browse/KAFKA-16330 and both 
> tickets should be worked on together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join

2024-05-15 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846658#comment-17846658
 ] 

Muralidhar Basani commented on KAFKA-16333:
---

I think 4.0.0 might very soon be the next release after 3.8.0. May I pick 
this up?

> Removed Deprecated methods KTable#join
> --
>
> Key: KAFKA-16333
> URL: https://issues.apache.org/jira/browse/KAFKA-16333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> KTable#join() methods taking a `Named` parameter were deprecated in the 3.1 
> release via https://issues.apache.org/jira/browse/KAFKA-13813 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-15 Thread Matej Sprysl (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846632#comment-17846632
 ] 

Matej Sprysl commented on KAFKA-15242:
--

I would also like this fixed. Currently, the API provides no way to test 
FixedKeyProcessors other than creating a full test-driver topology.
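
For reference, the full test-driver route looks roughly like this, a minimal 
sketch with made-up topic names and a trivial uppercasing processor:

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class FixedKeyProcessorDriverExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
                .processValues(() -> new FixedKeyProcessor<String, String, String>() {
                    private FixedKeyProcessorContext<String, String> context;

                    @Override
                    public void init(FixedKeyProcessorContext<String, String> context) {
                        this.context = context;
                    }

                    @Override
                    public void process(FixedKeyRecord<String, String> record) {
                        // the key is fixed; only the value may be replaced
                        context.forward(record.withValue(record.value().toUpperCase()));
                    }
                })
                .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fixed-key-driver-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "in", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "out", Serdes.String().deserializer(), Serdes.String().deserializer());
            in.pipeInput("k", "hello");
            System.out.println(out.readKeyValue()); // KeyValue(k, HELLO)
        }
    }
}
{code}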

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using a mock processor context to get the forwarded message doesn't work.
> Also, there is no well-documented way of testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition

2024-05-15 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846607#comment-17846607
 ] 

Muralidhar Basani commented on KAFKA-16768:
---

To fix this, when Processor#closeAll() is called, should we close all the 
acceptors in dataPlaneAcceptors, so that no new connections are accepted?

 

> SocketServer leaks accepted SocketChannel instances due to race condition
> -
>
> Key: KAFKA-16768
> URL: https://issues.apache.org/jira/browse/KAFKA-16768
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Major
>
> The SocketServer has threads for Acceptors and Processors. These threads 
> communicate via Processor#accept/Processor#configureNewConnections and the 
> `newConnections` queue.
> During shutdown, the Acceptor and Processors are each stopped by setting 
> shouldRun to false, and then shutdown proceeds asynchronously in all 
> instances together. This leads to a race condition where an Acceptor accepts 
> a SocketChannel and queues it to a Processor, but that Processor instance has 
> already started shutting down and has already drained the newConnections 
> queue.
> KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely 
> different implementation but has the same flaw.
> An example execution order that includes this leak:
> 1. Acceptor#accept() is called, and a new SocketChannel is accepted.
> 2. Acceptor#assignNewConnection() begins
> 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor 
> and attached Processor instances
> 4. Processor#run() checks the shouldRun variable, and exits the loop
> 5. Processor#closeAll() executes, and drains the `newConnections` variable
> 6. Processor#run() returns and the Processor thread terminates
> 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the 
> SocketChannel to `newConnections`
> 8. Acceptor#assignNewConnection() returns
> 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the 
> Acceptor thread terminates.
> 10. Acceptor#close() joins all of the terminated threads, and returns
> At the end of this sequence, there are still open SocketChannel instances in 
> newConnections, which are then considered leaked.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Angelos Kaltsikis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846592#comment-17846592
 ] 

Angelos Kaltsikis edited comment on KAFKA-16692 at 5/15/24 11:36 AM:
-

Shall we explicitly state this in the documentation until your fix is released?
I am happy to ship a PR for that :) 


was (Author: akaltsikis):
Shall we explicitly state this in the documentation until your fix is released?

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, so we potentially skip the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the netwo

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846591#comment-17846591
 ] 

Justine Olshan commented on KAFKA-16692:


The delay in the fix comes from the fact that reproducing the issue in a system 
test is hard: while I can get the error to appear, it is hard to consistently 
get a test to fail due to it. I assure you I'm making progress and hope to have 
a fix by the end of the week :) 

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, so we potentially skip the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we crea

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Angelos Kaltsikis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846592#comment-17846592
 ] 

Angelos Kaltsikis commented on KAFKA-16692:
---

Shall we explicitly state this in the documentation until your fix is released?

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId, so we potentially skip the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ 

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846590#comment-17846590
 ] 

Justine Olshan commented on KAFKA-16692:


Hi [~akaltsikis]. Yes the bug is due to the verification requests during the 
upgrade. If you disable the verification, you won't hit the bug.

However, the feature was designed to allow upgrades, and there seems to be a 
small bug in the logic to gate the requests during the upgrade. I'm working on 
fixing that.

In the meantime, disabling the feature on upgrade is a workaround.
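
For anyone following along, the workaround is the broker-side property already 
mentioned in this thread, set on the brokers during the rolling upgrade (and 
removable once the fixed version is deployed):

{noformat}
# server.properties on the 3.6.x brokers during the upgrade
transaction.partition.verification.enable=false
{noformat}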

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}?
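For readers following the links above, a paraphrased sketch of the 
version-selection logic at the referenced NetworkClient line; this is one 
reading of that code, not the verbatim source:

{code:java}
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.requests.AbstractRequest;

class VersionChoiceSketch {
    static short chooseVersion(NodeApiVersions versionInfo, AbstractRequest.Builder<?> builder) {
        if (versionInfo == null) {
            // No cached NodeApiVersions for the node: fall back to the latest
            // allowed version, skipping the latestUsableVersion clamp.
            return builder.latestAllowedVersion();
        }
        return versionInfo.latestUsableVersion(
                builder.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
    }
}
{code}

If that reading is right, a sender with no cached versions for a 3.5 peer would 
build the newest (v4) request, matching the errors reported here.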

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Angelos Kaltsikis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846577#comment-17846577
 ] 

Angelos Kaltsikis commented on KAFKA-16692:
---

It seems that adding `transaction.partition.verification.enable=false` to the 
3.6.2 brokers makes the issue disappear.

That means that when upgrading from 3.5.2 to 3.6.2 with the above config in 
place, there are no visible errors on the Kafka side or on the client side.

The same is true when downgrading from 3.6.2 to 3.5.2.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}?

[jira] [Comment Edited] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Angelos Kaltsikis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846572#comment-17846572
 ] 

Angelos Kaltsikis edited comment on KAFKA-16692 at 5/15/24 10:03 AM:
-

We faced the same issue when we tried to upgrade from 3.5.2 to 3.6.2 a few days 
ago. This is reproducible every time I try to upgrade from 3.5.2 -> 3.6.2 or 
vice versa (reverting from 3.6.2 -> 3.5.2).

We are using a lot of clients that use transactions, so those transactions fail 
on the client side every time we perform those operations.

The interesting part is that as soon as the rolling upgrade/downgrade finishes, 
the errors disappear and the clients seem to work fine. Thus the issue is 
visible only during the rolling deployment for the upgrade.

Let me know if I can help somehow.


was (Author: akaltsikis):
We faced the same issue when we tried to upgrade from 3.5.2 to 3.6.2 a few days 
ago. This is reproducible every time I try to upgrade from 3.5.2 -> 3.6.2 or 
vice versa (reverting from 3.6.2 -> 3.5.2).

We are using a lot of clients that use transactions, so those transactions fail 
on the client side every time we perform those operations.

Let me know if I can help somehow.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-15 Thread Angelos Kaltsikis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846572#comment-17846572
 ] 

Angelos Kaltsikis commented on KAFKA-16692:
---

We faced the same issue when we tried to upgrade from 3.5.2 to 3.6.2 a few days 
ago. This is reproducible every time I try to upgrade from 3.5.2 -> 3.6.2 or 
vice versa (reverting from 3.6.2 -> 3.5.2).

We are using a lot of clients that use transactions, so those transactions fail 
on the client side every time we perform those operations.

Let me know if I can help somehow.

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}?

[jira] [Comment Edited] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-15 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846552#comment-17846552
 ] 

Igor Soarez edited comment on KAFKA-16760 at 5/15/24 8:57 AM:
--

[~showuon] thanks for updating the branch, I can reproduce it now and will have 
a look.


was (Author: soarez):
[~showuon] thanks for updating the branch, I'll have a look at this.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing alterLogDirRequest, it gets error NONE result. But actually, the 
> alterLogDir never happened with these errors:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: this is under KRaft mode, so the log message mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-15 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846552#comment-17846552
 ] 

Igor Soarez commented on KAFKA-16760:
-

[~showuon] thanks for updating the branch, I'll have a look at this.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing alterLogDirRequest, it gets error NONE result. But actually, the 
> alterLogDir never happened with these errors:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: this is under KRaft mode, so the log message mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-15 Thread Karsten Stöckmann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846544#comment-17846544
 ] 

Karsten Stöckmann commented on KAFKA-16700:
---

[~ayoubomari] Hm, according to the documentation, it actually is also about 
{{null}} foreign keys.
{quote} * left-foreign-key join Ktable-Ktable: no longer drop left records with 
null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 
'null' for right value.{quote}
Thus, this should not come into play as a potential issue here. (Also, as 
mentioned, foreign keys are never {{null}} as their source table column is not 
nullable.)
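For concreteness, a minimal sketch of the join shape in question (hypothetical 
topic names and record types, default serdes assumed); under the 3.7 semantics 
quoted above, a {{null}} foreign key no longer drops the left record, and the 
ValueJoiner is called with a {{null}} right value instead:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkLeftJoinSketch {
    record Folder(String id, String agencyId) {}   // agencyId may be null
    record Agency(String id, String name) {}

    static KTable<String, String> build(StreamsBuilder builder) {
        KTable<String, Folder> folders = builder.table("folders");
        KTable<String, Agency> agencies = builder.table("agencies");
        return folders.leftJoin(
                agencies,
                Folder::agencyId,                  // foreign-key extractor
                (folder, agency) -> folder.id() + "/"
                        + (agency == null ? "<no agency>" : agency.name()));
    }
}
{code}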

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> --
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>Reporter: Karsten Stöckmann
>Priority: Major
>  Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* {*}FK Left Join{*}.
> Assume the following snippet:
> {code:java}
> streamsBuilder
> .table(
> folderTopicName,
> Consumed.with(
> folderKeySerde,
> folderSerde))
> .leftJoin(
> agencies, // KTable
> Folder::agencyIdValue,
> AggregateFolder::new,
> TableJoined.as("folder-to-agency"),
> Materialized
> .as("folder-to-agency-materialized")
> .withKeySerde(folderKeySerde)
> .withValueSerde(aggregateFolderSerde))
> .leftJoin(
> documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the above shown KTable-KTable FK Left Join seems to 
> produce message loss on the first of a series of FK Left Joins; the right 
> hand {{KTable}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
>  * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
>  * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
>  * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.
> Judging from the nature of a (FK) left join, I'd expect every message from 
> the left-hand topic to produce an aggregate even if no matching message is 
> found in the right-hand topic.
> Message loss unpredictably varies across tests and seems not to be bound to 
> specific keys or messages.
> As it seems, this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. Streams applications 
> already running) seems not to show this behaviour. Additionally, as of yet we 
> observed this kind of message loss only when running multiple replicas of the 
> affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We've tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka

[jira] [Created] (KAFKA-16769) Delete deprecated add.source.alias.to.metrics configuration

2024-05-15 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16769:
--

 Summary: Delete deprecated add.source.alias.to.metrics 
configuration
 Key: KAFKA-16769
 URL: https://issues.apache.org/jira/browse/KAFKA-16769
 Project: Kafka
  Issue Type: Task
  Components: mirrormaker
Reporter: Mickael Maison
Assignee: Mickael Maison
 Fix For: 4.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-15 Thread Laymain (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846522#comment-17846522
 ] 

Laymain commented on KAFKA-16361:
-

Thanks [~ableegoldman] and [~flashmouse], I am glad to learn that the fix has 
been merged.

As a workaround, it is also possible to use a different assignor; in our case, 
however, foregoing rack awareness would incur significant additional costs.
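For deployments that can tolerate losing stickiness and rack awareness, a 
sketch of such a consumer override (assuming defaults otherwise):

{code}
# consumer config sketch: a non-rack-aware assignor sidesteps the buggy path
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
{code}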

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios the rack aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon, the error is deterministic, and 
> so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
> would also be reproducible there) 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 
> 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], 

[jira] (KAFKA-16616) refactor mergeWith in MetadataSnapshot

2024-05-15 Thread Cao Manh Dat (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16616 ]


Cao Manh Dat deleted comment on KAFKA-16616:
--

was (Author: caomanhdat):
Hi [~alyssahuang] , can I work on this item?

> refactor mergeWith in MetadataSnapshot
> --
>
> Key: KAFKA-16616
> URL: https://issues.apache.org/jira/browse/KAFKA-16616
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Alyssa Huang
>Priority: Minor
>
> Right now we keep track of topic ids and partition metadata to add/update 
> separately in mergeWith (e.g. two maps passed as arguments). This means we 
> iterate over topic metadata twice which could be costly when we're dealing 
> with a large number of updates. 
> `updatePartitionLeadership` which calls `mergeWith` does something similarly 
> (generates map of topic ids to update in a loop separate from the list of 
> partition metadata to update) and should be refactored as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16616) refactor mergeWith in MetadataSnapshot

2024-05-15 Thread Cao Manh Dat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846515#comment-17846515
 ] 

Cao Manh Dat commented on KAFKA-16616:
--

Hi [~alyssahuang] , can I work on this item?

> refactor mergeWith in MetadataSnapshot
> --
>
> Key: KAFKA-16616
> URL: https://issues.apache.org/jira/browse/KAFKA-16616
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Alyssa Huang
>Priority: Minor
>
> Right now we keep track of topic ids and partition metadata to add/update 
> separately in mergeWith (e.g. two maps passed as arguments). This means we 
> iterate over topic metadata twice which could be costly when we're dealing 
> with a large number of updates. 
> `updatePartitionLeadership` which calls `mergeWith` does something similarly 
> (generates map of topic ids to update in a loop separate from the list of 
> partition metadata to update) and should be refactored as well.
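As an illustration of the suggested refactor, a single-pass merge over 
hypothetical types (not the actual MetadataSnapshot API):

{code:java}
import java.util.List;
import java.util.Map;

class SinglePassMergeSketch {
    record TopicUpdate(String topicId, List<String> partitionMetadata) {}

    // One traversal of the updates feeds both output maps, instead of
    // iterating the topic metadata twice as described above.
    static void merge(Map<String, TopicUpdate> updates,
                      Map<String, String> mergedTopicIds,
                      Map<String, List<String>> mergedPartitions) {
        for (Map.Entry<String, TopicUpdate> e : updates.entrySet()) {
            mergedTopicIds.put(e.getKey(), e.getValue().topicId());
            mergedPartitions.put(e.getKey(), e.getValue().partitionMetadata());
        }
    }
}
{code}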



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16760) alterReplicaLogDirs failed even if responded with none error

2024-05-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846508#comment-17846508
 ] 

Luke Chen commented on KAFKA-16760:
---

[~soarez], I'm really sorry, I can't believe I didn't commit my change to the 
branch yesterday. I just updated the branch, and it reliably fails in my env 
after 3 runs. Please give it a try again.
I'd like to know whether this is the expected behavior or a bug.

> alterReplicaLogDirs failed even if responded with none error
> 
>
> Key: KAFKA-16760
> URL: https://issues.apache.org/jira/browse/KAFKA-16760
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When firing alterLogDirRequest, it gets error NONE result. But actually, the 
> alterLogDir never happened with these errors:
> {code:java}
> [2024-05-14 16:48:50,796] INFO [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 has an older epoch (0) than the current leader. Will await the new 
> LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaAlterLogDirsThread:66)
> [2024-05-14 16:48:50,796] WARN [ReplicaAlterLogDirsThread-1]: Partition 
> topicB-0 marked as failed (kafka.server.ReplicaAlterLogDirsThread:70)
> {code}
> Note: this is under KRaft mode, so the log message mentioning LeaderAndIsr is wrong. 
> This can be reproduced in this 
> [branch|https://github.com/showuon/kafka/tree/alterLogDirTest] and running 
> this test:
> {code:java}
> ./gradlew cleanTest storage:test --tests 
> org.apache.kafka.tiered.storage.integration.AlterLogDirTest
> {code}
> The complete logs can be found here: 
> https://gist.github.com/showuon/b16cdb05a125a7c445cc6e377a2b7923
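For context, a sketch of the admin call in question (bootstrap server, broker 
id, and log dir are placeholders); per this report, the returned future 
completes without error even though the move never starts:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartitionReplica;

public class AlterLogDirSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Move topicB-0 on broker 1 to another log dir on that broker.
            TopicPartitionReplica replica = new TopicPartitionReplica("topicB", 0, 1);
            admin.alterReplicaLogDirs(Map.of(replica, "/data/log-dir-2"))
                 .values().get(replica).get();   // per this report: completes with NONE
        }
    }
}
{code}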



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16763:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
>  
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16671) Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured

2024-05-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16671.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Revisit SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured
> 
>
> Key: KAFKA-16671
> URL: https://issues.apache.org/jira/browse/KAFKA-16671
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> Looped 1000 times on my local machine and all passed. Let's enable the test 
> to see what happens in our CI.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread appchemist (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846483#comment-17846483
 ] 

appchemist commented on KAFKA-16764:


Hi [~lianetm], I would like to take this issue.

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example.The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably what makes that 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.
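A reproduction sketch following the description above (bootstrap and group 
values are placeholders); the legacy consumer throws InvalidTopicException from 
poll(), while the new consumer currently does not:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InvalidTopicRepro {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "invalid-topic-repro");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            consumer.subscribe(List.of("invalid topic"));  // space makes the name invalid
            consumer.poll(Duration.ofSeconds(5));          // expected: InvalidTopicException
        }
    }
}
{code}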



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846475#comment-17846475
 ] 

Greg Harris commented on KAFKA-16765:
-

This is also a bug in EchoServer: 
[https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java#L76-L81]
 and ServerShutdownTest: 
[https://github.com/apache/kafka/blob/cb968845ecb3cb0982182d9dd437ecf652fe38d3/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala#L274-L275]
 except those don't require a race condition to happen.

> NioEchoServer leaks accepted SocketChannel instances due to race condition
> --
>
> Key: KAFKA-16765
> URL: https://issues.apache.org/jira/browse/KAFKA-16765
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Minor
>
> The NioEchoServer has an AcceptorThread that calls accept() to open new 
> SocketChannel instances and insert them into the `newChannels` List, and a 
> main thread that drains the `newChannels` List and moves them to the 
> `socketChannels` List.
> During shutdown, the serverSocketChannel is closed, which causes both threads 
> to exit their while loops. It is possible for the NioEchoServer main thread 
> to sense the serverSocketChannel close and terminate before the Acceptor 
> thread does, and for the Acceptor thread to put a SocketChannel in 
> `newChannels` before terminating. This instance is never closed by either 
> thread, because it is never moved to `socketChannels`.
> A precise execution order that has this leak is:
> 1. NioEchoServer thread locks `newChannels`.
> 2. Acceptor thread accept() completes, and the SocketChannel is created
> 3. Acceptor thread blocks waiting for the `newChannels` lock
> 4. NioEchoServer thread releases the `newChannels` lock and does some 
> processing
> 5. NioEchoServer#close() is called, which closes the serverSocketChannel
> 6. NioEchoServer thread checks serverSocketChannel.isOpen() and then 
> terminates
> 7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
> to `newChannels`.
> 8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
> 9. NioEchoServer#close() stops blocking now that both other threads have 
> terminated.
> The end result is that the leaked socket is left open in the `newChannels` 
> list at the end of close(), which is incorrect.
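A hypothetical sketch of one way to plug this (names follow the description 
above, not the actual test code): after joining both threads, close() drains 
anything the acceptor parked in newChannels:

{code:java}
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.List;

class DrainSketch {
    // Channels accepted after the main thread exited never reach
    // socketChannels, so they must be closed here instead.
    static void drainLeaked(List<SocketChannel> newChannels) throws IOException {
        synchronized (newChannels) {
            for (SocketChannel channel : newChannels) {
                channel.close();
            }
            newChannels.clear();
        }
    }
}
{code}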



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16768:

Description: 
The SocketServer has threads for Acceptors and Processors. These threads 
communicate via Processor#accept/Processor#configureNewConnections and the 
`newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting 
shouldRun to false, and then shutdown proceeds asynchronously in all instances 
together. This leads to a race condition where an Acceptor accepts a 
SocketChannel and queues it to a Processor, but that Processor instance has 
already started shutting down and has already drained the newConnections queue.

KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely 
different implementation but has the same flaw.

An example execution order that includes this leak:
1. Acceptor#accept() is called, and a new SocketChannel is accepted.
2. Acceptor#assignNewConnection() begins
3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor 
and attached Processor instances
4. Processor#run() checks the shouldRun variable, and exits the loop
5. Processor#closeAll() executes, and drains the `newConnections` variable
6. Processor#run() returns and the Processor thread terminates
7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the 
SocketChannel to `newConnections`
8. Acceptor#assignNewConnection() returns
9. Acceptor#run() checks the shouldRun variable and exits the loop, and the 
Acceptor thread terminates.
10. Acceptor#close() joins all of the terminated threads, and returns

At the end of this sequence, there are still open SocketChannel instances in 
newConnections, which are then considered leaked.

  was:
The SocketServer has threads for Acceptors and Processors. These threads 
communicate via Processor#accept/Processor#configureNewConnections and the 
`newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting 
shouldRun to false, and then shutdown proceeds asynchronously in all instances 
together. This leads to a race condition where an Acceptor accepts a 
SocketChannel and queues it to a Processor, but that Processor instance has 
already started shutting down and has already drained the newConnections queue.


> SocketServer leaks accepted SocketChannel instances due to race condition
> -
>
> Key: KAFKA-16768
> URL: https://issues.apache.org/jira/browse/KAFKA-16768
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.8.0
>Reporter: Greg Harris
>Priority: Major
>
> The SocketServer has threads for Acceptors and Processors. These threads 
> communicate via Processor#accept/Processor#configureNewConnections and the 
> `newConnections` queue.
> During shutdown, the Acceptor and Processors are each stopped by setting 
> shouldRun to false, and then shutdown proceeds asynchronously in all 
> instances together. This leads to a race condition where an Acceptor accepts 
> a SocketChannel and queues it to a Processor, but that Processor instance has 
> already started shutting down and has already drained the newConnections 
> queue.
> KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely 
> different implementation but has the same flaw.
> An example execution order that includes this leak:
> 1. Acceptor#accept() is called, and a new SocketChannel is accepted.
> 2. Acceptor#assignNewConnection() begins
> 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor 
> and attached Processor instances
> 4. Processor#run() checks the shouldRun variable, and exits the loop
> 5. Processor#closeAll() executes, and drains the `newConnections` variable
> 6. Processor#run() returns and the Processor thread terminates
> 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the 
> SocketChannel to `newConnections`
> 8. Acceptor#assignNewConnection() returns
> 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the 
> Acceptor thread terminates.
> 10. Acceptor#close() joins all of the terminated threads, and returns
> At the end of this sequence, there are still open SocketChannel instances in 
> newConnections, which are then considered leaked.
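An illustrative sketch of the racy hand-off (simplified, not Kafka's actual 
Processor API): checking the shutdown flag and enqueueing under one lock closes 
the window, and a refused hand-off tells the acceptor to close the channel 
itself:

{code:java}
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;

class ProcessorSketch {
    private final Queue<SocketChannel> newConnections = new ArrayDeque<>();
    private boolean shouldRun = true;

    synchronized boolean accept(SocketChannel channel) {
        if (!shouldRun) {
            return false;               // refused: the acceptor closes the channel
        }
        newConnections.add(channel);    // safe: the drain below holds the same lock
        return true;
    }

    synchronized void shutdownAndDrain() throws IOException {
        shouldRun = false;
        for (SocketChannel channel : newConnections) {
            channel.close();
        }
        newConnections.clear();
    }
}
{code}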



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16768) SocketServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16768:
---

 Summary: SocketServer leaks accepted SocketChannel instances due 
to race condition
 Key: KAFKA-16768
 URL: https://issues.apache.org/jira/browse/KAFKA-16768
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.8.0
Reporter: Greg Harris


The SocketServer has threads for Acceptors and Processors. These threads 
communicate via Processor#accept/Processor#configureNewConnections and the 
`newConnections` queue.

During shutdown, the Acceptor and Processors are each stopped by setting 
shouldRun to false, and then shutdown proceeds asynchronously in all instances 
together. This leads to a race condition where an Acceptor accepts a 
SocketChannel and queues it to a Processor, but that Processor instance has 
already started shutting down and has already drained the newConnections queue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16763) Upgrade to scala 2.12.19 and scala 2.13.14

2024-05-14 Thread 黃竣陽 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846471#comment-17846471
 ] 

黃竣陽 commented on KAFKA-16763:
-

I will handle this issue.

> Upgrade to scala 2.12.19 and scala 2.13.14
> --
>
> Key: KAFKA-16763
> URL: https://issues.apache.org/jira/browse/KAFKA-16763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> scala 2.12.19 (https://github.com/scala/scala/releases/tag/v2.12.19)
>  
> scala 2.13.14 (https://github.com/scala/scala/releases/tag/v2.13.14)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846467#comment-17846467
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM:
--

Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
KAFKA-15170

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though


was (Author: ableegoldman):
Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
[KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]

 

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

 

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios the rack aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon, the error is deterministic, and 
> so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
> would also be reproducible there) 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:

[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846467#comment-17846467
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16361:


Thanks, I think it's safe to say this is related to the rack-aware assignment 
code that was added in 3.5. Probably the same issue that [~flashmouse] found in 
[KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170]

 

Fortunately I just merged that fix and cherrypicked it back to 3.7, so the 
patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix 
release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is 
just a day from KIP freeze which means if all goes well, it will be available 
in a little over a month.

 

If you need an immediate resolution in the meantime then you have two options:

1) disable rack-awareness which will effectively make the assignor just skip 
over the buggy code

2) if you can build from source and don't require an official release, just 
cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch 
with whatever version you'd like to use and compile it yourself. I wouldn't 
recommend building directly from trunk for a production environment since that 
contains untested code, but you can at least run your test again using the 
latest trunk build if you want to make sure that it fixes the issue you're 
experiencing. I'm pretty confident it will though
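On option 1, a hedged config sketch, assuming rack awareness in the assignor is 
driven by the consumer's client.rack setting; leaving it unset should make the 
assignor skip the rack-matching code path:

{code}
# consumer config: comment out / remove the rack id on each consumer
#client.rack=use1-az1
{code}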

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.5.1, 3.7.0, 3.6.1
>Reporter: Luke D
>Priority: Major
> Attachments: illegalstateexception.log
>
>
> In some low topic replication scenarios the rack aware assignment in the 
> StickyAssignor fails to balance consumers to its own expectations and throws 
> an IllegalStateException, commonly crashing the application (depending on 
> application implementation). While uncommon, the error is deterministic, and 
> so persists until the replication state changes. 
>  
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
> locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
> would also be reproducible there) 
>  
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota 
> partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of 
> members with more than the minQuota partitions, but no more partitions to be 
> assigned
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
>  {code}
> Here is a specific test case from 3.7.0 that fails when passed to 
> StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 
> (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: 
> rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader 
> = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 
> 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 
> 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 
> 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = 
> topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 
> 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partit

[jira] [Updated] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15170:
---
Fix Version/s: 3.8.0
   3.7.1

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the 
> assignment when all consumers in the group subscribe to the same topic list, 
> but it fails to add every partition that moved to another owner to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that assignOwnedPartitions does not add partitions whose rack 
> mismatches their previous owner to allRevokedPartitions, and only partitions 
> in that list are added to partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be 
> removed from the final assignment, or consumption will behave incorrectly. I 
> have already raised a PR, please take a look, thx
>  
> [https://github.com/apache/kafka/pull/13965]
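To make the invariant above concrete, an illustrative check over hypothetical 
assignment maps (not assignor code): a partition that changes owner must first 
disappear from the assignment for one rebalance round:

{code:java}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

class CooperativeInvariantSketch {
    static boolean violatesInvariant(Map<String, Set<TopicPartition>> previous,
                                     Map<String, Set<TopicPartition>> next) {
        for (Map.Entry<String, Set<TopicPartition>> e : next.entrySet()) {
            for (TopicPartition tp : e.getValue()) {
                String prevOwner = previous.entrySet().stream()
                        .filter(p -> p.getValue().contains(tp))
                        .map(Map.Entry::getKey)
                        .findFirst().orElse(null);
                if (prevOwner != null && !prevOwner.equals(e.getKey())) {
                    return true;   // moved directly, without a revocation round
                }
            }
        }
        return false;
    }
}
{code}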



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15170.

Resolution: Fixed

> CooperativeStickyAssignor cannot adjust assignment correctly
> 
>
> Key: KAFKA-15170
> URL: https://issues.apache.org/jira/browse/KAFKA-15170
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: li xiangyuan
>Assignee: li xiangyuan
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the 
> assignment when all consumers in the group subscribe to the same topic list, 
> but it fails to add every partition that moved to another owner to 
> ``partitionsWithMultiplePreviousOwners``.
>  
> The reason is that assignOwnedPartitions does not add partitions whose rack 
> mismatches their previous owner to allRevokedPartitions, and only partitions 
> in that list are added to partitionsWithMultiplePreviousOwners.
>  
> In a cooperative rebalance, partitions that have changed owner must be 
> removed from the final assignment, or consumption will behave incorrectly. I 
> have already raised a PR, please take a look, thx
>  
> [https://github.com/apache/kafka/pull/13965]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-05-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-16277:
---
Fix Version/s: 3.7.1

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Assignee: Cameron Redpath
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two consumers, 6 partitions from each topic go to each consumer
>  * With three consumers, 4 partitions from each topic go to each consumer
>  * With four consumers, 3 partitions from each topic go to each consumer
>  
> Of note, we only saw this behaviour after migrating to the 
> `CooperativeStickyAssignor`. It was not an issue with the default partition 
> assignment strategy.
>  
> It is possible this may be intended behaviour, in which case, what is the 
> preferred workaround for our scenario? Our current workaround, if we decide 
> to go ahead with the update to `CooperativeStickyAssignor`, may be to limit 
> our consumers so they only subscribe to one topic and run two consumer 
> threads per instance of the application (sketched below).
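
A rough sketch of that workaround, assuming one consumer thread per topic and
a hypothetical per-topic group id (so each topic's partitions are spread
independently across application instances; names and addresses are
placeholders):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PerTopicConsumers {
    static Thread consumerThread(String topic) {
        return new Thread(() -> {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // One group per topic: within each group every member subscribes to
            // a single topic, so its partitions are spread evenly across members.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "app-" + topic);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                while (!Thread.currentThread().isInterrupted())
                    consumer.poll(Duration.ofMillis(500)); // process records here
            }
        }, "consumer-" + topic);
    }

    public static void main(String[] args) {
        consumerThread("topic-1").start();
        consumerThread("topic-2").start();
    }
}
{code}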



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16764:
--
Fix Version/s: 3.8.0

> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
> URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably why 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.
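
Once fixed, the expected behaviour could be asserted with something along
these lines (an illustrative JUnit 5 sketch against a running broker, not the
actual testSubscriptionOnInvalidTopic; baseProps is assumed to carry bootstrap
servers, deserializers and a group id, and group.protocol=consumer is assumed
to select the new AsyncKafkaConsumer, as in 3.7):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InvalidTopicException;
import static org.junit.jupiter.api.Assertions.assertThrows;

class InvalidTopicPollSketch {
    // Topic names containing spaces are invalid, so metadata for this
    // subscription should surface as an InvalidTopicException from poll().
    void pollShouldThrowOnInvalidTopic(Properties baseProps) {
        Properties props = new Properties();
        props.putAll(baseProps);
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // new consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("invalid topic"));
            assertThrows(InvalidTopicException.class,
                () -> consumer.poll(Duration.ofSeconds(5)));
        }
    }
}
{code}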



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16767) KRaft should track HWM outside the log layer

2024-05-14 Thread Jira
José Armando García Sancio created KAFKA-16767:
--

 Summary: KRaft should track HWM outside the log layer
 Key: KAFKA-16767
 URL: https://issues.apache.org/jira/browse/KAFKA-16767
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: José Armando García Sancio


The current implementation of KRaft tracks the HWM using the log layer 
implementation. The log layer has an invariant where the HWM <= LEO. This means 
that the log layer always sets the HWM to the minimum of HWM and LEO.

This has the side-effect of the local KRaft replica reporting a HWM that is 
much smaller than the leader's HWM when the replica starts with an empty log, 
e.g. a new broker or the kafka-metadata-shell.
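
A one-line illustration of the clamping (the numbers are hypothetical):

{code:java}
// Hypothetical values: a replica that starts from an empty log clamps the
// leader's HWM to its own LEO, so it reports hwm = 0 until it catches up.
long leaderHighWatermark = 1_000_000L;
long localLogEndOffset = 0L; // fresh broker or kafka-metadata-shell
long reportedHwm = Math.min(leaderHighWatermark, localLogEndOffset); // 0
{code}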



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16637) AsyncKafkaConsumer removes offset fetch responses from cache too aggressively

2024-05-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846446#comment-17846446
 ] 

Lianet Magrans commented on KAFKA-16637:


Hey [~chickenchickenlove], sorry I had missed your last question. The new group 
rebalance protocol from KIP-848 is supported in KRaft mode only.
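
For reference, a minimal sketch of the client-side opt-in (the broker must
also run KRaft with the new group coordinator protocol enabled; broker setup
is elided here, and the bootstrap address is a placeholder):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class Kip848ClientConfig {
    public static Properties newProtocolProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        // KIP-848 opt-in: selects the new consumer group protocol.
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        return props;
    }
}
{code}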

> AsyncKafkaConsumer removes offset fetch responses from cache too aggressively
> -
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: sanghyeok An
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png, 
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol  
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)]
>  
> However, it does not work well. 
> You can check my setup below.
>  
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> *Broker logs* 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
> Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Description: 
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw an org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

The same situation exists for beginningOffsets and endOffsets. All three 
functions show the same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the AsyncConsumer, so we should fix all 
three.

With the fix, we should write tests for each func, like the ones defined for 
the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the legacy consumer does not issue a FindCoordinator request in 
this case but the AsyncConsumer does, so the tests would have to account for 
that when matching requests/responses.

  was:
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw an org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exists for the legacy consumer. Note that we would need a different test 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, so the test would have to account for that 
when matching requests/responses.


> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw an org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException". 
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message specific to offsetsForTimes.
> Same situation exists for beginningOffsets and endOffsets. All 3 funcs show 
> the same timeout message in the LegacyConsumer (defined 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/co

[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16766:
--

 Summary: New consumer offsetsForTimes timeout exception does not 
have the proper message
 Key: KAFKA-16766
 URL: https://issues.apache.org/jira/browse/KAFKA-16766
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
 Fix For: 3.8.0


If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw an org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

After the fix, we should be able to write a test like the 
[testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246]
 that exists for the legacy consumer. Note that we would need a different test 
given that the legacy consumer does not issue a FindCoordinator request in this 
case but the AsyncConsumer does, so the test would have to account for that 
when matching requests/responses.
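
The fix essentially amounts to translating the java.util.concurrent timeout
into Kafka's TimeoutException with an operation-specific message. A sketch of
the idea only (not the actual AsyncKafkaConsumer code; the Future-based shape
and method name are assumptions):

{code:java}
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class OffsetsForTimesTimeoutSketch {
    // Wrap the blocking wait so callers see a descriptive Kafka TimeoutException
    // instead of the raw "java.util.concurrent.TimeoutException" message.
    static Map<TopicPartition, OffsetAndTimestamp> awaitOffsetsForTimes(
            Future<Map<TopicPartition, OffsetAndTimestamp>> result, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            throw new org.apache.kafka.common.errors.TimeoutException(
                "Failed to get offsets by times in " + timeoutMs + "ms", e);
        }
    }
}
{code}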



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16765) NioEchoServer leaks accepted SocketChannel instances due to race condition

2024-05-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16765:
---

 Summary: NioEchoServer leaks accepted SocketChannel instances due 
to race condition
 Key: KAFKA-16765
 URL: https://issues.apache.org/jira/browse/KAFKA-16765
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Affects Versions: 3.8.0
Reporter: Greg Harris


The NioEchoServer has an AcceptorThread that calls accept() to open new 
SocketChannel instances and insert them into the `newChannels` List, and a main 
thread that drains the `newChannels` List and moves them to the 
`socketChannels` List.

During shutdown, the serverSocketChannel is closed, which causes both threads 
to exit their while loops. It is possible for the NioEchoServer main thread to 
sense the serverSocketChannel close and terminate before the Acceptor thread 
does, and for the Acceptor thread to put a SocketChannel in `newChannels` 
before terminating. This instance is never closed by either thread, because it 
is never moved to `socketChannels`.

A precise execution order that has this leak is:
1. NioEchoServer thread locks `newChannels`.
2. Acceptor thread accept() completes, and the SocketChannel is created
3. Acceptor thread blocks waiting for the `newChannels` lock
4. NioEchoServer thread releases the `newChannels` lock and does some processing
5. NioEchoServer#close() is called, which closes the serverSocketChannel
6. NioEchoServer thread checks serverSocketChannel.isOpen() and then terminates
7. Acceptor thread acquires the `newChannels` lock and adds the SocketChannel 
to `newChannels`.
8. Acceptor thread checks serverSocketChannel.isOpen() and then terminates.
9. NioEchoServer#close() stops blocking now that both other threads have 
terminated.

The end result is that the leaked socket is left open in the `newChannels` list 
at the end of close(), which is incorrect.
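
One possible shape for the fix (a sketch only; the actual field and method
names inside NioEchoServer may differ) is to drain and close whatever is still
sitting in newChannels once both threads have terminated:

{code:java}
// Sketch of a close() tail. After the acceptor thread and the server thread
// have been joined, nothing can add to newChannels anymore, so any channel
// still in the list was leaked by the race above and can be closed safely.
public void close() throws IOException, InterruptedException {
    serverSocketChannel.close();
    acceptorThread.join();
    join(); // NioEchoServer itself is the main processing thread
    synchronized (newChannels) {
        for (SocketChannel channel : newChannels)
            channel.close();
        newChannels.clear();
    }
}
{code}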



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16764:
---
Description: 
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])

This is probably why 
[testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
 fails for the new consumer. Once this bug is fixed, we should be able to 
enable that test for the new consumer.

  was:
A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])


> New consumer should throw InvalidTopicException on poll when invalid topic in 
> metadata
> --
>
> Key: KAFKA-16764
>     URL: https://issues.apache.org/jira/browse/KAFKA-16764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support
>
> A call to consumer.poll should throw InvalidTopicException if an invalid 
> topic is discovered in metadata. This can be easily reproduced by calling 
> subscribe("invalid topic") and then poll, for example. The new consumer does 
> not throw the expected InvalidTopicException like the LegacyKafkaConsumer 
> does. 
> The legacy consumer achieves this by checking for metadata exceptions on 
> every iteration of the ConsumerNetworkClient (see 
> [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
> This is probably why 
> [testSubscriptionOnInvalidTopic|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2956]
>  fails for the new consumer. Once this bug is fixed, we should be able to 
> enable that test for the new consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata

2024-05-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16764:
--

 Summary: New consumer should throw InvalidTopicException on poll 
when invalid topic in metadata
 Key: KAFKA-16764
 URL: https://issues.apache.org/jira/browse/KAFKA-16764
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans


A call to consumer.poll should throw InvalidTopicException if an invalid topic 
is discovered in metadata. This can be easily reproduced by calling 
subscribe("invalid topic") and then poll, for example. The new consumer does not 
throw the expected InvalidTopicException like the LegacyKafkaConsumer does. 

The legacy consumer achieves this by checking for metadata exceptions on every 
iteration of the ConsumerNetworkClient (see 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   3   4   5   6   7   8   9   10   >