Re: [PR] KAFKA-12216; Improve flaky test reporting [kafka]
dajac commented on PR #14862: URL: https://github.com/apache/kafka/pull/14862#issuecomment-1936915696 I need to update the PR to only apply this while building PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
dajac commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936915364 @gharris1727 I see. So if I understand correctly, two tests in the suite are flaky, so we move the entire suite. This does not seem right to me. This particular suite is the one that you want to run when making changes to the admin client. I have been working on https://github.com/apache/kafka/pull/14862, which changes how we report flaky tests in Jenkins. I believe that this is a better way to handle them. What do you think? However, I do agree with marking the slow tests as integration tests. I suppose that they should have been integration tests anyway.
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936910131 @dajac No, those were added because they were flaky: https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.admin.KafkaAdminClientTest=FLAKY
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
dajac commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936903652 @gharris1727 Thanks for looking into this. Overall, I like the idea. However, I see many suites that are unit tests and therefore must be kept as unit tests (e.g. the admin client test). Were those recategorized because they were slow?
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
gharris1727 commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936821280 @mumrah I used this page: https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=FLAKY and sorted by Flaky, Failed, and Mean Execution Time.
Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]
mumrah commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936811537 This is great, @gharris1727! How did you come up with the list of tests to mark as integration?
Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jolshan merged PR #15324: URL: https://github.com/apache/kafka/pull/15324
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
jolshan commented on PR #15262: URL: https://github.com/apache/kafka/pull/15262#issuecomment-1936789321 The issues started for 3.7 on the same day, so it is one of the 3 commits backported on Feb 2: https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=3.7=kafka.server.LogDirFailureTest
[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest
[ https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816260#comment-17816260 ] Justine Olshan commented on KAFKA-16225: Looks like it happened on 3.7 the same day which narrows it down to 3 commits? https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=3.7=kafka.server.LogDirFailureTest > Flaky test suite LogDirFailureTest > -- > > Key: KAFKA-16225 > URL: https://issues.apache.org/jira/browse/KAFKA-16225 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Reporter: Greg Harris >Priority: Major > Labels: flaky-test > > I see this failure on trunk and in PR builds for multiple methods in this > test suite: > {noformat} > org.opentest4j.AssertionFailedError: expected: but was: > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179) > at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715) > at > kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186) > > at > kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat} > It appears this assertion is failing > [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715] > The other error which is appearing is this: > {noformat} > org.opentest4j.AssertionFailedError: Unexpected exception type thrown, > expected: but was: > > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > > at 
org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67) > at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) > at > kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164) > > at > kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64){noformat} > Failures appear to have started in this commit, but this does not indicate > that this commit is at fault: > [https://github.com/apache/kafka/tree/3d95a69a28c2d16e96618cfa9a1eb69180fb66ea] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
jolshan commented on PR #15262: URL: https://github.com/apache/kafka/pull/15262#issuecomment-1936787524 Hey folks. We've seen a large increase in LogDirFailureTest failures after this PR. Can we take a look and see if something here caused it? gradle enterprise: https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest jira: https://issues.apache.org/jira/browse/KAFKA-16225
Re: [PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]
gharris1727 commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936785675 We did save a little time, but not really too much because the build tends to spend a lot of time waiting for compileTestScala to complete :) `./gradlew clean test` is 45 minutes on my machine, `./gradlew clean unitTest` was ~10m, and now is ~8m. If the compilation is already finished, `./gradlew unitTest` is ~5m.
[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest
[ https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816258#comment-17816258 ] Justine Olshan commented on KAFKA-16225: I'm encountering this too. It looks pretty bad https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest
[PR] fix syntax error [kafka]
mberndt123 opened a new pull request, #15350: URL: https://github.com/apache/kafka/pull/15350 Seems like a good idea to balance those parens.
Re: [PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]
ijuma commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936765040 This is very cool. Any chance you could do the same for the top N slowest unit tests (they're not unit tests if they're that slow)? Then the `unitTest` command would be useful enough to run locally before submitting PRs, etc.
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
splett2 commented on code in PR #14604: URL: https://github.com/apache/kafka/pull/14604#discussion_r1484905012 ## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ## @@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs if (!newTargetIsr.contains(replica)) return Optional.empty(); } +if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty(); Review Comment: @CalvinConfluent hmm, interesting. this is an inconsistency between the KRaft implementation of reassignment completion and the ZK controller implementation of reassignment completion. In the ZK controller, we only allow a reassignment to be completed if the ISR contains all of the target replicas. So in your second example, the ISR would need to contain [0, 1, 7, 8]. code reference is in `KafkaController.isReassignmentComplete`: ``` val targetReplicas = assignment.targetReplicas.toSet targetReplicas.subsetOf(isr) ``` I am wondering whether it was intentional to change the criteria? IMO, the ZK controller criteria for completing reassignment makes more sense than the existing kcontroller criteria, but requiring at least min ISR replicas to be in the ISR (your PR) also seems reasonable.
Re: [PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]
gharris1727 commented on PR #15349: URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936736732 Alright, I got 3 consecutive green builds, and I haven't seen any flaky failures locally.
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent commented on code in PR #14604: URL: https://github.com/apache/kafka/pull/14604#discussion_r1484883905 ## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ## @@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs if (!newTargetIsr.contains(replica)) return Optional.empty(); } +if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty(); Review Comment: A better example is with min ISR=3 1. Originally, replicas [0,1,2,3], ISR [0,1,2,3]. 2. Start reassignment. Target replica set [0,1,7,8]. Temporarily, the replica set is [0,1,2,3,7,8], ISR [0,1,2,3]. 3. Replication goes on, some brokers fail and come back up, and we could experience the following ISR transitions: [0,1,2,3] -> [3,7] -> [7] -> [7,8]. 4. At this point, the reassignment can be completed. 5. In the current world, we can't do much about the above. But in the ELR world, step 3 can look different. ISR/ELR: [0,1,2,3]/[] -> [3,7]/[2] -> [7]/[2,3] -> [7,8]/[2,3]. If we complete the reassignment now, we lose the potentially good replicas [2,3] and have a higher risk of data loss.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1936706379 > Merged the code of [KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR. Why? We are mixing up two tickets if we do this (cf https://github.com/apache/kafka/pull/14426#discussion_r1483677544). Can you remove those changes? Fixing the grace period should be kept separate to get different commits for different fixes.
[jira] [Commented] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816239#comment-17816239 ] Philip Nee commented on KAFKA-16156: There seems to be a subtle difference in behavior between the async and the legacy consumer. The legacy one catches the error without doing anything; the async client doesn't seem to be handling the exception. The fix should be easy, but I'll run the same test first. > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, system tests >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. 
Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at >
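The failure in the trace above comes down to a sentinel timestamp of -1 from the ListOffsets response reaching a constructor that rejects negative timestamps. The following is a minimal, self-contained sketch of the guard being discussed (the legacy consumer's "catch it and do nothing" behavior, i.e. skipping the entry instead of constructing the validating result object). All names here are illustrative, not the actual client internals:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a validating result type plus a builder that skips
// entries carrying the -1 sentinel timestamp, as the legacy path effectively does.
class OffsetResultBuilder {
    record OffsetAndTs(long offset, long timestamp) {
        OffsetAndTs {
            // mirrors the validation that throws in the trace above
            if (timestamp < 0) throw new IllegalArgumentException("Invalid negative timestamp");
        }
    }

    // Map topic-partition name -> {offset, timestamp}; negative timestamps are
    // mapped to null instead of being passed to the validating constructor.
    static Map<String, OffsetAndTs> build(Map<String, long[]> fetched) {
        Map<String, OffsetAndTs> out = new HashMap<>();
        for (var e : fetched.entrySet()) {
            long offset = e.getValue()[0];
            long ts = e.getValue()[1];
            out.put(e.getKey(), ts >= 0 ? new OffsetAndTs(offset, ts) : null);
        }
        return out;
    }

    public static void main(String[] args) {
        // timestamp -1 is what the broker returned for input-topic-0 in the trace
        var result = build(Map.of("input-topic-0", new long[]{42804L, -1L}));
        System.out.println(result.get("input-topic-0")); // null: skipped, no exception
    }
}
```

With the guard, the -1 entry never reaches the validating constructor, so the event loop does not die with the KafkaException wrapping IllegalArgumentException seen above.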
[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816238#comment-17816238 ] Matthias J. Sax commented on KAFKA-16241: - {quote} Detected out-of-order KTable update {quote} I would hope that versioned stores by default would fix this, but there is no real timeline for this... (we could also add a sensor for it and remove the log line – we did discuss this in the past with no resolution... not sure...) {quote}Detected that shutdown was requested. All clients in this app will now begin to shutdown {quote} Interesting – I did not look into the logs in detail but just collected them – it's on the main processing loop, so no wonder we see this so often; guess `taskManager.rebalanceInProgress()` just returns `true` for a longer period of time (I would assume that `isRunning()` is already `false`) – maybe just a simple static boolean flag to execute `maybeSendShutdown();` only once might be sufficient to address this? > Kafka Streams hits IllegalStateException trying to recycle a task > - > > Key: KAFKA-16241 > URL: https://issues.apache.org/jira/browse/KAFKA-16241 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Matthias J. Sax >Priority: Major > Attachments: streams-1.zip, streams-2.zip, streams-3.zip > > > Running with EOS-v2 (not sure if relevant or not) and hitting: > {code:java} > [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] > stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 > cleanly. 
Attempting to close remaining tasks before re-throwing: > (org.apache.kafka.streams.processor.internals.TaskManager) > java.lang.IllegalStateException: Illegal state RESTORING while recycling > active task 1_0 > at > org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582) > at > org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125) > at > org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675) > at > org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) > {code} > Logs of all three KS instances attached.
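The "only once" flag suggested in the comment above can be sketched with a compare-and-set guard, so the warning and the rebalance trigger fire a single time even if the poll loop keeps spinning. This is an illustrative standalone sketch, not the actual StreamThread code; names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical once-only guard around the shutdown-request path discussed above.
class ShutdownOnce {
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    int timesSent = 0; // exposed only so the demo below can count invocations

    void maybeSendShutdown() {
        // compareAndSet returns true only for the first caller, so the log line
        // and the group-rejoin request are emitted exactly once
        if (shutdownRequested.compareAndSet(false, true)) {
            timesSent++;
            System.out.println("Detected that shutdown was requested; notifying group once");
        }
    }

    public static void main(String[] args) {
        ShutdownOnce t = new ShutdownOnce();
        // simulate the tight poll loop seen in the logs
        for (int i = 0; i < 1000; i++) t.maybeSendShutdown();
        System.out.println(t.timesSent); // 1
    }
}
```

An AtomicBoolean keeps the guard safe even if the check could race across threads; for a single StreamThread a plain boolean field would also do.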
[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816234#comment-17816234 ] A. Sophie Blee-Goldman commented on KAFKA-16241: Off-topic but I opened the logs for a second and there is a huge amount of spam. I know everyone is probably aware of the "Detected out-of-order KTable update" logspam by now, but there was over 100MB from that one log message alone (for a single node)...yikes Something that's new, or new to me at least, is this: {code:java} [2024-02-09 03:23:55,168] WARN [i-0fede2697f39580f9-StreamThread-1] stream-thread [i-0fede2697f39580f9-StreamThread-1] Detected that shutdown was requested. All clients in this app will now begin to shutdown (org.apache.kafka.streams.processor.internals.StreamThread) [2024-02-09 03:23:55,168] INFO [i-0fede2697f39580f9-StreamThread-1] [Consumer instanceId=ip-172-31-14-207-1, clientId=i-0fede2697f39580f9-StreamThread-1-consumer, groupId=stream-soak-test] Request joining group due to: Shutdown requested (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) {code} Those two lines are logged repeatedly (and excessively) in a very tight loop at the end of the logs. Filtering those two lines out dropped the file size by another 200MB...that's a lot!! And it's all from about 10 seconds of real time. So it's a VERY busy loop...
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
CalvinConfluent commented on code in PR #14604: URL: https://github.com/apache/kafka/pull/14604#discussion_r1484850347 ## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ## @@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs if (!newTargetIsr.contains(replica)) return Optional.empty(); } +if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty(); Review Comment: I think it is more like a design issue/bug. The following example is valid currently: An existing replica set is [0,1,2], and a new partition assignment is [0,1,3]. So the "adding" replica is 3. The controller can finish the reassignment if the current ISR includes all the "adding" replicas like [3]. This PR prevents completing such reassignment if min ISR is 2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
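The guard in the diff above can be sketched as a standalone check. This is a hypothetical illustration (the class and method names are invented); only the `Math.min(minIsrCount, newTargetReplicas.size())` comparison comes from the diff:

```java
import java.util.Arrays;
import java.util.List;

class MinIsrReassignmentCheck {
    // Mirrors the guard added in maybeCompleteReassignment: the reassignment may
    // only complete when the new target ISR has at least
    // min(minIsrCount, newTargetReplicas.size()) members.
    static boolean mayComplete(List<Integer> newTargetIsr,
                               List<Integer> newTargetReplicas,
                               int minIsrCount) {
        return newTargetIsr.size() >= Math.min(minIsrCount, newTargetReplicas.size());
    }

    public static void main(String[] args) {
        // Example from the review: replicas [0,1,2] reassigned to [0,1,3], the
        // current ISR holds only the "adding" replica [3], min.insync.replicas = 2.
        List<Integer> targetReplicas = Arrays.asList(0, 1, 3);
        System.out.println(mayComplete(Arrays.asList(3), targetReplicas, 2));       // false: ISR below min ISR
        System.out.println(mayComplete(Arrays.asList(0, 1, 3), targetReplicas, 2)); // true
    }
}
```

With the guard in place, the first case (ISR = [3]) can no longer complete the reassignment, which is exactly the scenario described in the comment.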
Re: [PR] KAFKA-16202: Extra dot in error message in producer [kafka]
infantlikesprogramming commented on code in PR #15296: URL: https://github.com/apache/kafka/pull/15296#discussion_r1484843384 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -704,7 +704,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons "topic-partition may not exist or the user may not have Describe access to it", batch.topicPartition); } else { -log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " + +log.warn("Received invalid metadata error in produce request on partition {} due to {} Going " + "to request metadata update now", batch.topicPartition, Review Comment: Thank you so much for your comment. I have made changes based on your review in the latest (4th) commit.
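For context on why the literal dot after the `{}` placeholder was dropped: the substituted error description already ends with a period, so the old format string produced a double dot. A tiny illustrative sketch (the error text here is a stand-in, not taken from the diff):

```java
class ExtraDotDemo {
    public static void main(String[] args) {
        // Stand-in for an error description that ends with its own period.
        String error = "This server does not host this topic-partition.";

        // Old format: literal '.' after the placeholder -> ".." in the output.
        String before = "Received invalid metadata error in produce request on "
                + "partition tp-0 due to " + error + ". Going to request metadata update now";
        // Fixed format: no literal '.' after the placeholder.
        String after = "Received invalid metadata error in produce request on "
                + "partition tp-0 due to " + error + " Going to request metadata update now";

        System.out.println(before.contains(".. ")); // true: the extra dot
        System.out.println(after.contains(".. "));  // false
    }
}
```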
Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]
splett2 commented on code in PR #14604: URL: https://github.com/apache/kafka/pull/14604#discussion_r1484842081 ## metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java: ## @@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs if (!newTargetIsr.contains(replica)) return Optional.empty(); } +if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty(); Review Comment: I haven't been following along the ELR KIP closely so I may be missing some of the details. My understanding is that a reassignment is only completed once the target replicas are all in the ISR. Isn't it better to just reject the initiation of a reassignment where the target replica count is less than the configured min ISR? From what I can tell, if a user initiates a reassignment where the target replica count < min ISR count, the reassignment will just get stuck indefinitely.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1484643624 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -127,6 +127,13 @@ public synchronized Cluster fetch() { return cache.cluster(); } +/** + * Get the current metadata cache. + */ +public synchronized MetadataCache fetchCache() { Review Comment: Perhaps we could make `cache` volatile and avoid the synchronization? ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -39,8 +39,9 @@ import java.util.stream.Collectors; /** - * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster + * An internal immutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster * instance which is optimized for read access. + * Prefer to extend MetadataCache's API for internal client usage Vs the public {@link Cluster} */ public class MetadataCache { Review Comment: We don't have to do it here, but "snapshot" might be a better name since it suggests immutability. ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -113,14 +116,28 @@ Optional nodeById(int id) { return Optional.ofNullable(nodes.get(id)); } -Cluster cluster() { +public Cluster cluster() { if (clusterInstance == null) { throw new IllegalStateException("Cached Cluster instance should not be null, but was."); } else { return clusterInstance; } } +/** + * Get leader-epoch for partition. + * @param tp partition + * @return leader-epoch if known, else return optional.empty() + */ +public Optional leaderEpochFor(TopicPartition tp) { Review Comment: Would it make sense to use `OptionalInt`? ## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ## @@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon * @param latestLeaderEpoch latest leader's epoch. 
*/ void maybeUpdateLeaderEpoch(Optional latestLeaderEpoch) { -if (!currentLeaderEpoch.equals(latestLeaderEpoch)) { +if (latestLeaderEpoch.isPresent() Review Comment: Hmm, this looks like a change in behavior. Previously we would override a known current leader epoch if `latestLeaderEpoch` is not defined. I am not sure if that was intentional. I recall we had some logic which assumed that metadata version might be downgraded and no longer provide a leader epoch. Not sure it matters here though since we are not changing the leader, just updating the epoch. ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final String topic, final int return clusterWith(nodes, Collections.singletonMap(topic, partitions)); } +public static MetadataCache metadataCacheWith(final int nodes, final Map topicPartitionCounts) { Review Comment: Useful to have a brief javadoc to explain replica counts and assignment strategy. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -879,17 +884,12 @@ private List drainBatchesForOneNode(Metadata metadata, Node node, // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; -Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); -// Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. Review Comment: I guess this race is no longer possible since we are using the snapshot, which is immutable?
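The volatile-snapshot idea running through these comments can be sketched in isolation. This is a hypothetical sketch, not Kafka's actual `Metadata`/`MetadataCache` code: an immutable object published through a volatile field lets readers skip synchronization, and the lookup method shows the `OptionalInt` shape suggested for `leaderEpochFor`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical illustration of the pattern discussed above.
class SnapshotHolder {
    // Immutable after construction, so readers never need a lock.
    static final class Snapshot {
        private final Map<String, Integer> leaderEpochs;

        Snapshot(Map<String, Integer> leaderEpochs) {
            // Defensive copy + unmodifiable view: nothing can mutate it later.
            this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
        }

        // OptionalInt instead of Optional<Integer>, as suggested in the review.
        OptionalInt leaderEpochFor(String topicPartition) {
            Integer epoch = leaderEpochs.get(topicPartition);
            return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
        }
    }

    // volatile publication: a reader always sees a fully constructed snapshot,
    // and a drain loop that captures the reference once works against a single
    // consistent view, so a "leader changed mid-loop" race cannot occur.
    private volatile Snapshot current = new Snapshot(new HashMap<>());

    Snapshot fetchSnapshot() {              // no synchronized needed on reads
        return current;
    }

    void update(Map<String, Integer> newEpochs) {
        current = new Snapshot(newEpochs);  // replace wholesale, never mutate
    }

    public static void main(String[] args) {
        SnapshotHolder holder = new SnapshotHolder();
        Map<String, Integer> epochs = new HashMap<>();
        epochs.put("tp-0", 7);
        holder.update(epochs);
        Snapshot snapshot = holder.fetchSnapshot(); // capture once, read many times
        System.out.println(snapshot.leaderEpochFor("tp-0")); // OptionalInt[7]
        System.out.println(snapshot.leaderEpochFor("tp-9")); // OptionalInt.empty
    }
}
```

The trade-off is that writers pay for a full copy on every update, which is acceptable when reads vastly outnumber metadata updates, as in the producer hot path.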
[PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]
gharris1727 opened a new pull request, #15349: URL: https://github.com/apache/kafka/pull/15349 These tests have had at least one flaky test failure in the last 28 days, and aren't already marked as integration tests. I looked at the top-50 flakiest tests in the project with gradle enterprise, and then spot-checked the top-50 flakiest tests in core, connect, tools, and streams. I've also temporarily changed the Jenkinsfile to only run the unit tests to get an idea of the runtime and success rate of the unit tests in Jenkins. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
OmniaGM commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1484726099 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java: ## @@ -0,0 +1,1485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import 
org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.opentest4j.AssertionFailedError; + +import java.net.InetAddress; +import
Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jeqo commented on PR #15324: URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936481385 @jolshan thanks for catching this! adding it now.
[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16241: Affects Version/s: 3.6.1
[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16241: Attachment: streams-3.zip
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
jolshan commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1484696012 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java: ## @@ -0,0 +1,1485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.SyncGroupRequestData; +import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.classic.ClassicGroup; +import 
org.apache.kafka.coordinator.group.classic.ClassicGroupState; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.opentest4j.AssertionFailedError; + +import java.net.InetAddress; +import
[jira] [Created] (KAFKA-16242) Mark flaky tests as integration tests
Greg Harris created KAFKA-16242: --- Summary: Mark flaky tests as integration tests Key: KAFKA-16242 URL: https://issues.apache.org/jira/browse/KAFKA-16242 Project: Kafka Issue Type: Test Components: unit tests Reporter: Greg Harris Assignee: Greg Harris It would be valuable to have the `unitTest` gradle target run only reliable tests that can be used for CI gating. If we push all of the flaky or possibly-flaky tests to the integrationTest target, we can start gating CI on deterministic failures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
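The gating idea in this ticket — CI passes or fails only on a deterministic "unit" bucket, while flaky or slow tests run in a separate non-gating "integration" bucket — can be sketched with a tiny tag-based selector. This is an illustrative model only; the names (`TaggedTest`, `TestSelector`) are hypothetical and this is not Kafka's actual Gradle/JUnit wiring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of tag-based test partitioning: the CI-gating target
// selects only tests tagged "unit"; flaky or slow tests carry the
// "integration" tag and run in a separate, non-gating job.
class TaggedTest {
    final String name;
    final Set<String> tags;

    TaggedTest(String name, Set<String> tags) {
        this.name = name;
        this.tags = tags;
    }
}

class TestSelector {
    // Returns the names of tests carrying the requested tag, preserving order.
    static List<String> select(List<TaggedTest> suite, String tag) {
        List<String> selected = new ArrayList<>();
        for (TaggedTest t : suite) {
            if (t.tags.contains(tag)) {
                selected.add(t.name);
            }
        }
        return selected;
    }
}
```

Under this model, moving a flaky test between buckets is just retagging it — the gating target never sees it again, which is the effect the ticket is after.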
[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16241: Attachment: streams-2.zip > Kafka Streams hits IllegalStateException trying to recycle a task > - > > Key: KAFKA-16241 > URL: https://issues.apache.org/jira/browse/KAFKA-16241 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Attachments: streams-1.zip, streams-2.zip > > > Running with EOS-v2 (not sure if relevant or not) and hitting: > {code:java} > [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] > stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 > cleanly. Attempting to close remaining tasks before re-throwing: > (org.apache.kafka.streams.processor.internals.TaskManager) > java.lang.IllegalStateException: Illegal state RESTORING while recycling > active task 1_0 > at > org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582) > at > org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125) > at > org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675) > at > org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) > {code} > Logs of all three KS instances attached. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
jolshan commented on code in PR #15348: URL: https://github.com/apache/kafka/pull/15348#discussion_r1484689761 ## checkstyle/suppressions.xml: ## @@ -336,11 +336,11 @@ + files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/> Review Comment: What's your take on in-file suppressions vs suppressions.xml? On a previous PR I was asked to make the suppression in-file for specific methods: https://github.com/apache/kafka/pull/15139#discussion_r1451075038 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
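For context on the trade-off being discussed: Checkstyle supports both a central `suppressions.xml` (matching by file name, so file-wide granularity) and in-file suppression via `@SuppressWarnings`, which is method-scoped but only works if the build enables Checkstyle's `SuppressWarningsHolder` check plus `SuppressWarningsFilter` — whether Kafka's config does so is an assumption here, and the check name below is illustrative.

```java
// In-file, annotation-scoped suppression: visible at the point of use and
// limited to one method, but requires SuppressWarningsHolder +
// SuppressWarningsFilter in the Checkstyle configuration. A suppressions.xml
// entry would instead silence the check for the entire file.
class SuppressionExample {
    @SuppressWarnings("checkstyle:cyclomaticcomplexity") // illustrative check name
    static int sign(int x) {
        if (x < 0) return -1;
        if (x == 0) return 0;
        return 1;
    }
}
```

The usual trade-off: in-file keeps the exception next to the code it excuses, while the XML keeps suppression noise out of the source but loses method-level precision.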
[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16241: Attachment: streams-1.zip > Kafka Streams hits IllegalStateException trying to recycle a task > - > > Key: KAFKA-16241 > URL: https://issues.apache.org/jira/browse/KAFKA-16241 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Attachments: streams-1.zip > > > Running with EOS-v2 (not sure if relevant or not) and hitting: > {code:java} > [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] > stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 > cleanly. Attempting to close remaining tasks before re-throwing: > (org.apache.kafka.streams.processor.internals.TaskManager) > java.lang.IllegalStateException: Illegal state RESTORING while recycling > active task 1_0 > at > org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582) > at > org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125) > at > org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675) > at > org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) > {code} > Logs of all three KS instances attached. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
Matthias J. Sax created KAFKA-16241: --- Summary: Kafka Streams hits IllegalStateException trying to recycle a task Key: KAFKA-16241 URL: https://issues.apache.org/jira/browse/KAFKA-16241 Project: Kafka Issue Type: Bug Components: streams Reporter: Matthias J. Sax Running with EOS-v2 (not sure if relevant or not) and hitting: {code:java} [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager) java.lang.IllegalStateException: Illegal state RESTORING while recycling active task 1_0 at org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582) at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125) at org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675) at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) {code} Logs of all three KS instances attached. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison commented on code in PR #15274: URL: https://github.com/apache/kafka/pull/15274#discussion_r1484652868 ## tests/kafkatest/services/console_consumer.py: ## @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7 Review Comment: That seems to work. However while running the system tests I found an issue loading configs via `--consumer.config`. I pushed https://github.com/apache/kafka/pull/15274/commits/c6355fe630a7eae8ea55c8806c66b04cf389bfa2 to fix it. I've kicked another run on our system tests CI, hopefully it will be clean now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1484638827 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -52,7 +52,7 @@ public class MetadataCache { private final Map metadataByPartition; private final Map topicIds; private final Map topicNames; -private Cluster clusterInstance; +private InternalCluster clusterInstance; Review Comment: > We should confirm though. I don't see it being used mutably in the code. I see that historically it was made mutable to support deletions/updates within the cache, but the deletion/update code has since been removed. As far as I can see, the semantics are read-only. So I have treated `MetadataCache` as an immutable cache, made its internal data structures unmodifiable, and updated the javadoc. All clients tests pass locally; hopefully the Jenkins signal is green too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
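The immutability change discussed here can be sketched as a copy-on-write cache: internal maps are defensively copied and wrapped, so one instance can be shared across producer threads without synchronization, and any "update" produces a fresh instance. This is a minimal illustration of the design choice, not Kafka's actual `MetadataCache` API; the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Sketch of an immutable metadata cache: the constructor takes a defensive
// copy and wraps it unmodifiable, so readers never need a lock.
final class ImmutableMetadataCache {
    private final Map<String, Integer> partitionCounts;

    ImmutableMetadataCache(Map<String, Integer> partitionCounts) {
        this.partitionCounts = Collections.unmodifiableMap(new HashMap<>(partitionCounts));
    }

    OptionalInt partitionCountForTopic(String topic) {
        Integer count = partitionCounts.get(topic);
        return count == null ? OptionalInt.empty() : OptionalInt.of(count);
    }

    // "Updates" build a new cache rather than mutating this one, which is
    // what makes lock-free sharing between threads safe.
    ImmutableMetadataCache withTopic(String topic, int partitions) {
        Map<String, Integer> copy = new HashMap<>(partitionCounts);
        copy.put(topic, partitions);
        return new ImmutableMetadataCache(copy);
    }
}
```

With this shape, concurrent readers always see a consistent snapshot, and a metadata refresh simply swaps in the new instance.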
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1484632678 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -111,37 +233,138 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListGroupCommand(quorum: String): Unit = { + @Test + def testConsumerGroupTypesFromString(): Unit = { +var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer") +assertEquals(Set(GroupType.CONSUMER), result) + +result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic") +assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result) + +result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic") +assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result) + +assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")) + +assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic")) + +assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString(" , ,")) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" +val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) +addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) var out = "" var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") TestUtils.waitUntilTrue(() 
=> { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) -}, s"Expected to find $simpleGroup, $group and no header, but found $out") + !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) +}, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) -}, s"Expected to find $simpleGroup, $group and the header, but found $out") + out.contains("STATE") && !out.contains("TYPE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) +}, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + +cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") +TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) +}, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + +cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "--type") +TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) +}, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "Stable") TestUtils.waitUntilTrue(() => { out = 
TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(group) && out.contains("Stable") -}, s"Expected to find $group in state Stable and the header, but found $out") + out.contains("STATE") && out.contains(group) && out.contains("Stable") && out.contains(protocolGroup) +}, s"Expected to find $group, $protocolGroup in state Stable and the header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "stable") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(group) && out.contains("Stable") -}, s"Expected to find $group in state Stable and
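The parsing behavior exercised by the `consumerGroupTypesFromString` assertions above — a comma-separated, case-insensitive list of type names, with unknown or blank tokens rejected via `IllegalArgumentException` — can be sketched as follows. This is an illustrative re-implementation, not the actual `ConsumerGroupCommand` code.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Minimal stand-in for the group types under discussion.
enum GroupType { CONSUMER, CLASSIC }

class GroupTypeParser {
    // Splits on commas, trims, matches names case-insensitively, and rejects
    // anything that is not a known type (including blank tokens).
    static Set<GroupType> fromString(String input) {
        Set<GroupType> types = EnumSet.noneOf(GroupType.class);
        for (String token : input.split(",")) {
            String name = token.trim().toUpperCase(Locale.ROOT);
            try {
                types.add(GroupType.valueOf(name));
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Invalid group type: '" + token.trim() + "'");
            }
        }
        return types;
    }
}
```

Note how the blank-token case (`" , ,"`) falls out naturally: trimming yields the empty string, which `valueOf` rejects like any other unknown name.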
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1484632200 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -18,74 +18,196 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, GroupType} +import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" +val protocolGroup = "protocol-group" + +createOffsetsTopic() Review Comment: Is it required or is this just for uniformity? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1484631548 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -18,44 +18,47 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, GroupType} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" +val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) +addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") val service = getConsumerGroupService(cgcArgs) -val expectedGroups = Set(group, simpleGroup) +val expectedGroups = Set(protocolGroup, group, simpleGroup) var foundGroups = Set.empty[String] TestUtils.waitUntilTrue(() => { foundGroups = service.listConsumerGroups().toSet expectedGroups == foundGroups }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def 
testListWithUnrecognizedNewConsumerOption(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: String): Unit = { val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list") assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs)) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroupsWithStates(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) Review Comment: Ah yes, thanks, I understand. I was just asking whether I should make changes related to the state methods in this PR, since I got comments before about not putting unrelated changes in the same PR. I'll just do it, and you can let me know if you want to revert it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
rreddy-22 commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1484628868 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -2811,6 +2811,72 @@ public void testListConsumerGroupsWithStates() throws Exception { } } +@Test +public void testListConsumerGroupsWithTypes() throws Exception { +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + +// Test with a specific state filter but no type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + +env.kafkaClient().prepareResponseFrom( +request -> request instanceof ListGroupsRequest && +!((ListGroupsRequest) request).data().statesFilter().isEmpty() && +((ListGroupsRequest) request).data().typesFilter().isEmpty(), Review Comment: Haha okay. I actually started off by adding it to the unsupported-version response method before getting the comments. It's honestly weird that the other tests don't validate anything, lol — I had the urge to change the state tests as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]
jolshan commented on PR #15324: URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936317088 Hey there @jeqo looks like check style failed. Do you mind adding the apache header to your new benchmark? ```/home/jenkins/workspace/Kafka_kafka-pr_PR-15324/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:1: Line does not match expected header line of '/*'. [Header]``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper
[ https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816120#comment-17816120 ] Gaurav Salvi edited comment on KAFKA-16239 at 2/9/24 3:42 PM: -- [~divijvaidya] : What should the references to 'IntegrationTestHelper' be replaced with? Or should we simply remove the lines containing it? Also, I would like to work on this — please assign it to me. was (Author: JIRAUSER304146): [~divijvaidya] : What should the references to 'IntegrationTestHelper' be replaced with? Or should we simply remove the lines containing it? > Clean up references to non-existent IntegrationTestHelper > - > > Key: KAFKA-16239 > URL: https://issues.apache.org/jira/browse/KAFKA-16239 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > > A bunch of places in the code javadocs and README docs refer to a class called > IntegrationTestHelper. Such a class does not exist. > This task will clean up all references to IntegrationTestHelper from the Kafka > code base. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper
[ https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816120#comment-17816120 ] Gaurav Salvi commented on KAFKA-16239: -- [~divijvaidya] : What should the references to 'IntegrationTestHelper' be replaced with? Or should we simply remove the lines containing it? > Clean up references to non-existent IntegrationTestHelper > - > > Key: KAFKA-16239 > URL: https://issues.apache.org/jira/browse/KAFKA-16239 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > > A bunch of places in the code javadocs and README docs refer to a class called > IntegrationTestHelper. Such a class does not exist. > This task will clean up all references to IntegrationTestHelper from the Kafka > code base. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1936141808 @Phuc-Hong-Tran regarding this: > Just for clarification, when we were talking about "implement and test everything up to the point where the field is populated", does that mean we're not gonna implement and test the part where the client receives the assignment from the broker at this stage? We do need to: - `implement and test everything up to the point where the field is populated` (from the point the user calls subscribe with the SubscriptionPattern, to the point where the supplied regex is available in the HB builder, to be included in the HB request). - include the field in the HB request. This is where we do need the RPC to be updated to support the new field. I could be missing something, but I would say we don't need any changes for the part where the client receives the assignment from the broker after subscribing to a regex. It should be exactly the same logic as when a client receives an assignment from the broker after subscribing to a list of topics. After sending the HB with the new regex, the client will receive the list of partitions assigned to it, and will reconcile them, just as it reconciles all assignments received (no matter the subscription type that led to receiving that assignment). Just for the record, the legacy coordinator does have a bit of logic ([here](https://github.com/apache/kafka/blob/ec4a8aaadbc95cfcf0de2f5e1385373f095298ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L367)) for validating assignments received after subscribing to a regex, where it checks that the assignment received matches the regex.
Our initial thought was not to include any assignment validation like that in the client, in an attempt to simplify it: the broker is solely responsible for computing the regex matches and the target assignment; the client takes and reconciles whatever the broker sends; and if the subscription changes on the client side, we have common logic (not specific to regex) to make sure that the new subscription is sent to the broker (which the legacy protocol achieved with a rejoin). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
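For reference, the legacy client-side check mentioned in this thread amounts to filtering the received assignment against the subscribed pattern. A minimal sketch of that idea — with illustrative names, not the actual `ConsumerCoordinator` API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

// Sketch of legacy-style validation: after a pattern subscription, flag any
// assigned topic that does not match the regex. The new protocol omits this
// check and trusts the broker-computed assignment.
class PatternAssignmentValidator {
    static List<String> topicsNotMatching(Pattern subscribedPattern, Collection<String> assignedTopics) {
        List<String> mismatched = new ArrayList<>();
        for (String topic : assignedTopics) {
            if (!subscribedPattern.matcher(topic).matches()) {
                mismatched.add(topic);
            }
        }
        return mismatched;
    }
}
```

An empty result means the assignment is consistent with the subscription; dropping this check, as proposed, simply means the client reconciles the assignment without re-evaluating the regex locally.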
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1936091369 Hey @Phuc-Hong-Tran , regarding the mixed usage of subscribe with `Pattern` and with `SubscriptionPattern`, my opinion is that it is something we should live with to provide a smooth transition, while the usage of `Pattern` is deprecated. So I would say that we shouldn't restrict it by throwing any new exception to the user (which btw, would introduce API level changes not included in the KIP, so it would require an updated/new KIP). We could just allow subsequent calls to both subscribe with Pattern or SubscriptionPattern, and just ensure that the latest prevails. This is the behaviour for subsequent calls to `subscribe(Pattern..)`, tested in [testSubsequentPatternSubscription](https://github.com/apache/kafka/blob/ec4a8aaadbc95cfcf0de2f5e1385373f095298ca/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L417). Just for the record, there is a restriction (see [here](https://github.com/apache/kafka/blob/ec4a8aaadbc95cfcf0de2f5e1385373f095298ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L162)) for not allowing mixed usage of subscribe, but only when mixing different subscription types (topics, partitions, pattern). We continue to respect that, without introducing any new restriction for the calls that in the end represent the same pattern-based subscription type (AUTO_PATTERN). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
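The behavior described here — repeated pattern subscriptions simply replace each other ("latest prevails"), while mixing different subscription *types* is rejected — can be sketched as a guard over the subscription type. This is an illustrative model; the real logic lives in `SubscriptionState`, and the names below are hypothetical.

```java
// Guard mirroring the mixed-subscription restriction: re-subscribing with
// the same subscription type just replaces the previous subscription, but
// switching type (e.g. pattern -> explicit assignment) is rejected.
class SubscriptionGuard {
    enum Type { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    private Type type = Type.NONE;
    private String pattern;

    void subscribePattern(String regex) {
        setType(Type.AUTO_PATTERN);
        this.pattern = regex; // latest pattern prevails
    }

    void assignPartitions() {
        setType(Type.USER_ASSIGNED);
    }

    String pattern() {
        return pattern;
    }

    private void setType(Type requested) {
        if (type != Type.NONE && type != requested) {
            throw new IllegalStateException(
                "Subscription to topics, partitions and pattern are mutually exclusive");
        }
        this.type = requested;
    }
}
```

Under this model, calling subscribe with either `Pattern` or `SubscriptionPattern` would map to the same `AUTO_PATTERN` type, so subsequent calls of either flavor just overwrite the stored regex rather than throwing.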
[PR] MINOR: Refactor GroupMetadataManagerTest [kafka]
dajac opened a new pull request, #15348: URL: https://github.com/apache/kafka/pull/15348 The `GroupMetadataManagerTest` class has gotten a little out of control. We have too many things defined in it. As a first step, this patch extracts all the inner classes. It also extracts all the helper methods. The logic itself is not changed at all. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-16102) about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.
[ https://issues.apache.org/jira/browse/KAFKA-16102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815470#comment-17815470 ] Jialun Peng edited comment on KAFKA-16102 at 2/9/24 2:06 PM: - Hi, can you take a look at this issue? [~cmccabe_impala_fa3f] was (Author: JIRAUSER303739): Hi, can you take a look at this issue? > about DynamicListenerConfig, the dynamic modification of the listener's port > or IP does not take effect. > > > Key: KAFKA-16102 > URL: https://issues.apache.org/jira/browse/KAFKA-16102 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 3.6.0 > Environment: Reproducible in any environment. >Reporter: Jialun Peng >Assignee: Jialun Peng >Priority: Minor > Labels: easyfix > Fix For: 3.8.0 > > Original Estimate: 96h > Remaining Estimate: 96h > > When I dynamically modify the parameters related to Kafka listeners, such as > changing the IP or port value of a listener, the dynamic parameters under the > corresponding path in ZooKeeper are updated. However, in reality, the > modification of the IP or port for the corresponding listener does not take > effect. This phenomenon occurs consistently. A slight improvement is that the > error "Security protocol cannot be updated for existing listener" will be > eliminated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
Phuc-Hong-Tran commented on PR #15020: URL: https://github.com/apache/kafka/pull/15020#issuecomment-1935914902 @philipnee All done
Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]
Phuc-Hong-Tran commented on PR #15020: URL: https://github.com/apache/kafka/pull/15020#issuecomment-1935913572 @philipnee Will do
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935911914 Merged the code of [KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR. Everything else was left as it was. All the tests still passed.
[jira] [Updated] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
[ https://issues.apache.org/jira/browse/KAFKA-16240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge updated KAFKA-16240: -- Description: Failed run [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/] Stack trace {code:java} org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: deleteRecords(api=DELETE_RECORDS) at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860) {code} was: Failed run [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/] Stack trace ``` org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. 
Call: deleteRecords(api=DELETE_RECORDS) at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860) ``` > Flaky test > PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft > - > > Key: KAFKA-16240 > URL: https://issues.apache.org/jira/browse/KAFKA-16240 > Project: Kafka > Issue Type: Test >Reporter: Gantigmaa Selenge >Priority: Minor > > Failed run > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/] > Stack trace > {code:java} > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call: deleteRecords(api=DELETE_RECORDS) at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
Gantigmaa Selenge created KAFKA-16240: - Summary: Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft Key: KAFKA-16240 URL: https://issues.apache.org/jira/browse/KAFKA-16240 Project: Kafka Issue Type: Test Reporter: Gantigmaa Selenge Failed run [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/] Stack trace ``` org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: deleteRecords(api=DELETE_RECORDS) at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860) ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
[ https://issues.apache.org/jira/browse/KAFKA-16240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge updated KAFKA-16240: -- Priority: Minor (was: Major) > Flaky test > PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft > - > > Key: KAFKA-16240 > URL: https://issues.apache.org/jira/browse/KAFKA-16240 > Project: Kafka > Issue Type: Test >Reporter: Gantigmaa Selenge >Priority: Minor > > Failed run > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/] > Stack trace > ``` > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a > node assignment. Call: deleteRecords(api=DELETE_RECORDS) at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860) > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [MINOR] Fix docker image build [kafka]
stanislavkozlovski merged PR #15347: URL: https://github.com/apache/kafka/pull/15347
[jira] [Commented] (KAFKA-13835) Fix two bugs related to dynamic broker configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816049#comment-17816049 ] Divij Vaidya commented on KAFKA-13835: -- [~cmccabe] the associated PR is merged. Can you please add the correct fix version and resolve this if you feel that this Jira is complete. > Fix two bugs related to dynamic broker configs in KRaft > --- > > Key: KAFKA-13835 > URL: https://issues.apache.org/jira/browse/KAFKA-13835 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Critical > Labels: 4.0-blocker > > The first bug is that we were calling reloadUpdatedFilesWithoutConfigChange > when a topic configuration was changed, but not when a broker configuration > was changed. This was backwards -- this function must be called only for > BROKER configs, and never for TOPIC configs. (Also, this function is called > only for specific broker configs, not for cluster configs.) > The second bug is that there were several configurations such as > `max.connections` which were related to broker listeners, but which did not > involve creating or removing new listeners. We can and should support these > configurations in KRaft, since no additional work is needed to support them. > Only adding or removing listeners is unsupported. This PR adds support for > these by fixing the configuration change validation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
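The first bug described in the issue above boils down to dispatching a reload hook on the wrong config resource type. A minimal sketch of the corrected dispatch, with hypothetical names standing in for the real controller/broker code:

```python
# Toy dispatcher illustrating the fix described above: the reload hook
# (reloadUpdatedFilesWithoutConfigChange in the real code) must fire only
# for BROKER config changes, never for TOPIC ones. Names are illustrative.

BROKER, TOPIC = "BROKER", "TOPIC"
reload_calls = []

def reload_updated_files_without_config_change(resource_name):
    # Stands in for re-reading password/keystore files etc. on the broker.
    reload_calls.append(resource_name)

def handle_config_record(resource_type, resource_name):
    # The buggy version had this backwards and fired the hook for TOPIC
    # configs; the fix restricts it to BROKER configs.
    if resource_type == BROKER:
        reload_updated_files_without_config_change(resource_name)

handle_config_record(TOPIC, "my-topic")   # must NOT trigger a reload
handle_config_record(BROKER, "0")         # must trigger a reload
assert reload_calls == ["0"]
```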
[jira] [Updated] (KAFKA-12670) KRaft support for unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-12670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-12670: - Component/s: kraft > KRaft support for unclean.leader.election.enable > > > Key: KAFKA-12670 > URL: https://issues.apache.org/jira/browse/KAFKA-12670 > Project: Kafka > Issue Type: Task > Components: kraft >Reporter: Colin McCabe >Assignee: Ryan Dielhenn >Priority: Major > > Implement KRaft support for the unclean.leader.election.enable > configurations. These configurations can be set at the topic, broker, or > cluster level. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools
[ https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816045#comment-17816045 ] Divij Vaidya edited comment on KAFKA-14349 at 2/9/24 12:52 PM: --- [~cmccabe] might have forgotten to close this since it's still open. As per [https://github.com/apache/kafka/blob/092dc7fc467ed7d354ec504d6939b3fcd7b80632/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L298] (I might be wrong) we are only reconfiguring the IO thread pool and not the network thread pool. Let's wait for [~cmccabe] to clarify this. was (Author: divijvaidya): [~cmccabe] might have forgot to close this since it's still open. I am closing this. I have verified that controller threads pools can be dynamically resized as per https://github.com/apache/kafka/blob/092dc7fc467ed7d354ec504d6939b3fcd7b80632/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L298 > Support dynamically resizing the KRaft controller's thread pools > > > Key: KAFKA-14349 > URL: https://issues.apache.org/jira/browse/KAFKA-14349 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: 4.0-blocker, kip-500 > > Support dynamically resizing the KRaft controller's request handler and > network handler thread pools. See {{DynamicBrokerConfig.scala}}.
[jira] [Commented] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools
[ https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816045#comment-17816045 ] Divij Vaidya commented on KAFKA-14349: -- [~cmccabe] might have forgotten to close this since it's still open. I am closing this. I have verified that controller thread pools can be dynamically resized as per https://github.com/apache/kafka/blob/092dc7fc467ed7d354ec504d6939b3fcd7b80632/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L298 > Support dynamically resizing the KRaft controller's thread pools > > > Key: KAFKA-14349 > URL: https://issues.apache.org/jira/browse/KAFKA-14349 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: 4.0-blocker, kip-500 > > Support dynamically resizing the KRaft controller's request handler and > network handler thread pools. See {{DynamicBrokerConfig.scala}}.
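To illustrate what "dynamically resizing" a handler pool means in practice, here is a small self-contained sketch (not the Kafka implementation, which lives in `DynamicBrokerConfig.scala` and the request handler pool): worker threads are started or asked to stop as the configured size changes, without restarting the process.

```python
import queue
import threading

# Toy resizable handler pool: resize() starts new workers or asks excess
# workers to stop, analogous to reconfiguring num.io.threads at runtime.
class ToyHandlerPool:
    def __init__(self, size):
        self.tasks = queue.Queue()
        self.workers = []            # list of (thread, stop_event) pairs
        self.resize(size)

    def _run(self, stop_event):
        while not stop_event.is_set():
            try:
                task = self.tasks.get(timeout=0.05)
            except queue.Empty:
                continue             # re-check the stop flag periodically
            task()

    def resize(self, new_size):
        while len(self.workers) < new_size:      # grow: start new workers
            stop = threading.Event()
            t = threading.Thread(target=self._run, args=(stop,), daemon=True)
            t.start()
            self.workers.append((t, stop))
        while len(self.workers) > new_size:      # shrink: stop excess workers
            t, stop = self.workers.pop()
            stop.set()
            t.join()

pool = ToyHandlerPool(2)
assert len(pool.workers) == 2
pool.resize(4)                                   # dynamic grow
assert len(pool.workers) == 4
pool.resize(1)                                   # dynamic shrink
assert len(pool.workers) == 1

done = threading.Event()
pool.tasks.put(done.set)                         # pool still processes work
assert done.wait(timeout=2)
pool.resize(0)
```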
[PR] Fix docker image build [kafka]
VedarthConfluent opened a new pull request, #15347: URL: https://github.com/apache/kafka/pull/15347 The base image no longer includes bash, so we need to install it to run the bash scripts inside the docker container. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools
[ https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816043#comment-17816043 ] Divij Vaidya commented on KAFKA-14349: -- [~cmccabe] can we remove "Modifying certain dynamic configurations on the standalone KRaft controller" from [https://kafka.apache.org/37/documentation.html#kraft_missing] after this JIRA? > Support dynamically resizing the KRaft controller's thread pools > > > Key: KAFKA-14349 > URL: https://issues.apache.org/jira/browse/KAFKA-14349 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: 4.0-blocker, kip-500 > > Support dynamically resizing the KRaft controller's request handler and > network handler thread pools. See {{DynamicBrokerConfig.scala}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16238) ConnectRestApiTest broken after KIP-1004
[ https://issues.apache.org/jira/browse/KAFKA-16238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-16238. Fix Version/s: 3.8.0 Resolution: Fixed > ConnectRestApiTest broken after KIP-1004 > > > Key: KAFKA-16238 > URL: https://issues.apache.org/jira/browse/KAFKA-16238 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 3.8.0 > > > KIP-1004 introduced a new configuration for connectors: 'tasks.max.enforce'. > The ConnectRestApiTest system test needs to be updated to expect the new > configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16238: Fix ConnectRestApiTest system test [kafka]
mimaison merged PR #15346: URL: https://github.com/apache/kafka/pull/15346
Re: [PR] MINOR: EventAccumulator should signal to one thread when key becomes available [kafka]
dajac merged PR #15340: URL: https://github.com/apache/kafka/pull/15340
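The change merged above is a classic condition-variable refinement: when a single key becomes available, waking one waiting thread is sufficient, since one event can only be handed to one thread anyway, and waking all waiters just causes contention. A toy sketch of the idea (illustrative names, not the actual `EventAccumulator` code):

```python
import threading
from collections import deque

# Toy accumulator: poll() blocks until some key is available; when a key
# becomes available we notify ONE waiting thread rather than all of them.
class ToyEventAccumulator:
    def __init__(self):
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.available = deque()

    def add(self, key):
        with self.lock:
            self.available.append(key)
            self.cond.notify()       # one waiter suffices for one key

    def poll(self, timeout=None):
        with self.lock:
            if not self.available:
                self.cond.wait(timeout)
            return self.available.popleft() if self.available else None

acc = ToyEventAccumulator()
results = []
t = threading.Thread(target=lambda: results.append(acc.poll(timeout=2)))
t.start()
acc.add("group-1")
t.join()
assert results == ["group-1"]
```

In Java terms this is `Condition.signal()` versus `Condition.signalAll()`; the single-signal form avoids a thundering herd when only one unit of work arrived.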
[jira] [Commented] (KAFKA-14127) KIP-858: Handle JBOD broker disk failure in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816038#comment-17816038 ] Divij Vaidya commented on KAFKA-14127: -- Hey folks 3.7's documentation still says that JBOD is a missing feature [1] in KRaft. Could we please fix that? [1] https://kafka.apache.org/37/documentation.html#kraft_missing > KIP-858: Handle JBOD broker disk failure in KRaft > - > > Key: KAFKA-14127 > URL: https://issues.apache.org/jira/browse/KAFKA-14127 > Project: Kafka > Issue Type: Improvement > Components: jbod, kraft >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Labels: 4.0-blocker, kip-500, kraft > Fix For: 3.7.0 > > > Supporting configurations with multiple storage directories in KRaft mode -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim resolved KAFKA-16162. --- Fix Version/s: 3.7.0 Resolution: Fixed > New created topics are unavailable after upgrading to 3.7 > - > > Key: KAFKA-16162 > URL: https://issues.apache.org/jira/browse/KAFKA-16162 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration > request will include the `LogDirs` fields with UUID for each log dir in each > broker. This info will be stored in the controller and used to identify if > the log dir is known and online while handling AssignReplicasToDirsRequest > [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093]. > > While upgrading from old version, the kafka cluster will run in 3.7 binary > with old metadata version, and then upgrade to newer version using > kafka-features.sh. That means, while brokers startup and send the > brokerRegistration request, it'll be using older metadata version without > `LogDirs` fields included. And it makes the controller has no log dir info > for all brokers. Later, after upgraded, if new topic is created, the flow > will go like this: > 1. Controller assign replicas and adds in metadata log > 2. brokers fetch the metadata and apply it > 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment > 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica > assignment, controller will think the log dir in current replica is offline, > so triggering offline handler, and reassign leader to another replica, and > offline, until no more replicas to assign, so assigning leader to -1 (i.e. 
no > leader) > So, the results will be that new created topics are unavailable (with no > leader) because the controller thinks all log dir are offline. > {code:java} > lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic > quickstart-events3 --bootstrap-server localhost:9092 > > Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: > 3 ReplicationFactor: 3Configs: segment.bytes=1073741824 > Topic: quickstart-events3 Partition: 0Leader: none > Replicas: 7,2,6 Isr: 6 > Topic: quickstart-events3 Partition: 1Leader: none > Replicas: 2,6,7 Isr: 6 > Topic: quickstart-events3 Partition: 2Leader: none > Replicas: 6,7,2 Isr: 6 > {code} > The log snippet in the controller : > {code:java} > # handling 1st assignReplicaToDirs request > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] > offline-dir-assignment: changing partition(s): quickstart-events3-0, > quickstart-events3-2, quickstart-events3-1 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA, AA] -> > [7K5JBERyyqFFxIXSXYluJA, AA, AA], > partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition > change 
PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, > isr=null, leader=-2, replicas=null, removingReplicas=null, > addingReplicas=null, leaderRecoveryState=-1, > directories=[7K5JBERyyqFFxIXSXYluJA, AA, > AA], eligibleLeaderReplicas=null, lastKnownELR=null) for > topic quickstart-events3 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA, AA] -> > [AA,
[jira] [Commented] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816037#comment-17816037 ] Omnia Ibrahim commented on KAFKA-16162: --- Marking this as resolved as the pr was committed > New created topics are unavailable after upgrading to 3.7 > - > > Key: KAFKA-16162 > URL: https://issues.apache.org/jira/browse/KAFKA-16162 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration > request will include the `LogDirs` fields with UUID for each log dir in each > broker. This info will be stored in the controller and used to identify if > the log dir is known and online while handling AssignReplicasToDirsRequest > [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093]. > > While upgrading from old version, the kafka cluster will run in 3.7 binary > with old metadata version, and then upgrade to newer version using > kafka-features.sh. That means, while brokers startup and send the > brokerRegistration request, it'll be using older metadata version without > `LogDirs` fields included. And it makes the controller has no log dir info > for all brokers. Later, after upgraded, if new topic is created, the flow > will go like this: > 1. Controller assign replicas and adds in metadata log > 2. brokers fetch the metadata and apply it > 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment > 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica > assignment, controller will think the log dir in current replica is offline, > so triggering offline handler, and reassign leader to another replica, and > offline, until no more replicas to assign, so assigning leader to -1 (i.e. 
no > leader) > So, the results will be that new created topics are unavailable (with no > leader) because the controller thinks all log dir are offline. > {code:java} > lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic > quickstart-events3 --bootstrap-server localhost:9092 > > Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: > 3 ReplicationFactor: 3Configs: segment.bytes=1073741824 > Topic: quickstart-events3 Partition: 0Leader: none > Replicas: 7,2,6 Isr: 6 > Topic: quickstart-events3 Partition: 1Leader: none > Replicas: 2,6,7 Isr: 6 > Topic: quickstart-events3 Partition: 2Leader: none > Replicas: 6,7,2 Isr: 6 > {code} > The log snippet in the controller : > {code:java} > # handling 1st assignReplicaToDirs request > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned > partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] > offline-dir-assignment: changing partition(s): quickstart-events3-0, > quickstart-events3-2, quickstart-events3-1 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA, AA] -> > [7K5JBERyyqFFxIXSXYluJA, AA, AA], > partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition > change 
PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, > isr=null, leader=-2, replicas=null, removingReplicas=null, > addingReplicas=null, leaderRecoveryState=-1, > directories=[7K5JBERyyqFFxIXSXYluJA, AA, > AA], eligibleLeaderReplicas=null, lastKnownELR=null) for > topic quickstart-events3 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for > quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: > [AA, AA,
Re: [PR] KAFKA-16215; KAFKA-16178: Fix member not rejoining after error [kafka]
AndrewJSchofield commented on code in PR #15311: URL: https://github.com/apache/kafka/pull/15311#discussion_r1484254595 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -397,13 +400,17 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, membershipManager.memberId(), membershipManager.memberEpoch()); logInfo(message, response, currentTimeMs); membershipManager.transitionToFenced(); +// Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases it assignment Review Comment: nit: "its" assignment. And below also
Re: [PR] KAFKA-14782: Implementation Details Different from Documentation (del… [kafka]
whiteyesx closed pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del… URL: https://github.com/apache/kafka/pull/13721
[jira] [Resolved] (KAFKA-14616) Topic recreation with offline broker causes permanent URPs
[ https://issues.apache.org/jira/browse/KAFKA-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim resolved KAFKA-14616. --- Fix Version/s: 3.7.0 Assignee: Colin McCabe Resolution: Fixed > Topic recreation with offline broker causes permanent URPs > -- > > Key: KAFKA-14616 > URL: https://issues.apache.org/jira/browse/KAFKA-14616 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.1 >Reporter: Omnia Ibrahim >Assignee: Colin McCabe >Priority: Major > Fix For: 3.7.0 > > > We are facing an odd situation when we delete and recreate a topic while > broker is offline in KRAFT mode. > Here’s what we saw step by step > # Created topic {{foo.test}} with 10 partitions and 4 replicas — Topic > {{foo.test}} was created with topic ID {{MfuZbwdmSMaiSa0g6__TPg}} > # Took broker 4 offline — which held replicas for partitions {{0, 3, 4, 5, > 7, 8, 9}} > # Deleted topic {{foo.test}} — The deletion process was successful, despite > the fact that broker 4 still held replicas for partitions {{0, 3, 4, 5, 7, 8, > 9}} on local disk. > # Recreated topic {{foo.test}} with 10 partitions and 4 replicas. — Topic > {{foo.test}} was created with topic ID {{RzalpqQ9Q7ub2M2afHxY4Q}} and > partitions {{0, 1, 2, 7, 8, 9}} got assigned to broker 4 (which was still > offline). Notice here that partitions {{0, 7, 8, 9}} are common between the > assignment of the deleted topic ({{{}topic_id: MfuZbwdmSMaiSa0g6__TPg{}}}) > and the recreated topic ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}}). > # Brough broker 4 back online. > # Broker started to create new partition replicas for the recreated topic > {{foo.test}} ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}}) > # The broker hit the following error {{Tried to assign topic ID > RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already > contained topic ID MfuZbwdmSMaiSa0g6__TPg}} . 
As a result of this error the > broker decided to rename log dir for partitions {{0, 3, 4, 5, 7, 8, 9}} to > {{{}-.-delete{}}}. > # Ran {{ls }} > {code:java} > foo.test-0.658f87fb9a2e42a590b5d7dcc28862b5-delete/ > foo.test-1/ > foo.test-2/ > foo.test-3.a68f05d05bcc4e579087551b539af311-delete/ > foo.test-4.79ce30a5310d4950ad1b28f226f74895-delete/ > foo.test-5.76ed04da75bf46c3a63342be1eb44450-delete/ > foo.test-6/ > foo.test-7.c2d33db3bf844e9ebbcd9ef22f5270da-delete/ > foo.test-8.33836969ac714b41b69b5334a5068ce0-delete/ > foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/{code} > 9. Waited until the deletion of the old topic was done and ran {{ls > }} again, now we were expecting to see log dir for partitions > {{0, 1, 2, 7, 8, 9}} however the result is: > {code:java} > foo.test-1/ > foo.test-2/ > foo.test-6/{code} > 10. Ran {{kafka-topics.sh --command-config cmd.properties > --bootstrap-server --describe --topic foo.test}} > {code:java} > Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10 > ReplicationFactor: 4 Configs: > min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=10 > Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5 > Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4 > Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4 > Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2 > Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3 > Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5 > Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4 > Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3 > Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1 > Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6{code} > Here’s a sample of broker logs > > {code:java} > 
{"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted > log for partition foo.test-9 in > /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"} > {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted > time index > /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.timeindex.deleted.","logger":"kafka.log.LogSegment"} > {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted > offset index > /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.index.deleted.","logger":"kafka.log.LogSegment"} >
[jira] [Commented] (KAFKA-14616) Topic recreation with offline broker causes permanent URPs
[ https://issues.apache.org/jira/browse/KAFKA-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816034#comment-17816034 ] Omnia Ibrahim commented on KAFKA-14616: --- I'll mark this as resolved as [~cmccabe] committed the PR. > Topic recreation with offline broker causes permanent URPs > -- > > Key: KAFKA-14616 > URL: https://issues.apache.org/jira/browse/KAFKA-14616 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.1 >Reporter: Omnia Ibrahim >Priority: Major
Re: [PR] KAFKA-7504 Mitigate sendfile(2) blocking in network threads by warming up fetch data [kafka]
divijvaidya commented on code in PR #14289: URL: https://github.com/apache/kafka/pull/14289#discussion_r1484189476 ## clients/src/main/java/org/apache/kafka/common/record/FileRecords.java: ## @@ -421,6 +446,18 @@ private AbstractIterator batchIterator(int start) { return new RecordBatchIterator<>(inputStream); } +/** + * Try populating OS page cache with file content + */ +public void prepareForRead() throws IOException { +if (DEVNULL_PATH != null) { +long size = Math.min(channel.size(), end) - start; Review Comment: Isn't `size` member in FileRecords precomputed for FileRecords during construction and maintained whenever there is more mutation? Hence, we can simply use `this.size` here. No? ## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java: ## @@ -39,19 +40,40 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, Records records, boolean firstEntryIncomplete, Optional> abortedTransactions) { -this(fetchOffsetMetadata, records, firstEntryIncomplete, abortedTransactions, Optional.empty()); +this(fetchOffsetMetadata, + records, + firstEntryIncomplete, + abortedTransactions, + Optional.empty(), + LogOffsetMetadata.UNKNOWN_OFFSET_METADATA); } public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, Records records, boolean firstEntryIncomplete, Optional> abortedTransactions, - Optional delayedRemoteStorageFetch) { + Optional delayedRemoteStorageFetch, + LogOffsetMetadata maxOffsetMetadata) { this.fetchOffsetMetadata = fetchOffsetMetadata; this.records = records; this.firstEntryIncomplete = firstEntryIncomplete; this.abortedTransactions = abortedTransactions; this.delayedRemoteStorageFetch = delayedRemoteStorageFetch; +this.maxOffsetMetadata = maxOffsetMetadata; +} + +public FetchDataInfo withMaxOffsetMetadata(LogOffsetMetadata maxOffsetMetadata) { +return new FetchDataInfo(fetchOffsetMetadata, + records, + firstEntryIncomplete, + abortedTransactions, + delayedRemoteStorageFetch, + maxOffsetMetadata); +} + +public boolean 
isLastSegment() { Review Comment: last segment is always the active segment. Shall we call it isActiveSegment()? Also, FetchDataInfo is not a property of a segment, hence isActiveSegment() sounds counter-intuitive. But it also cannot have data spanning across multiple segments. So, perhaps rename the function to isFetchDataFromActiveSegment() ## clients/src/main/java/org/apache/kafka/common/record/FileRecords.java: ## @@ -421,6 +446,18 @@ private AbstractIterator batchIterator(int start) { return new RecordBatchIterator<>(inputStream); } +/** + * Try populating OS page cache with file content + */ +public void prepareForRead() throws IOException { +if (DEVNULL_PATH != null) { +long size = Math.min(channel.size(), end) - start; +try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) { +channel.transferTo(start, size, devnullChannel); Review Comment: do we want to pre-populate the entire content represented by the FileRecords or just the part that will be read by the writeTo() method? For example, we are pre-populating all content represented by FileRecords here but we might end up reading only a part of it in writeTo() (see length param in writeTo()). Can we instead stick to pre-populating a smaller chunk, perhaps 32KB, which is the chunk size used by the SSL transport layer while reading from the file. ## storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java: ## @@ -39,19 +40,40 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, Records records, boolean firstEntryIncomplete,
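The prepareForRead() trick under review can be sketched in isolation: transferring a file range to /dev/null forces the kernel to read those pages, warming the page cache before a network thread later hits sendfile(2). This is a simplified stand-alone sketch, not the PR's actual FileRecords code; the names DEVNULL_PATH, start, and end mirror the diff, everything else is illustrative, and the /dev/null sink is Unix-only.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PageCacheWarmup {
    // Unix-only sink; a real implementation would leave this null when absent.
    private static final Path DEVNULL_PATH = Paths.get("/dev/null");

    /**
     * Pull the [start, end) range of the file into the OS page cache by
     * transferring it to /dev/null. Returns the number of bytes transferred.
     */
    public static long warm(Path file, long start, long end) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
             FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, StandardOpenOption.WRITE)) {
            long size = Math.min(channel.size(), end) - start;
            return channel.transferTo(start, size, devnullChannel);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("records", ".log");
        Files.write(tmp, new byte[8192]);
        System.out.println("warmed " + warm(tmp, 0, 8192) + " bytes");
        Files.delete(tmp);
    }
}
```

The reviewer's point applies to this sketch too: warming the full range may read more than writeTo() will actually send, so capping size at a small chunk (e.g. 32 KiB) would bound the extra I/O.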
Re: [PR] KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion [kafka]
divijvaidya commented on PR #11438: URL: https://github.com/apache/kafka/pull/11438#issuecomment-1935743682 We have a test failing for this PR that has not failed in trunk in the last 28 days [2]. @arunmathew88 can you please debug why this PR is causing that test to fail. [1] https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11438/6/ [2] https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=trunk=Europe%2FBerlin=kafka.log.remote.RemoteIndexCacheTest=testIndexFileAlreadyExistOnDiskButNotInCache()
Re: [PR] MINOR: Improve Code Style [kafka]
divijvaidya merged PR #15319: URL: https://github.com/apache/kafka/pull/15319
Re: [PR] MINOR: ZKAcl Migration Client Test Problem failing in Scala version 2.12 [kafka]
divijvaidya closed pull request #15342: MINOR: ZKAcl Migration Client Test Problem failing in Scala version 2.12 URL: https://github.com/apache/kafka/pull/15342
Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]
divijvaidya merged PR #15239: URL: https://github.com/apache/kafka/pull/15239
Re: [PR] MINOR: set test global timeout as 10 mins [kafka]
gaurav-narula commented on code in PR #15065: URL: https://github.com/apache/kafka/pull/15065#discussion_r1484162204 ## clients/src/test/resources/junit-platform.properties: ## @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}" +junit.jupiter.execution.timeout.default = 10 m Review Comment: Also stumbled upon [Thread Mode](https://github.com/junit-team/junit5/pull/2949) which avoids pitfalls where the test code [doesn't interrupt](https://github.com/junit-team/junit5/issues/2087).
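For reference, the resulting junit-platform.properties would look roughly like this. The timeout line comes from the diff; the thread-mode line is an assumption based on the JUnit 5 Thread Mode feature mentioned in the comment, which lets a timed-out test be abandoned even if it never reacts to interruption:

```properties
junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}"
junit.jupiter.execution.timeout.default = 10 m
# Assumed addition (JUnit 5.9+): run each timed test in its own thread so a
# test that ignores interrupts can still be timed out.
junit.jupiter.execution.timeout.thread.mode.default = SEPARATE_THREAD
```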
Re: [PR] MINOR: ZKAcl Migration Client Test Problem failing in Scala version 2.12 [kafka]
divijvaidya commented on PR #15342: URL: https://github.com/apache/kafka/pull/15342#issuecomment-1935712826 Thank you for looking into this @highluck but this seems to have already been fixed by https://github.com/apache/kafka/pull/15343
[jira] [Created] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper
Divij Vaidya created KAFKA-16239: Summary: Clean up references to non-existent IntegrationTestHelper Key: KAFKA-16239 URL: https://issues.apache.org/jira/browse/KAFKA-16239 Project: Kafka Issue Type: Improvement Reporter: Divij Vaidya A bunch of places in the code javadocs and README docs refer to a class called IntegrationTestHelper. Such a class does not exist. This task will clean up all references to IntegrationTestHelper from the Kafka code base. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]
jlprat commented on PR #15239: URL: https://github.com/apache/kafka/pull/15239#issuecomment-1935630910 https://issues.apache.org/jira/browse/KAFKA-12895?focusedCommentId=17365016=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17365016 This could be done when KIP-751 is done.
Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]
jlprat commented on PR #15239: URL: https://github.com/apache/kafka/pull/15239#issuecomment-1935623714 > I've tried that and thought it worked because of a silly mistake that I made. But core actually needs it, so it needs to stay. Correct, `core` needs the `compat` library as it works for both 2.12 and 2.13. Once 2.12 support is removed (in 4.0) we can refactor the 2.13 core and remove the `compat` library.
[PR] KAFKA-16238: Fix ConnectRestApiTest system test [kafka]
mimaison opened a new pull request, #15346: URL: https://github.com/apache/kafka/pull/15346 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16238) ConnectRestApiTest broken after KIP-1004
Mickael Maison created KAFKA-16238: -- Summary: ConnectRestApiTest broken after KIP-1004 Key: KAFKA-16238 URL: https://issues.apache.org/jira/browse/KAFKA-16238 Project: Kafka Issue Type: Improvement Components: connect Reporter: Mickael Maison Assignee: Mickael Maison KIP-1004 introduced a new configuration for connectors: 'tasks.max.enforce'. The ConnectRestApiTest system test needs to be updated to expect the new configuration.
Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]
dajac commented on code in PR #15275: URL: https://github.com/apache/kafka/pull/15275#discussion_r1484072806 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1392,4 +1356,16 @@ public void registerStateListener(MemberStateListener listener) { } this.stateUpdatesListeners.add(listener); } + +/** + * If either a new target assignment or new metadata is available that we have not yet attempted + * to reconcile, and we are currently in state RECONCILING, trigger reconciliation. + */ +@Override +public PollResult poll(final long currentTimeMs) { +if (state == MemberState.RECONCILING) { +maybeReconcile(); Review Comment: I have one remaining question for my understanding. If there are unresolved topic IDs, is it going to request a metadata update (in `findResolvableAssignmentAndTriggerMetadataUpdate`) every time that `poll` is called? I believe that it won't be the case because the `Metadata` class will request a full update only once. Is my understanding correct? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -1521,6 +1660,18 @@ private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() { ); } +private SortedSet topicPartitions(Map> topicIdMap, Map topicIdNames) { Review Comment: nit: Could it be static?
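The "request a full update only once" behavior asked about above can be modeled with a tiny stand-in. This is a hypothetical sketch, not the real org.apache.kafka.clients.Metadata class: a flag records that an update is wanted, and the first poll that services it clears the flag, so repeated polls while in RECONCILING coalesce into a single metadata request.

```java
// Hypothetical stand-in for the "request a full metadata update only once"
// behavior discussed above; not the real org.apache.kafka.clients.Metadata.
public class MetadataUpdateSketch {
    private boolean updateRequested = false;
    private int updatesSent = 0;

    // Called from reconciliation when topic IDs cannot be resolved locally.
    // Idempotent: repeated calls before the next poll coalesce into one.
    public void requestUpdate() {
        updateRequested = true;
    }

    // Called on every poll; sends at most one update per outstanding request.
    public void poll() {
        if (updateRequested) {
            updatesSent++;          // stands in for sending a MetadataRequest
            updateRequested = false;
        }
    }

    public int updatesSent() {
        return updatesSent;
    }
}
```

Under this model, calling poll() in a loop does not fan out into repeated update requests, which matches the understanding stated in the comment.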
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1484046178 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -2811,6 +2811,72 @@ public void testListConsumerGroupsWithStates() throws Exception { } } +@Test +public void testListConsumerGroupsWithTypes() throws Exception { +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + +// Test with a specific state filter but no type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + +env.kafkaClient().prepareResponseFrom( +request -> request instanceof ListGroupsRequest && +!((ListGroupsRequest) request).data().statesFilter().isEmpty() && +((ListGroupsRequest) request).data().typesFilter().isEmpty(), Review Comment: nit: It would be better to be precise here and validate that the states are actually the correct ones. This also applies to the other matchers. `expectCreateTopicsRequestWithTopics` may be a good inspiration for the structure. 
## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -18,74 +18,196 @@ package kafka.admin import joptsimple.OptionException import org.junit.jupiter.api.Assertions._ -import kafka.utils.TestUtils -import org.apache.kafka.common.ConsumerGroupState +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.ConsumerGroupListing -import java.util.Optional - +import org.apache.kafka.common.{ConsumerGroupState, GroupType} +import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.MethodSource + +import java.util.Optional class ListConsumerGroupTest extends ConsumerGroupCommandTest { - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroups(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" +val protocolGroup = "protocol-group" + +createOffsetsTopic() + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) +addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") val service = getConsumerGroupService(cgcArgs) -val expectedGroups = Set(group, simpleGroup) +val expectedGroups = Set(protocolGroup, group, simpleGroup) var foundGroups = Set.empty[String] TestUtils.waitUntilTrue(() => { foundGroups = service.listConsumerGroups().toSet expectedGroups == foundGroups }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @Test def testListWithUnrecognizedNewConsumerOption(): Unit = 
{ val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list") assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs)) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListConsumerGroupsWithStates(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state") val service = getConsumerGroupService(cgcArgs) +var expectedListing = Set( + new ConsumerGroupListing( +simpleGroup, +true, +Optional.of(ConsumerGroupState.EMPTY), +Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( +group, +false, +Optional.of(ConsumerGroupState.STABLE), +Optional.of(GroupType.CLASSIC) + ) +) + +testWaitUntilTrue(Set.empty, ConsumerGroupState.values.toSet, expectedListing, service) Review Comment: I like it but the name could be better. How about `assertGroupListing` or something like this? ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -189,16 +197,65 @@ object
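The "be precise" nit above can be illustrated without Kafka classes. ListGroupsData, loose(), and precise() below are hypothetical names; the sketch contrasts matching on mere emptiness of the filter with matching the exact expected values:

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the ListGroupsRequest fields inspected by the
// matchers in the test under review.
record ListGroupsData(List<String> statesFilter, List<String> typesFilter) {}

public class MatcherSketch {
    // Loose matcher, as in the diff: only checks that some states filter is set.
    static Predicate<ListGroupsData> loose() {
        return d -> !d.statesFilter().isEmpty() && d.typesFilter().isEmpty();
    }

    // Precise matcher, as the reviewer suggests: checks the exact states.
    static Predicate<ListGroupsData> precise(Set<String> expectedStates) {
        return d -> Set.copyOf(d.statesFilter()).equals(expectedStates)
                && d.typesFilter().isEmpty();
    }
}
```

A loose matcher would also accept a request carrying the wrong states, so a regression in how the option is serialized into the request could go unnoticed; the precise form fails fast.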
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935587072 Accidentally closed the PR, reopening again
Re: [PR] MINOR fix scala compile issue [kafka]
mberndt123 commented on PR #15343: URL: https://github.com/apache/kafka/pull/15343#issuecomment-1935582325 odd 🤷‍♂️