Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
kamalcph commented on PR #15463: URL: https://github.com/apache/kafka/pull/15463#issuecomment-2027930309 > Instead of getting the number from yammer metrics, we can check the metric counter and see if it increased For my understanding, could you please explain how this patch fixed the issue? The `safeYammerMetricValue` was also returning the `meter` count. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
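The suggestion quoted above (sample the counter before the operation, then assert that it increased, rather than checking an absolute metric value) can be sketched generically. This is a plain-Java illustration of the testing pattern only; the counter and method names are invented for the example and are not Kafka's or Yammer's API.

```java
import java.util.concurrent.atomic.AtomicLong;

public class MetricDeltaExample {
    // Stand-in for a meter/counter metric (illustrative only).
    static final AtomicLong remoteFetchExpiredCount = new AtomicLong();

    // Stand-in for the operation under test that bumps the metric.
    static void expireRemoteFetch() {
        remoteFetchExpiredCount.incrementAndGet();
    }

    public static void main(String[] args) {
        long before = remoteFetchExpiredCount.get();
        expireRemoteFetch();
        long after = remoteFetchExpiredCount.get();
        // Asserting on the delta, not the absolute value, keeps the test
        // insensitive to metric activity left over from earlier tests.
        if (after - before != 1) {
            throw new AssertionError("expected counter to increase by 1");
        }
        System.out.println("counter increased by " + (after - before));
    }
}
```

The delta-based check is what makes the test robust when metrics are shared across test cases in the same JVM.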
[PR] KAFKA-16447: Fix failed ReplicaManagerTest [kafka]
FrankYang0529 opened a new pull request, #15630: URL: https://github.com/apache/kafka/pull/15630 Ref: https://github.com/apache/kafka/pull/15373/files#r1544335647 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2027904152 > I think both are important. First, it's important to be able to derive the same thing consistently from the leader and the follower log. This affects things like the time indexing entries. It will be confusing if the leader adds an offset in the middle of a batch while the follower adds an offset at the end of the batch. Second, it's important to name things as accurately as possible. Otherwise, future developers could make inaccurate assumptions. You are right. I have reverted the impl and naming. Also, I added extra comments for the "spec" of offsetOfMaxTimestamp.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1545074098 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) { this.timestamp = timestamp; } +@Override +public String toString() { +return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp; Review Comment: https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java#L93
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
chia7712 commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2027901054 @nizhikov thanks for this migration!!
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
chia7712 merged PR #15572: URL: https://github.com/apache/kafka/pull/15572
[jira] [Assigned] (KAFKA-13906) Invalid replica state transition
[ https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu reassigned KAFKA-13906: -- Assignee: Johnny Hsu > Invalid replica state transition > > > Key: KAFKA-13906 > URL: https://issues.apache.org/jira/browse/KAFKA-13906 > Project: Kafka > Issue Type: Bug > Components: controller, core, replication >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, > 3.2.1 >Reporter: Igor Soarez >Assignee: Johnny Hsu >Priority: Major > Labels: BUG, controller, replication, reproducible-bug > > The controller runs into an IllegalStateException when reacting to changes in > broker membership status if there are topics that are pending deletion. > > How to reproduce: > # Setup cluster with 3 brokers > # Create a topic with a partition being led by each broker and produce some > data > # Kill one of the brokers that is not the controller, and keep that broker > down > # Delete the topic > # Restart the other broker that is not the controller > > Logs and stacktrace: > {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 > epoch 1 initiated state change of replica 3 for partition test-topic-2 from > ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed > (state.change.logger)}} > {{java.lang.IllegalStateException: Replica > [Topic=test-topic,Partition=2,Replica=3] should be in the > OfflineReplica,ReplicaDeletionStarted states before moving to > ReplicaDeletionIneligible state. 
Instead it is in ReplicaDeletionSuccessful > state}} > {{ at > kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} > {{ at scala.collection.immutable.List.foreach(List.scala:333)}} > {{ at > kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} > {{ at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}} > {{ at > scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}} > {{ at > kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}} > {{ at > kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}} > {{ at > kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}} > {{ at > kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}} > {{ at > kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}} > {{ at > kafka.controller.KafkaController.process(KafkaController.scala:2534)}} > {{ at > kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}} > {{ at > kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}} > {{--}} > {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 > epoch 1 initiated state change of replica 3 for partition test-topic-2 from > ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}} > 
{{java.lang.IllegalStateException: Replica > [Topic=test-topic,Partition=2,Replica=3] should be in the > NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states > before moving to OnlineReplica state. Instead it is in > ReplicaDeletionSuccessful state}} > {{ at > kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} > {{ at scala.collection.immutable.List.foreach(List.scala:333)}} > {{ at > kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} > {{ at >
[jira] [Commented] (KAFKA-13906) Invalid replica state transition
[ https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832368#comment-17832368 ] Johnny Hsu commented on KAFKA-13906: hey [~soarez] [~showuon] I would like to work on this, let me assign this to myself, thanks for reporting this
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1545058512 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) { this.timestamp = timestamp; } +@Override +public String toString() { +return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp; Review Comment: I tried to do the refactoring to use `offset` rather than `timestamp`, but I'm a bit lost as to how I can get the offset number from the `GetOffsetShell`. I tried to read through the CI output but couldn't locate where the output is an offset rather than a timestamp. Could you point me to the outputs that you saw so that I can get more context? Thanks
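For reference, the `toString` in the quoted diff appears to drop the closing `]` of `Row[...]`. A self-contained sketch of the corrected method, reproducing only the fields shown in the diff (the enclosing class here is invented for illustration):

```java
public class RowToStringExample {
    // Minimal stand-in for the Row helper in GetOffsetShellTest;
    // only the fields visible in the quoted diff are reproduced.
    static class Row {
        final String name;
        final int partition;
        final Long timestamp;

        Row(String name, int partition, Long timestamp) {
            this.name = name;
            this.partition = partition;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            // Note the closing "]" that the quoted diff omits.
            return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp + "]";
        }
    }

    public static void main(String[] args) {
        System.out.println(new Row("topic-a", 0, 1234L));
        // prints: Row[name:topic-a,partition:0,timestamp:1234]
    }
}
```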
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
jolshan merged PR #15541: URL: https://github.com/apache/kafka/pull/15541
[jira] [Commented] (KAFKA-16344) Internal topic mm2-offset-syncsinternal created with single partition is putting more load on the broker
[ https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832334#comment-17832334 ] Greg Harris commented on KAFKA-16344: - [~janardhanag] I don't understand what you mean. Can you be more specific about what you tried, and what you observed? > Internal topic mm2-offset-syncsinternal created with single > partition is putting more load on the broker > - > > Key: KAFKA-16344 > URL: https://issues.apache.org/jira/browse/KAFKA-16344 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.5.1 >Reporter: Janardhana Gopalachar >Priority: Major > > We are using Kafka 3.5.1 version, we see that the internal topic created by > mirrormaker > mm2-offset-syncsinternal is created with single partition due to > which the CPU load on the broker which will be leader for this partition is > increased compared to other brokers. Can multiple partitions be created for > the topic so that the CPU load would get distributed > > Topic: mm2-offset-syncscluster-ainternal TopicId: XRvTDbogT8ytNhqX2YTyrA > PartitionCount: 1 ReplicationFactor: 3 Configs: > min.insync.replicas=2,cleanup.policy=compact,message.format.version=3.0-IV1 > Topic: mm2-offset-syncscluster-ainternal Partition: 0 Leader: 2 > Replicas: 2,1,0 Isr: 2,1,0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Image test improvements [kafka]
ahuang98 commented on code in PR #15373: URL: https://github.com/apache/kafka/pull/15373#discussion_r1544932474 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -88,7 +101,7 @@ public class ClusterImageTest { setId(2). setEpoch(123). setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")). -setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). +setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))). Review Comment: Responded on https://issues.apache.org/jira/browse/KAFKA-16447, someone is volunteering to fix, will check in again tomorrow!
[jira] [Commented] (KAFKA-16447) Fix failed ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832332#comment-17832332 ] Alyssa Huang commented on KAFKA-16447: -- [~yangpoan] go for it, sorry for breaking the build. I'll submit a PR tomorrow if nothing is opened by then > Fix failed ReplicaManagerTest > - > > Key: KAFKA-16447 > URL: https://issues.apache.org/jira/browse/KAFKA-16447 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Nikolay Izhikov >Priority: Major > > see comment: https://github.com/apache/kafka/pull/15373/files#r1544335647 for > root cause
Re: [PR] Image test improvements [kafka]
ahuang98 commented on code in PR #15373: URL: https://github.com/apache/kafka/pull/15373#discussion_r1544930494 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -88,7 +101,7 @@ public class ClusterImageTest { setId(2). setEpoch(123). setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")). -setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). +setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))). Review Comment: Thanks folks, will open up a fix soon
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
jolshan commented on PR #15541: URL: https://github.com/apache/kafka/pull/15541#issuecomment-2027801197 Some failing tests are due to https://issues.apache.org/jira/browse/KAFKA-16447
[jira] [Resolved] (KAFKA-16451) testDeltaFollower tests failing in ReplicaManager
[ https://issues.apache.org/jira/browse/KAFKA-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-16451. Resolution: Duplicate > testDeltaFollower tests failing in ReplicaManager > - > > Key: KAFKA-16451 > URL: https://issues.apache.org/jira/browse/KAFKA-16451 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Priority: Major > > many ReplicaManagerTests with the prefix testDeltaFollower seem to be > failing. A few other ReplicaManager tests as well. See existing failures in > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2765/tests]
[jira] [Commented] (KAFKA-16451) testDeltaFollower tests failing in ReplicaManager
[ https://issues.apache.org/jira/browse/KAFKA-16451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832330#comment-17832330 ] Justine Olshan commented on KAFKA-16451: closing as it is a duplicate of https://issues.apache.org/jira/browse/KAFKA-16447 > testDeltaFollower tests failing in ReplicaManager > - > > Key: KAFKA-16451 > URL: https://issues.apache.org/jira/browse/KAFKA-16451 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Priority: Major > > many ReplicaManagerTests with the prefix testDeltaFollower seem to be > failing. A few other ReplicaManager tests as well. See existing failures in > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2765/tests]
Re: [PR] Image test improvements [kafka]
jolshan commented on code in PR #15373: URL: https://github.com/apache/kafka/pull/15373#discussion_r1544929534 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -88,7 +101,7 @@ public class ClusterImageTest { setId(2). setEpoch(123). setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")). -setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). +setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))). Review Comment: I just found this as well. I opened https://issues.apache.org/jira/browse/KAFKA-16451 but will close as a duplicate
[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832325#comment-17832325 ] Greg Harris commented on KAFKA-16223: - PR 1/3 merged, leaving this open. > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > Fix For: 3.8.0
[jira] [Created] (KAFKA-16451) testDeltaFollower tests failing in ReplicaManager
Justine Olshan created KAFKA-16451: -- Summary: testDeltaFollower tests failing in ReplicaManager Key: KAFKA-16451 URL: https://issues.apache.org/jira/browse/KAFKA-16451 Project: Kafka Issue Type: Bug Reporter: Justine Olshan many ReplicaManagerTests with the prefix testDeltaFollower seem to be failing. A few other ReplicaManager tests as well. See existing failures in [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2765/tests]
Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (1/3) [kafka]
gharris1727 merged PR #15520: URL: https://github.com/apache/kafka/pull/15520
Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (1/3) [kafka]
gharris1727 commented on PR #15520: URL: https://github.com/apache/kafka/pull/15520#issuecomment-2027797163 Test failures appear unrelated, and the connect runtime tests pass locally.
Re: [PR] KAFKA-16439: Update replication_replica_failure_test.py to support KIP-848’s group protocol config [kafka]
kirktrue commented on PR #15629: URL: https://github.com/apache/kafka/pull/15629#issuecomment-2027785610 @lucasbru—and another system test migration to review. Thanks!
[PR] KAFKA-16439: Update replication_replica_failure_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15629: URL: https://github.com/apache/kafka/pull/15629 Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16440: Update security_test.py to support KIP-848’s group protocol config [kafka]
kirktrue commented on PR #15628: URL: https://github.com/apache/kafka/pull/15628#issuecomment-2027781431 @lucasbru—here's another system test migration.
Re: [PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (1/3) [kafka]
gharris1727 commented on code in PR #15520: URL: https://github.com/apache/kafka/pull/15520#discussion_r1544876870 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -329,6 +329,11 @@ public static String LOGGER_CLUSTER_KEY(String namespace) { private final Map fencableProducerProps; private final Time time; +//VisibleForTesting +void setConfigLog(KafkaBasedLog configLog) { +this.configLog = configLog; Review Comment: I don't like this, but I think fixing it is going to also require solving the this-escape in the constructor, which looks nasty. Good enough for now, and we can address it later after EasyMock is gone.
[PR] KAFKA-16440: Update security_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15628: URL: https://github.com/apache/kafka/pull/15628 Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16438: Update consumer_test.py’s static tests to support KIP-848’s group protocol config [kafka]
kirktrue commented on PR #15627: URL: https://github.com/apache/kafka/pull/15627#issuecomment-2027764974 @lucasbru—can you take a look at this system test? Thanks!
[PR] KAFKA-16438: Update consumer_test.py’s static tests to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15627: URL: https://github.com/apache/kafka/pull/15627 Migrated the following tests for the new consumer: - `test_fencing_static_consumer` - `test_static_consumer_bounce` - `test_static_consumer_persisted_after_rejoin` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on PR #15601: URL: https://github.com/apache/kafka/pull/15601#issuecomment-2027729205 This PR looks like it'll have merge conflicts with #15510. Since that is fixing a bug it should probably have higher priority than this refactor, can you rebase on top of their changes? Alternatively we can wait for that to land first before returning to this PR.
[jira] [Created] (KAFKA-16450) Add thread-safe alternative to Exit
Greg Harris created KAFKA-16450: --- Summary: Add thread-safe alternative to Exit Key: KAFKA-16450 URL: https://issues.apache.org/jira/browse/KAFKA-16450 Project: Kafka Issue Type: Task Reporter: Greg Harris Assignee: Greg Harris
[jira] [Updated] (KAFKA-16420) Refactor utils.Exit call-sites to use thread-safe alternative
[ https://issues.apache.org/jira/browse/KAFKA-16420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16420: Summary: Refactor utils.Exit call-sites to use thread-safe alternative (was: Add thread-safe alternative to utils.Exit) > Refactor utils.Exit call-sites to use thread-safe alternative > - > > Key: KAFKA-16420 > URL: https://issues.apache.org/jira/browse/KAFKA-16420 > Project: Kafka > Issue Type: Wish > Components: connect, core, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > The Exit class is not thread-safe, and exposes our tests to race conditions > and inconsistent execution. It is not possible to make it thread-safe due to > the static design of the API. > We should add an alternative to the Exit class, and migrate the existing > usages to the replacement, before finally removing the legacy Exit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16420) Refactor utils.Exit call-sites to use thread-safe alternative
[ https://issues.apache.org/jira/browse/KAFKA-16420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16420: Description: The Exit class is not thread-safe, and exposes our tests to race conditions and inconsistent execution. It is not possible to make it thread-safe due to the static design of the API. We should migrate the existing usages to the replacement, before finally removing the legacy Exit. was: The Exit class is not thread-safe, and exposes our tests to race conditions and inconsistent execution. It is not possible to make it thread-safe due to the static design of the API. We should add an alternative to the Exit class, and migrate the existing usages to the replacement, before finally removing the legacy Exit. > Refactor utils.Exit call-sites to use thread-safe alternative > - > > Key: KAFKA-16420 > URL: https://issues.apache.org/jira/browse/KAFKA-16420 > Project: Kafka > Issue Type: Wish > Components: connect, core, tools >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > The Exit class is not thread-safe, and exposes our tests to race conditions > and inconsistent execution. It is not possible to make it thread-safe due to > the static design of the API. > We should migrate the existing usages to the replacement, before finally > removing the legacy Exit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
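The thread-safety problem described in the ticket comes from the static, globally shared exit procedure. A minimal sketch of an instance-based alternative is below; the `Exits` class name and the `IntConsumer` procedure type are illustrative assumptions, not Kafka's actual replacement API.

```java
// Hypothetical instance-based alternative to a static Exit class: the exit
// procedure lives in an AtomicReference on each instance, so tests can swap
// it atomically without racing on shared static state. Names are illustrative.
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

class Exits {
    private final AtomicReference<IntConsumer> exitProcedure;

    Exits(IntConsumer defaultProcedure) {
        this.exitProcedure = new AtomicReference<>(defaultProcedure);
    }

    // Atomically replace the procedure, e.g. in a test's setup method.
    void setExitProcedure(IntConsumer procedure) {
        exitProcedure.set(procedure);
    }

    void exit(int statusCode) {
        exitProcedure.get().accept(statusCode);
    }
}
```

Because each test owns its own `Exits` instance, two tests overriding the procedure concurrently no longer interfere, which is the race the ticket describes.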
Re: [PR] KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config [kafka]
kirktrue commented on PR #15626: URL: https://github.com/apache/kafka/pull/15626#issuecomment-2027703629 @lucasbru—here's another system test to review. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15626: URL: https://github.com/apache/kafka/pull/15626 Added a new optional `group_protocol` parameter to the test methods, then passed that down to the methods involved. The new consumer only works with 3.7+, so I added a new `@matrix` block with those parameters. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
akatona84 commented on code in PR #15605: URL: https://github.com/apache/kafka/pull/15605#discussion_r1544810115 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -38,22 +38,32 @@ import org.mockito.Mockito.{mock, when} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ProducerIdManagerTest { var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) + class ErrorQueue(initialErrorCounts: Errors*) { +private val queue: mutable.Queue[Errors] = mutable.Queue.empty ++ initialErrorCounts + +def takeError(): Errors = queue.synchronized { + if (queue.isEmpty) { +return Errors.NONE + } + queue.dequeue() +} Review Comment: thx, true! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
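The Scala diff above injects a scripted sequence of errors and falls back to a "no error" sentinel once the script is exhausted. A Java sketch of the same idea (the `Errors` enum here is a simplified stand-in for `org.apache.kafka.common.protocol.Errors`, not the real class):

```java
// Sketch of a synchronized error-injection queue for tests: each call
// dequeues the next scripted error; once the script runs out, it returns
// the NONE sentinel instead of throwing.
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class ErrorQueue {
    // Stand-in for org.apache.kafka.common.protocol.Errors.
    enum Errors { NONE, COORDINATOR_LOAD_IN_PROGRESS, UNKNOWN_SERVER_ERROR }

    private final Queue<Errors> queue;

    ErrorQueue(Errors... initialErrors) {
        this.queue = new ArrayDeque<>(Arrays.asList(initialErrors));
    }

    // Synchronized so concurrent callers see a consistent error script.
    synchronized Errors takeError() {
        Errors next = queue.poll();
        return next == null ? Errors.NONE : next;
    }
}
```

The synchronization matters because the flaky test being fixed exercises concurrent producer-ID allocation; an unsynchronized queue could hand the same scripted error to two threads or throw on a racing dequeue.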
[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null
[ https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ayoub Omari updated KAFKA-16434: Description: We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, String]_ where _LeftRecord_ : {code:scala} case class LeftRecord(foreignKey: String, name: String){code} we do a simple *INNER* foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic. (same topology example as in KAFKA-16407) *Scenario: Unset foreign key of a primary key* {code:scala} rightTopic.pipeInput("fk1", "1") leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) leftTopic.pipeInput("pk1", ProductValue(null, "pk1")) {code} *+Actual result+* {code:java} KeyValue(pk1, 3) {code} *+Expected result+* {code:java} KeyValue(pk1, 3) KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code} However, in {+}other cases{+}, where the join result should be unset (e.g. the primary key is deleted, or the foreign key changes to a non existing FK), that record is {+}correctly emitted{+}. Also, the importance of unsetting the join result is mentioned in the code: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36] {code:java} //[...] Additionally, propagate null if no FK is found there, // since we must "unset" any output set by the previous FK-join. This is true for both INNER and LEFT join. {code} was: We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, String]_ where _LeftRecord_ : {code:scala} case class LeftRecord(foreignKey: String, name: String){code} we do a simple *INNER* foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic. 
(same topology example as in KAFKA-16407) *Scenario: Unset foreign key of a primary key* {code:scala} rightTopic.pipeInput("fk1", "1") leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) leftTopic.pipeInput("pk1", ProductValue(null, "pk1")) {code} *+Actual result+* {code:java} KeyValue(pk1, 3) {code} *+Expected result+* {code:java} KeyValue(pk1, 3) KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code} However, in {+}other cases{+}, where the join result should be unset (e.g. the primary key is deleted, or the foreign key changes to a non existing FK), that record is {+}correctly emitted{+}. Also, the importance of unsetting the join result is mentioned in the code: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36] {code:java} //[...] Additionally, propagate null if no FK is found there, // since we must "unset" any output set by the previous FK-join. This is true for both INNER and LEFT join. {code} > ForeignKey INNER join does not unset join result when FK becomes null > - > > Key: KAFKA-16434 > URL: https://issues.apache.org/jira/browse/KAFKA-16434 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.2, 3.7.0 >Reporter: Ayoub Omari >Assignee: Ayoub Omari >Priority: Major > > We have two topics : _left-topic[String, LeftRecord]_ and > _right-topic[String, String]_ > where _LeftRecord_ : > {code:scala} > case class LeftRecord(foreignKey: String, name: String){code} > we do a simple *INNER* foreign key join on left-topic's foreignKey field. The > resulting join value is the value in right-topic. 
(same topology example as > in KAFKA-16407) > > *Scenario: Unset foreign key of a primary key* > {code:scala} > rightTopic.pipeInput("fk1", "1") > leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) > leftTopic.pipeInput("pk1", ProductValue(null, "pk1")) > {code} > > *+Actual result+* > {code:java} > KeyValue(pk1, 3) {code} > > *+Expected result+* > {code:java} > KeyValue(pk1, 3) > KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code} > > However, in {+}other cases{+}, where the join result should be unset (e.g. > the primary key is deleted, or the foreign key changes to a non existing FK), > that record is {+}correctly emitted{+}. > > Also, the importance of unsetting the join result is mentioned in the code: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36] > {code:java} > //[...] Additionally, propagate null if no FK is found there, > // since we must "unset" any output set by the previous FK-join. This is true > for both INNER and LEFT join. {code} > > -- This
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1544779363 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java: ## @@ -41,6 +41,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.streams.state.internals.JoinSide; Review Comment: I just noticed that the JoinSide package is `state`, while this class is `kstream`. Since this enum is so tightly bound to this class, it should probably be in the same package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1544787082 ## streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Objects; + +/** + * An enum representing the side of a join operation. + * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified. + */ +@SuppressWarnings("unchecked") Review Comment: It looks like this is just moving the unchecked warning around, rather than fixing it. ` LeftOrRightValue make(final V leftValue);` is impossible to type in a similar way to ` LeftOrRightValue make(final boolean isLeftSide, final V value)`, because it will always require casting V to V1 or V2. It might not be possible to do this with Enum, as it can't take type arguments. ## streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java: ## @@ -89,8 +74,8 @@ public V2 getRightValue() { @Override public String toString() { return "<" -+ ((leftValue != null) ? "left," + leftValue : "right," + rightValue) -+ ">"; ++ ((leftValue != null) ? 
JoinSide.LEFT + "," + leftValue : JoinSide.RIGHT + "," + rightValue) Review Comment: This doesn't seem necessary. LeftOrRightValue can be completely unaware of the JoinSide enum. ## streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java: ## @@ -62,13 +62,13 @@ public LeftOrRightValue deserialize(final String topic, final byte[] dat } return (data[0] == 1) -? LeftOrRightValue.makeLeftValue(leftDeserializer.deserialize(topic, rawValue(data))) Review Comment: Same comment as in LeftOrRightValue, I don't think this is necessary. ## streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Objects; + +/** + * An enum representing the side of a join operation. + * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified. + */ +@SuppressWarnings("unchecked") +public enum JoinSide { +LEFT("left") { +/** + * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and V2 value as null. 
+ * + * @param leftValue the left V1 value + * @param <V> the type of the value + * @return a new {@link LeftOrRightValue} instance + */ +@Override +public <V> LeftOrRightValue make(final V leftValue) { +Objects.requireNonNull(leftValue, "The left join value is null"); +return (LeftOrRightValue) new LeftOrRightValue<>(leftValue, null); +} +}, + +RIGHT("right") { +/** + * Create a new {@link LeftOrRightValue} instance with the V2 value as {@code rightValue} and V1 value as null. + * + * @param rightValue the right V2 value + * @param <V> the type of the value + * @return a new {@link LeftOrRightValue} instance + */ +@Override +public <V> LeftOrRightValue make(final V rightValue) { +
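The typing limitation raised in the review above (a Java enum cannot declare class-level type parameters, so the enum's `make` must cast) can be contrasted with plain static factories, which keep both sides fully typed. The class below is a simplified stand-in for the Streams-internal `LeftOrRightValue`, written to illustrate the alternative, not the PR's final code:

```java
// Sketch: static factories on the value class itself can bind V1 and V2
// independently, so no unchecked cast is needed. An enum constant's method
// cannot express this, because enums take no class-level type parameters.
class LeftOrRightValue<V1, V2> {
    final V1 leftValue;
    final V2 rightValue;

    private LeftOrRightValue(V1 leftValue, V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    // Fully typed: V1 and V2 are inferred from the call site's target type.
    static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(V1 leftValue) {
        return new LeftOrRightValue<>(leftValue, null);
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(V2 rightValue) {
        return new LeftOrRightValue<>(null, rightValue);
    }
}
```

This is why the reviewer notes the enum version "just moves the unchecked warning around": the enum method can only name one type variable `V`, forcing a cast to whichever of `V1`/`V2` the caller actually wants.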
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
wernerdv closed pull request #15101: KAFKA-16072: JUnit 5 extension to detect thread leak URL: https://github.com/apache/kafka/pull/15101 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16449) Kafka Docker run forces formatting with ZooKeeper connect setup and blocks Kafka from running
Yusu Gao created KAFKA-16449: Summary: Kafka Docker run forces formatting with ZooKeeper connect setup and blocks Kafka from running Key: KAFKA-16449 URL: https://issues.apache.org/jira/browse/KAFKA-16449 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Yusu Gao I am adopting the official Kafka image for testing purposes with ZooKeeper enabled; however, when trying to launch Kafka, I am blocked by the message below about a formatting issue: {code:java} b4870f897e2a:/opt/kafka$ /etc/kafka/docker/run ===> User uid=1000(appuser) gid=1000(appuser) groups=1000(appuser) ===> Setting default values of environment variables if not already set. ===> Configuring ... ===> Launching ... ===> Using provided cluster id ... The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode. {code} Which leads to: [https://github.com/apache/kafka/blob/3.7.0/docker/jvm/launch#L57-L58], which leads to [https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala#L46] and then to [https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/tools/StorageTool.scala#L90] If my understanding is correct, StorageTool#format is only supposed to run in KRaft mode, yet the current Docker image script runs this command even in ZooKeeper mode. Please advise whether this is by design or should be fixed as a bug. Thank you. -- This message was sent by Atlassian Jira (v8.20.10#820010)
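The fix the reporter seems to be asking for is a guard that only runs the KRaft storage formatter when the config is actually a KRaft config (i.e. `process.roles` is set). The sketch below is illustrative only; the image's actual `launch` script and file paths differ, and a temp file stands in for the real `server.properties`:

```shell
# Illustrative only: decide whether to run the KRaft storage formatter based
# on whether process.roles is present in the server config, instead of
# formatting unconditionally as the reporter observed.
CONFIG_FILE="$(mktemp)"
echo "zookeeper.connect=localhost:2181" > "$CONFIG_FILE"

if grep -q '^process.roles=' "$CONFIG_FILE"; then
    echo "KRaft config detected; formatting storage"
    # e.g. bin/kafka-storage.sh format --cluster-id "$CLUSTER_ID" -c "$CONFIG_FILE"
else
    echo "ZooKeeper config detected; skipping KRaft storage format"
fi
rm -f "$CONFIG_FILE"
```

With a ZooKeeper-style config like the one above, the formatter branch is skipped, so the "legacy cluster" error from `StorageTool` would never be reached.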
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1544749545 ## streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Objects; + +/** + * An enum representing the side of a join operation. + * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified. + */ +@SuppressWarnings("unchecked") +public enum JoinSide { +LEFT("left") { +/** + * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and V2 value as null. + * + * @param leftValue the left V1 value + * @param the type of the value + * @return a new {@link LeftOrRightValue} instance + */ +@Override +public LeftOrRightValue make(final V leftValue) { +Objects.requireNonNull(leftValue, "The left join value is null"); +return (LeftOrRightValue) new LeftOrRightValue<>(leftValue, null); +} +}, + +RIGHT("right") { +/** + * Create a new {@link LeftOrRightValue} instance with the V2 value as {@code rightValue} and V1 value as null. 
+ * + * @param rightValue the right V2 value + * @param <V> the type of the value + * @return a new {@link LeftOrRightValue} instance + */ +@Override +public <V> LeftOrRightValue make(final V rightValue) { +Objects.requireNonNull(rightValue, "The left join value is null"); +return (LeftOrRightValue) new LeftOrRightValue<>(null, rightValue); +} + +}; + +private final String joinSideName; + +JoinSide(final String joinSideName) { +this.joinSideName = joinSideName; +} + +public abstract <V> LeftOrRightValue make(final V value); + +/** + * Returns true if this JoinSide represents the left side. + * + * @return true if this JoinSide represents the left side, otherwise false + */ +public boolean isLeftSide() { Review Comment: Closed the "hatch" forever! I had it removed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1544749533 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2185,9 +2184,155 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +} + +Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi( +List topicNamesList, +Map> topicFutures, +Map nodes, +DescribeTopicsOptions options, +long now +) { +return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); Review Comment: Thanks for the advice. Addressed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16362 Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1544749273 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java: ## Review Comment: @gharris1727 Thanks for the fast review! I have addressed your comments. In my opinion, the bigger refactoring should be done in a follow-up ticket/issue, and this one should specifically solve the casting issue. This is also my first contribution here, so I don't want to move a lot of parts and have a forever open PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2027526301 > Do we need to revert all of them? The paths we had fixed work well now. > > 1. It seems to me adding comments for both "recover" and "follower" cases can remind readers that this offsetOfMaxTimestampMs is shallow. > 2. or we can only rename offsetForMaxTimestamp back to shallowOffsetMaxTimestamp but we keep the implementation. > @chia7712 : I think both are important. First, it's important to be able to derive the same thing consistently from the leader and the follower log. This affects things like the time indexing entries. It will be confusing if the leader adds an offset in the middle of a batch while the follower adds an offset at the end of the batch. Second, it's our responsibility to name things as accurately as possible. Otherwise, future developers could make inaccurate assumptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
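The leader/follower distinction being discussed can be made concrete with a toy model: scanning every record can yield an offset in the middle of a batch, while batch-level ("shallow") metadata only supports the batch's last offset. This is an illustration of the distinction only, not Kafka's actual log code; the class and field names are made up:

```java
// Toy model: a batch is a list of (offset, timestamp) records. The per-record
// answer for "offset of max timestamp" may land mid-batch; the shallow answer
// derivable from batch-level metadata alone is the batch's last offset.
import java.util.List;

class MaxTimestampOffset {
    static final class Rec {
        final long offset;
        final long timestamp;
        Rec(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
    }

    // Deep scan over every record: may return an offset inside the batch.
    static long perRecordOffset(List<Rec> batch) {
        Rec best = batch.get(0);
        for (Rec r : batch)
            if (r.timestamp > best.timestamp) best = r;
        return best.offset;
    }

    // Shallow: only the batch's max timestamp and last offset are known,
    // so the batch's last offset is the only derivable answer.
    static long shallowOffset(List<Rec> batch) {
        return batch.get(batch.size() - 1).offset;
    }
}
```

When the max timestamp sits mid-batch, the two answers differ, which is exactly the leader-vs-follower inconsistency the comment warns about and why the `shallow` prefix in the name matters.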
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
jolshan commented on PR #15541: URL: https://github.com/apache/kafka/pull/15541#issuecomment-2027424479 I don't have further comments. I will restart the build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
nizhikov commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2027422531 @ijuma Please, ignore my last question, I misread your statement :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16303: Add upgrade notes to 3.5.0, 3.5.2, and 3.7.0 about MM2 offset translation [kafka]
gharris1727 merged PR #15423: URL: https://github.com/apache/kafka/pull/15423 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
johnnychhsu commented on code in PR #15463: URL: https://github.com/apache/kafka/pull/15463#discussion_r1544639805 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -4164,16 +4164,13 @@ class ReplicaManagerTest { mock(classOf[FetchDataInfo]) }).when(spyRLM).read(any()) - // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec metric value before fetching - val curExpiresPerSec = safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long] + val curExpiresPerSec = DelayedRemoteFetchMetrics.expiredRequestMeter.count() replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 10, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback) // advancing the clock to expire the delayed remote fetch timer.advanceClock(2000L) - // verify the metric value is incremented since the delayed remote fetch is expired - TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long], -"The ExpiresPerSec value is not incremented. Current value is: " + - safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]) + // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is called since the delayed remote fetch is expired + assertEquals(curExpiresPerSec + 1, DelayedRemoteFetchMetrics.expiredRequestMeter.count()) Review Comment: @showuon just updated, thanks for the comment! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
nizhikov commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2027411004 @ijuma Are you suggesting to move `AclAuthorizer` in this PR? Or it could be postponed until all dependencies are ready? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
johnnychhsu commented on PR #15463: URL: https://github.com/apache/kafka/pull/15463#issuecomment-2027410113 @kamalcph just updated, thanks for the review!
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
johnnychhsu commented on code in PR #15463: URL: https://github.com/apache/kafka/pull/15463#discussion_r154463 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1460,10 +1460,8 @@ class ReplicaManager(val config: KafkaConfig, warn("Unable to fetch data from remote storage", e) return Some(createLogReadResult(e)) } - Review Comment: sure, just removed it, thanks for the review @chia7712 !
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1544639778 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) { this.timestamp = timestamp; } +@Override +public String toString() { +return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp; Review Comment: yep, the output from shell is offset rather than timestamp
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
ijuma commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2027407574 > AclAuthorizer We should not move this since it will be deleted as part of ZK removal. I would focus on moving classes that we don't intend to delete soon.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1544636515 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) { this.timestamp = timestamp; } +@Override +public String toString() { +return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp; Review Comment: you mean like `return "Row[name:" + name + ",partition:" + partition + ",offset:" + offset;` ?
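For illustration, the renamed helper could look like the sketch below. This is not the actual `GetOffsetShellTest` code; it also adds the closing `]` that the quoted one-line `toString()` drops, on the assumption that omission was unintended:

```java
// Minimal sketch of the test helper with the field renamed from `timestamp`
// to `offset`, as the review asks.
class Row {
    final String name;
    final int partition;
    final Long offset;

    Row(String name, int partition, Long offset) {
        this.name = name;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        // Close the bracket so failure messages print a complete value.
        return "Row[name:" + name + ",partition:" + partition + ",offset:" + offset + "]";
    }
}
```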
Re: [PR] Testing the terraform file [kafka]
gharris1727 commented on PR #15623: URL: https://github.com/apache/kafka/pull/15623#issuecomment-2027405515 @Rashi-Confluent This is probably the wrong upstream
Re: [PR] Testing the terraform file [kafka]
gharris1727 closed pull request #15623: Testing the terraform file URL: https://github.com/apache/kafka/pull/15623
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832228#comment-17832228 ] Chia-Ping Tsai commented on KAFKA-16310: {quote} I got more context and also feel that we can transfer the workload to listMaxTimestamp when clients fetch this since it's rare. What do you think? {quote} yep, we are on the same page. I have filed a PR according to the solution. Please take a look at https://github.com/apache/kafka/pull/15621 > ListOffsets doesn't report the offset with maxTimestamp anymore > --- > > Key: KAFKA-16310 > URL: https://issues.apache.org/jira/browse/KAFKA-16310 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Emanuele Sabellico >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 3.8.0 > > > Updated: This is a confirmed regression issue in v3.7.0. > The impact of this issue is that when there is a batch containing records > with timestamps not in order, the offset reported for the max timestamp will be wrong (e.g. > the timestamp for t0 should map to offset 10, but offset 12 will be returned, > etc.). It'll cause the time index to store the wrong offset, so the result > will be unexpected. > === > The last offset is reported instead. > A test in librdkafka (0081/do_test_ListOffsets) is failing and it's checking > that the offset with the max timestamp is the middle one and not the last > one. The test is passing with 3.6.0 and previous versions. > This is the test: > [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989] > > there are three messages, with timestamps: > {noformat} > t0 + 100 > t0 + 400 > t0 + 250{noformat} > and indices 0,1,2. > then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done. > it should return offset 1, but in 3.7.0 and trunk it is returning offset 2. > Even after 5 seconds from producing, it's still returning 2 as the offset with > max timestamp. > ProduceRequest and ListOffsets were sent to the same broker (2), the leader > didn't change. > {code:java} > %7|1709134230.019|SEND|0081_admin#producer-3| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, > 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse > (v7, 95 bytes, CorrId 2, rtt 1.18ms) > %7|1709134230.020|MSGSET|0081_admin#producer-3| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: > rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 > message(s) (MsgId 0, BaseSeq -1) delivered {code} > {code:java} > %7|1709134235.021|SEND|0081_admin#producer-2| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest > (v7, 103 bytes @ 0, CorrId 7) > %7|1709134235.022|RECV|0081_admin#producer-2| > [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received > ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832226#comment-17832226 ] Johnny Hsu commented on KAFKA-16310: thanks [~junrao] for pointing this out and sharing the solution, and thanks [~chia7712] [~showuon] for the discussions and for reverting it for 3.6. {quote}Since this is a rare operation, paying the decompression overhead is fine. Adding a new field in the batch requires a record format change, which is a much bigger effort. For now, the easiest thing is to add a method in Batch to find out offsetOfMaxTimestamp by iterating all records. Regarding the optimization on the leader side by caching offsetOfMaxTimestamp, we could do it. However, my understanding is that listMaxTimestamp is rare and I am not sure if it's worth the additional complexity. {quote} I have gone through the comments and had an offline discussion with Chia-Ping. I got more context and also feel that we can shift the workload to listMaxTimestamp when clients fetch it, since it's rare. What do you think?
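The approach discussed above, deriving the offset of the max timestamp by iterating all records of a batch on the rare listMaxTimestamp call, can be sketched as below. Parallel offset/timestamp arrays stand in for the decompressed records, and the tie-breaking rule (first record wins on equal timestamps, via the strict `>`) is an assumption of this sketch, not a statement of the Kafka spec:

```java
class MaxTimestampOffset {
    // Iterate the batch once, tracking the largest timestamp seen and the
    // offset of the record that carried it.
    static long offsetOfMaxTimestamp(long[] offsets, long[] timestamps) {
        long maxTs = Long.MIN_VALUE;
        long result = -1L; // -1 for an empty batch
        for (int i = 0; i < offsets.length; i++) {
            if (timestamps[i] > maxTs) { // strict '>' keeps the first max
                maxTs = timestamps[i];
                result = offsets[i];
            }
        }
        return result;
    }
}
```

With the librdkafka scenario from the issue (offsets 0,1,2 with timestamps t0+100, t0+400, t0+250), this returns offset 1, the middle record, rather than the last offset of the batch.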
[jira] [Commented] (KAFKA-16447) Fix failed ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832210#comment-17832210 ] Chia-Ping Tsai commented on KAFKA-16447: [~nizhikov] thanks for taking over this jira. However, it would be great if you could send a request before assigning this jira to yourself > Fix failed ReplicaManagerTest > - > > Key: KAFKA-16447 > URL: https://issues.apache.org/jira/browse/KAFKA-16447 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Nikolay Izhikov >Priority: Major > > see comment: https://github.com/apache/kafka/pull/15373/files#r1544335647 for > root cause
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
mumrah commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1544478846 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2185,9 +2184,155 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; -if (!topicNamesList.isEmpty()) { -runnable.call(call, now); +} + +Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi( +List topicNamesList, +Map> topicFutures, +Map nodes, +DescribeTopicsOptions options, +long now +) { +return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { +Map pendingTopics = +topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); Review Comment: I think this would be more readable if we just create a TreeMap and populate it in a for loop. Since we are using this map like a queue, it might be better to use LinkedHashMap (after sorting the topics). Really, we don't even need a map since we can build the TopicRequest-s on demand in createRequest.
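The reviewer's alternative can be sketched as follows. `TopicRequest` here is a hypothetical stand-in for the request data class (only the name matters), and `pendingTopics` is a simplified shape of the field under review, not the actual `KafkaAdminClient` code:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

class PendingTopicsSketch {
    // Hypothetical stand-in for DescribeTopicPartitionsRequestData.TopicRequest.
    static class TopicRequest {
        final String name;
        TopicRequest(String name) { this.name = name; }
    }

    // Sort the names once, then fill a LinkedHashMap in a plain for loop so
    // iteration order doubles as the pending-request queue -- no stream
    // collector, and no tree rebalancing on every removal.
    static Map<String, TopicRequest> pendingTopics(String... topicNames) {
        String[] sorted = topicNames.clone();
        Arrays.sort(sorted);
        Map<String, TopicRequest> pending = new LinkedHashMap<>();
        for (String name : sorted) {
            pending.put(name, new TopicRequest(name));
        }
        return pending;
    }
}
```

As the comment notes, the map could be dropped entirely by building each `TopicRequest` on demand in `createRequest`; the sketch only illustrates the sorted-queue variant.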
[jira] [Updated] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damien Gasparina updated KAFKA-16448: - Description: Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing] (was: Jira to follow work on KIP: h1. [KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]) > Add Kafka Streams exception handler for exceptions occuring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina >Priority: Minor > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occuring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]
[jira] [Created] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)
Damien Gasparina created KAFKA-16448: Summary: Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033) Key: KAFKA-16448 URL: https://issues.apache.org/jira/browse/KAFKA-16448 Project: Kafka Issue Type: Improvement Components: streams Reporter: Damien Gasparina Jira to follow work on KIP: h1. [KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1544483152 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -104,6 +104,11 @@ public Row(String name, int partition, Long timestamp) { this.timestamp = timestamp; } +@Override +public String toString() { +return "Row[name:" + name + ",partition:" + partition + ",timestamp:" + timestamp; Review Comment: Could you rename the `timestamp` to `offset`?
[jira] [Assigned] (KAFKA-16447) Fix failed ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikolay Izhikov reassigned KAFKA-16447: --- Assignee: Nikolay Izhikov
Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]
chia7712 merged PR #15589: URL: https://github.com/apache/kafka/pull/15589
[jira] [Resolved] (KAFKA-16397) Use ByteBufferOutputStream to avoid array copy
[ https://issues.apache.org/jira/browse/KAFKA-16397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16397. Fix Version/s: 3.8.0 Resolution: Fixed > Use ByteBufferOutputStream to avoid array copy > -- > > Key: KAFKA-16397 > URL: https://issues.apache.org/jira/browse/KAFKA-16397 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia Chuan Yu >Priority: Minor > Fix For: 3.8.0 > > > from https://github.com/apache/kafka/pull/15148#discussion_r1531889679 > source code: > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java#L216 > we can use ByteBufferOutputStream to collect the uncompressed data, and then > return the inner buffer directly instead of copying the full array.
[PR] [WIP] Add Remote Log Manager quota manager [kafka]
abhijeetk88 opened a new pull request, #15625: URL: https://github.com/apache/kafka/pull/15625 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
wernerdv commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1544403086 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -216,7 +217,10 @@ Set findConsumerGroups() Collection listConsumerGroups() throws InterruptedException, ExecutionException { -return sourceAdminClient.listConsumerGroups().valid().get(); +return adminCall( +() -> sourceAdminClient.listConsumerGroups().valid().get(), +() -> "list consumer groups on cluster " + config.sourceClusterAlias() Review Comment: Yes, it's ready for review now.
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
mimaison commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1544383751 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -216,7 +217,10 @@ Set findConsumerGroups() Collection listConsumerGroups() throws InterruptedException, ExecutionException { -return sourceAdminClient.listConsumerGroups().valid().get(); +return adminCall( +() -> sourceAdminClient.listConsumerGroups().valid().get(), +() -> "list consumer groups on cluster " + config.sourceClusterAlias() Review Comment: I don't see the change, did you forget to push?
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
wernerdv commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1544364748 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -320,4 +322,19 @@ static void createCompactedTopic(String topicName, short partitions, short repli static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) { createCompactedTopic(topicName, (short) 1, replicationFactor, admin); } + +static T adminCall(Callable callable, Supplier errMsg) +throws ExecutionException, InterruptedException { +try { +return callable.call(); +} catch (ExecutionException | InterruptedException e) { +if (e.getCause() instanceof TopicAuthorizationException || Review Comment: Done.
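The shape of the `adminCall` helper under discussion can be sketched as below. `AuthorizationException` is a stand-in for Kafka's `TopicAuthorizationException` and related classes, and the captured `lastError` string stands in for the actual logging call; the real helper's details (which exception types it matches, how it logs) may differ:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

class AdminCallSketch {
    // Stand-in for Kafka's authorization exceptions.
    static class AuthorizationException extends RuntimeException {
        AuthorizationException(String msg) { super(msg); }
    }

    static String lastError; // captured instead of log.error(...) in this sketch

    // Run an admin operation; on an authorization failure, record which
    // operation failed (the description is built lazily via the Supplier,
    // so it costs nothing on the success path) and rethrow.
    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
            throws ExecutionException, InterruptedException {
        try {
            return callable.call();
        } catch (ExecutionException | InterruptedException e) {
            if (e.getCause() instanceof AuthorizationException) {
                lastError = "Not authorized to " + errMsg.get();
            }
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Demo: an admin call that fails with an authorization error.
    static String demoAuthFailure() {
        try {
            adminCall(
                () -> { throw new ExecutionException(new AuthorizationException("denied")); },
                () -> "list consumer groups on cluster src");
        } catch (ExecutionException | InterruptedException e) {
            return lastError;
        }
        return null;
    }
}
```

Passing a `Supplier<String>` rather than a pre-built string keeps callers like `listConsumerGroups()` cheap when nothing goes wrong.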
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
wernerdv commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1544364893 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -216,7 +217,10 @@ Set findConsumerGroups() Collection listConsumerGroups() throws InterruptedException, ExecutionException { -return sourceAdminClient.listConsumerGroups().valid().get(); +return adminCall( +() -> sourceAdminClient.listConsumerGroups().valid().get(), +() -> "list consumer groups on cluster " + config.sourceClusterAlias() Review Comment: Done.
[jira] [Commented] (KAFKA-16447) Fix failed ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832167#comment-17832167 ] Chia-Ping Tsai commented on KAFKA-16447: {quote} I would like to take the issue. May I assign to myself? Thank you. {quote} You can take it over if [~alyssahuang] has no free cycles :)
[jira] [Commented] (KAFKA-16447) Fix failed ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832165#comment-17832165 ] PoAn Yang commented on KAFKA-16447: --- Hi [~chia7712], I would like to take the issue. May I assign it to myself? Thank you.
Re: [PR] Image test improvements [kafka]
chia7712 commented on code in PR #15373: URL: https://github.com/apache/kafka/pull/15373#discussion_r1544339792 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -88,7 +101,7 @@ public class ClusterImageTest { setId(2). setEpoch(123). setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")). -setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). +setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))). Review Comment: I have filed a ticket https://issues.apache.org/jira/browse/KAFKA-16447 please feel free to take it over
[jira] [Created] (KAFKA-16447) Fix flaky ReplicaManagerTest
Chia-Ping Tsai created KAFKA-16447: -- Summary: Fix flaky ReplicaManagerTest Key: KAFKA-16447 URL: https://issues.apache.org/jira/browse/KAFKA-16447 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai see comment: https://github.com/apache/kafka/pull/15373/files#r1544335647 for root cause
[jira] [Updated] (KAFKA-16447) Fix failed ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-16447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16447: --- Summary: Fix failed ReplicaManagerTest (was: Fix flaky ReplicaManagerTest)
Re: [PR] Image test improvements [kafka]
chia7712 commented on code in PR #15373: URL: https://github.com/apache/kafka/pull/15373#discussion_r1544335647 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -88,7 +101,7 @@ public class ClusterImageTest { setId(2). setEpoch(123). setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")). -setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))). +setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))). Review Comment: `ReplicaManagerTest` reuses `IMAGE1` to test the listeners, hence this change breaks some test cases. Do you have free cycles to fix it? If not, I can file a PR to fix it :)
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
chia7712 commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2027038241 the flakiness of `ReplicaManagerTest` is related to #15373, see https://github.com/apache/kafka/pull/15373/files#r1544335647
Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]
showuon commented on code in PR #15521: URL: https://github.com/apache/kafka/pull/15521#discussion_r1544325278 ## server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java: ## @@ -310,6 +301,62 @@ void testOnCompletion() throws Exception { } } +private static ClientResponse buildSuccessfulResponse(AssignReplicasToDirsRequestData request) { +Map> errors = new HashMap<>(); +for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) { +for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) { +for (AssignReplicasToDirsRequestData.PartitionData partition : topic.partitions()) { +TopicIdPartition topicIdPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex()); +errors.computeIfAbsent(directory.id(), d -> new HashMap<>()).put(topicIdPartition, Errors.NONE); +} +} +} +AssignReplicasToDirsResponseData responseData = AssignmentsHelper.buildResponseData(Errors.NONE.code(), 0, errors); +ClientResponse response = new ClientResponse(null, null, null, +0L, 0L, false, false, null, null, +new AssignReplicasToDirsResponse(responseData)); +return response; +} + +@Test +public void testAssignmentCompaction() throws Exception { +// Delay the first controller response to force assignment compaction logic +CompletableFuture completionFuture = new CompletableFuture<>(); +doAnswer(invocation -> { +AssignReplicasToDirsRequestData request = invocation.getArgument(0, AssignReplicasToDirsRequest.Builder.class).build().data(); +ControllerRequestCompletionHandler completionHandler = invocation.getArgument(1, ControllerRequestCompletionHandler.class); +ClientResponse response = buildSuccessfulResponse(request); +Runnable completion = () -> completionHandler.onComplete(response); +if (completionFuture.isDone()) completion.run(); +else completionFuture.complete(completion); +return null; + }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), 
+any(ControllerRequestCompletionHandler.class)); + +CountDownLatch remainingInvocations = new CountDownLatch(20); +Runnable onComplete = () -> { +assertTrue(completionFuture.isDone(), "Premature invocation"); +assertTrue(remainingInvocations.getCount() > 0, "Extra invocation"); +remainingInvocations.countDown(); +}; +Uuid[] dirs = {DIR_1, DIR_2, DIR_3}; +for (int i = 0; i < remainingInvocations.getCount(); i++) { +time.sleep(100); +manager.onAssignment(new TopicIdPartition(TOPIC_1, 0), dirs[i % 3], onComplete); +} +activeWait(completionFuture::isDone); +completionFuture.get().run(); +activeWait(() -> remainingInvocations.getCount() == 0); +} + +void activeWait(Supplier predicate) { +while (!predicate.get()) { +time.sleep(100); +manager.wakeup(); +Thread.yield(); +} +} Review Comment: This infinite loop will cause the test to hang forever if there are bugs. Could we improve it? The same applies at L298. Thanks.
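One way to address the hang risk raised above is to bound the polling loop, as sketched below. This is an illustration, not the actual `AssignmentsManagerTest` fix: the real test would also advance the mock clock and call `manager.wakeup()` inside the loop, and would likely report failure through the project's `TestUtils.waitUntilTrue` instead of an `AssertionError`:

```java
import java.util.function.Supplier;

class BoundedWaitSketch {
    // Same polling shape as the test's activeWait, but it gives up after
    // maxIterations so a bug fails the test instead of hanging the build.
    static void activeWait(Supplier<Boolean> predicate, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            if (predicate.get()) return;
            Thread.yield();
        }
        throw new AssertionError("condition not met after " + maxIterations + " iterations");
    }

    // Demo: a predicate that becomes true on its third evaluation.
    static int demoIterationsUntilTrue() {
        int[] calls = {0};
        activeWait(() -> ++calls[0] >= 3, 10);
        return calls[0];
    }

    // Demo: a predicate that never becomes true trips the bound.
    static boolean demoTimesOut() {
        try {
            activeWait(() -> false, 5);
            return false;
        } catch (AssertionError expected) {
            return true;
        }
    }
}
```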
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1544295936

## core/src/main/scala/kafka/log/LogManager.scala:
## @@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
 }
 }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
For (1), I think we'll catch up to the high watermark before publishing the image, so it should be safe. For (2), good suggestion.

## core/src/main/scala/kafka/server/ReplicaManager.scala:
## @@ -2745,10 +2745,10 @@ class ReplicaManager(val config: KafkaConfig,
 "local leaders.")
 replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
 localLeaders.forKeyValue { (tp, info) =>
+  val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
   getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
     try {
       val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)

Review Comment:
Same here, unnecessary change.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
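For illustration, the selection logic in `findAbandonedFutureLogs` above can be modeled roughly as follows. This is a simplified sketch using plain maps instead of Kafka's `TopicsImage` and `UnifiedLog` types (all names here are hypothetical): a future log counts as abandoned when the new metadata image assigns the partition to the very directory that already holds the future log, so the future copy should simply be promoted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AbandonedFutureLogDemo {
    // Sketch of the selection rule: for each partition that has a future log,
    // keep it as "abandoned" only when the metadata image's assigned directory
    // for that partition equals the directory holding the future log.
    static List<String> findAbandoned(Map<String, String> futureLogDirByPartition,
                                      Map<String, String> assignedDirByPartition) {
        List<String> abandoned = new ArrayList<>();
        for (Map.Entry<String, String> e : futureLogDirByPartition.entrySet()) {
            String partition = e.getKey();
            String futureDir = e.getValue();
            if (futureDir.equals(assignedDirByPartition.get(partition))) {
                abandoned.add(partition); // image already points at the future log's dir
            }
        }
        return abandoned;
    }

    public static void main(String[] args) {
        Map<String, String> futureLogs = Map.of("topic-0", "dirA", "topic-1", "dirB");
        Map<String, String> assigned = Map.of("topic-0", "dirA", "topic-1", "dirC");
        // Only topic-0's future log sits in its newly assigned directory.
        System.out.println(findAbandoned(futureLogs, assigned)); // [topic-0]
    }
}
```

The real recovery step then renames the future log directory to the current-log name and swaps it into `currentLogs`, as shown in the diff.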
[PR] KAFKA-16413: add FileLockTest [kafka]
FrankYang0529 opened a new pull request, #15624: URL: https://github.com/apache/kafka/pull/15624 Add unit tests for `FileLock`. Ref: https://github.com/apache/kafka/pull/15568#pullrequestreview-1950676267 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
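As a rough illustration of what such unit tests exercise, here is a sketch using the JDK's `java.nio.channels.FileLock` primitive. Kafka's own `FileLock` wrapper is not shown in this thread, so its exact API is not assumed; this only demonstrates the acquire/release/re-acquire behavior a test like this would typically assert.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

public class FileLockDemo {
    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("filelock-demo", ".lock");
        f.deleteOnExit();

        try (FileChannel channel = new RandomAccessFile(f, "rw").getChannel()) {
            // An uncontended tryLock succeeds and yields a valid lock.
            FileLock lock = channel.tryLock();
            System.out.println(lock != null);   // true
            System.out.println(lock.isValid()); // true

            // After release the lock is invalid and the file is lockable again.
            lock.release();
            System.out.println(lock.isValid()); // false

            FileLock again = channel.tryLock();
            System.out.println(again != null);  // true
            again.release();
        }
    }
}
```

A second `tryLock` while the first is still held in the same JVM would throw `OverlappingFileLockException`, which is another behavior such a test suite commonly covers.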
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
chia7712 commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2026944004
```
./gradlew cleanTest \
  :metadata:test --tests QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics --tests QuorumControllerTest.testFenceMultipleBrokers \
  :connect:mirror:test --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault \
  :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeWithPrefixedAcls \
    --tests ReplicaManagerTest.testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath \
    --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric \
    --tests ReplicaManagerTest.testDeltaFollowerToNotReplica \
    --tests ReplicaManagerTest.testDeltaFollowerWithNoChange \
    --tests ReplicaManagerTest.testDeltaFollowerRemovedTopic \
    --tests ReplicaManagerTest.testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset \
    --tests ReplicaManagerTest.testDeltaFromLeaderToFollower \
    --tests ReplicaManagerTest.testDeltaFromFollowerToLeader \
    --tests PlaintextConsumerTest.testPartitionPauseAndResume \
    --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
    --tests LogDirFailureTest.testIOExceptionDuringCheckpoint \
    --tests SslConsumerTest.testClusterResourceListener
```
Passes on my local machine as well; will merge it tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1544283639

## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
## @@ -189,14 +190,47 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 }

 private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
-  val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
-  assertEquals(0, earliestOffset.offset())
+  def check(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
+    assertEquals(0, earliestOffset.offset())

-  val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-  assertEquals(3, latestOffset.offset())
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
+    assertEquals(3, latestOffset.offset())

-  val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
-  assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
+    assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+  }
+
+  // case 0: test the offsets from leader's append path
+  check()
+
+  // case 1: test the offsets from follower's append path.

Review Comment:
@junrao the extra tests are added. Please take a look.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
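To make the property under test concrete, here is an illustrative sketch (not Kafka's implementation; the record type and method name are hypothetical) of selecting the offset of the record with the maximum timestamp. On ties, the earliest such offset wins, which is the kind of leader/follower consistency the added cases exercise.

```java
import java.util.List;

public class MaxTimestampOffsetDemo {
    record Rec(long offset, long timestamp) {}

    // Pick the offset of the record with the largest timestamp; the strict
    // '>' comparison keeps the earliest offset when timestamps tie, so the
    // result is deterministic regardless of which replica computes it.
    static long offsetOfMaxTimestamp(List<Rec> records) {
        long bestOffset = -1L;
        long bestTs = Long.MIN_VALUE;
        for (Rec r : records) {
            if (r.timestamp() > bestTs) {
                bestTs = r.timestamp();
                bestOffset = r.offset();
            }
        }
        return bestOffset;
    }

    public static void main(String[] args) {
        List<Rec> recs = List.of(
            new Rec(0, 100), new Rec(1, 300), new Rec(2, 300), new Rec(3, 200));
        // Offsets 1 and 2 tie on timestamp 300; the earlier offset is kept.
        System.out.println(offsetOfMaxTimestamp(recs)); // 1
    }
}
```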
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
showuon commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1544263703

## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
## @@ -289,13 +289,17 @@ class BrokerMetadataPublisher(
 try {
   // Start log manager, which will perform (potentially lengthy)
   // recovery-from-unclean-shutdown if required.
-  logManager.startup(metadataCache.getAllTopics())
-
-  // Delete partition directories which we're not supposed to have. We have
-  // to do this before starting ReplicaManager, so that the stray replicas
-  // don't block creation of new ones with different IDs but the same names.
-  // See KAFKA-14616 for details.
-  logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+  logManager.startup(
+    metadataCache.getAllTopics(),
+    isStray = (topicId, partition) => {
+      val tid = topicId.getOrElse {
+        throw new RuntimeException(s"Partition $partition does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      Option(newImage.topics().getPartition(tid, partition.partition()))
+        .exists(_.replicas.contains(brokerId))

Review Comment:
In the original `findStrayReplicas`, we treat 2 cases as stray:
1. newImage doesn't contain the topic ID
2. newImage contains the topic ID, but doesn't include this replica

Here, we only treat case (2) as stray, not (1). Could you explain the reason? Also, why can't we invoke `findStrayReplicas` or `isStrayReplicas` after your change?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
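The two stray cases the reviewer lists can be modeled as a small sketch. Plain Java maps stand in for the KRaft `TopicsImage` here, and all names are hypothetical; the point is only the shape of the check.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrayReplicaCheck {
    // Sketch of the two stray conditions from the review:
    //   case 1: the new image no longer contains the topic/partition at all;
    //   case 2: the image contains the partition, but this broker is no
    //           longer among its assigned replicas.
    static boolean isStray(int brokerId,
                           Map<String, List<Integer>> imageReplicasByPartition,
                           String partition) {
        List<Integer> replicas = imageReplicasByPartition.get(partition);
        if (replicas == null) {
            return true; // case 1: gone from the image
        }
        return !replicas.contains(brokerId); // case 2: broker dropped from the assignment
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> image = new HashMap<>();
        image.put("topicA-0", List.of(1, 2, 3));

        System.out.println(isStray(1, image, "topicA-0")); // false: still a replica
        System.out.println(isStray(9, image, "topicA-0")); // true: case 2
        System.out.println(isStray(1, image, "gone-0"));   // true: case 1
    }
}
```

The review's question is exactly whether the PR's `isStray` lambda covers case 1, since an absent partition maps to `Option.empty`, for which `exists` returns false.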
[jira] [Assigned] (KAFKA-16068) Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors
[ https://issues.apache.org/jira/browse/KAFKA-16068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

johndoe reassigned KAFKA-16068:
-------------------------------
Assignee: johndoe

> Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors
> ---------------------------------------------------------------------------------------
>
>         Key: KAFKA-16068
>         URL: https://issues.apache.org/jira/browse/KAFKA-16068
>     Project: Kafka
>  Issue Type: Task
>  Components: connect
>    Reporter: Greg Harris
>    Assignee: johndoe
>    Priority: Minor
>      Labels: newbie++
>
> The ConnectorValidationIntegrationTest creates test plugins, some with erroneous behavior. In particular:
>
> {noformat}
> [2023-12-29 10:28:06,548] ERROR Failed to discover Converter in classpath: Unable to instantiate TestConverterWithPrivateConstructor: Plugin class default constructor must be public (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138)
> [2023-12-29 10:28:06,550] ERROR Failed to discover Converter in classpath: Unable to instantiate TestConverterWithConstructorThatThrowsException: Failed to invoke plugin constructor (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:138)
> java.lang.reflect.InvocationTargetException{noformat}
> These plugins should be eliminated from the classpath, so that the errors do not appear in unrelated tests. Instead, plugins with erroneous behavior should only be present in the TestPlugins, so that tests can opt-in to loading them.
> There are already plugins with private constructors and throwing-exceptions-constructors, so they should be able to be re-used.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15899 [1/2] Move kafka.security package from core to server module [kafka]
nizhikov commented on PR #15572: URL: https://github.com/apache/kafka/pull/15572#issuecomment-2026879663 Tests look OK to me. I've checked locally and `kafka.server.ReplicaManagerTest` fails with the same error in trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Testing the terraform file [kafka]
Rashi-Confluent opened a new pull request, #15623: URL: https://github.com/apache/kafka/pull/15623 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-1194: changes needed to run on Windows [kafka]
ckmbks commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-2026804291 Who can resolve these test failures? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org