Re: [PR] KAFKA-12216; Improve flaky test reporting [kafka]

2024-02-09 Thread via GitHub


dajac commented on PR #14862:
URL: https://github.com/apache/kafka/pull/14862#issuecomment-1936915696

   I need to update the PR to only apply this while building PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-09 Thread via GitHub


dajac commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936915364

   @gharris1727 I see. So if I understand correctly, two tests are flaky in the 
suite, so we move the entire suite. This does not seem right to me. This 
particular suite is the one that you want to run when making changes in the 
admin client.
   
   I have been working on https://github.com/apache/kafka/pull/14862 which 
changes how we report flaky tests in Jenkins. I believe that this is a better 
way to handle them. What do you think?
   
   However, I do agree with marking the slow tests as integration tests. I 
suppose that they should have been integration tests anyway.





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-09 Thread via GitHub


gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936910131

   @dajac No, those were added because they were flaky: 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.admin.KafkaAdminClientTest=FLAKY
 





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-09 Thread via GitHub


dajac commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936903652

   @gharris1727 Thanks for looking into this. Overall, I like the idea. 
However, I see many suites that are unit tests and therefore must be kept as 
unit tests (e.g. the admin client test). Were those recategorized because they 
were slow?





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-09 Thread via GitHub


gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936821280

   @mumrah I used this page: 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=FLAKY
and sorted by Flaky, Failed, and Mean Execution Time.
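
The manual triage described above (sort the report by Flaky, Failed, and Mean 
Execution Time, then take the worst offenders) can be sketched as a small 
ranking function. This is illustrative only: the `Suite` record fields are 
assumptions about what such report data contains, not the actual Develocity 
export format.

```java
import java.util.Comparator;
import java.util.List;

public class TopOffenders {
    // Hypothetical per-suite stats, mirroring the columns sorted on above.
    record Suite(String name, int flakyRuns, int failedRuns, double meanSeconds) {}

    // Rank suites worst-first: most flaky, then most failed, then slowest.
    static List<String> topN(List<Suite> suites, int n) {
        return suites.stream()
                .sorted(Comparator.comparingInt(Suite::flakyRuns)
                        .thenComparingInt(Suite::failedRuns)
                        .thenComparingDouble(Suite::meanSeconds)
                        .reversed())
                .limit(n)
                .map(Suite::name)
                .toList();
    }

    public static void main(String[] args) {
        List<Suite> suites = List.of(
                new Suite("LogDirFailureTest", 40, 5, 120.0),
                new Suite("KafkaAdminClientTest", 12, 1, 30.0),
                new Suite("FastUnitTest", 0, 0, 0.2));
        System.out.println(topN(suites, 2)); // [LogDirFailureTest, KafkaAdminClientTest]
    }
}
```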





Re: [PR] KAFKA-16242: Mark slowest and most flaky tests as integration tests [kafka]

2024-02-09 Thread via GitHub


mumrah commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936811537

   This is great, @gharris1727! How did you come up with the list of tests to 
mark as integration?





Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-09 Thread via GitHub


jolshan merged PR #15324:
URL: https://github.com/apache/kafka/pull/15324





Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-02-09 Thread via GitHub


jolshan commented on PR #15262:
URL: https://github.com/apache/kafka/pull/15262#issuecomment-1936789321

   The issues started for 3.7 on the same day, so it is one of the 3 commits 
backported on Feb 2:
   
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=3.7=kafka.server.LogDirFailureTest
   





[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest

2024-02-09 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816260#comment-17816260
 ] 

Justine Olshan commented on KAFKA-16225:


Looks like it happened on 3.7 the same day which narrows it down to 3 commits?
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=3.7=kafka.server.LogDirFailureTest

> Flaky test suite LogDirFailureTest
> --
>
> Key: KAFKA-16225
> URL: https://issues.apache.org/jira/browse/KAFKA-16225
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> I see this failure on trunk and in PR builds for multiple methods in this 
> test suite:
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)    
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)    
> at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715)    
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186)
>     
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat}
> It appears this assertion is failing
> [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715]
> The other error which is appearing is this:
> {noformat}
> org.opentest4j.AssertionFailedError: Unexpected exception type thrown, 
> expected:  but was: 
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67)    
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)    
> at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)    
> at 
> kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164)
>     
> at 
> kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64){noformat}
> Failures appear to have started in this commit, but this does not indicate 
> that this commit is at fault: 
> [https://github.com/apache/kafka/tree/3d95a69a28c2d16e96618cfa9a1eb69180fb66ea]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]

2024-02-09 Thread via GitHub


jolshan commented on PR #15262:
URL: https://github.com/apache/kafka/pull/15262#issuecomment-1936787524

   Hey folks. We've seen a large increase in LogDirFailureTest failures after 
this PR. Can we take a look and see if something here caused it?
   
   gradle enterprise: 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest
   jira: https://issues.apache.org/jira/browse/KAFKA-16225





Re: [PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]

2024-02-09 Thread via GitHub


gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936785675

   We did save a little time, but not really too much because the build tends 
to spend a lot of time waiting for compileTestScala to complete :)
   
   `./gradlew clean test` is 45 minutes on my machine, 
   `./gradlew clean unitTest` was ~10m, and now is ~8m. 
   If the compilation is already finished, `./gradlew unitTest` is ~5m.





[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest

2024-02-09 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816258#comment-17816258
 ] 

Justine Olshan commented on KAFKA-16225:


I'm encountering this too. It looks pretty bad

https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest



[PR] fix syntax error [kafka]

2024-02-09 Thread via GitHub


mberndt123 opened a new pull request, #15350:
URL: https://github.com/apache/kafka/pull/15350

   Seems like a good idea to balance those parens. 





Re: [PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]

2024-02-09 Thread via GitHub


ijuma commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936765040

   This is very cool. Any chance you could do the same for the top N slowest 
unit tests (they're not unit tests if they're that slow)? Then the `unitTest` 
command would be useful enough to run locally before submitting PRs, etc.





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-09 Thread via GitHub


splett2 commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1484905012


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, newTargetReplicas.size())) return Optional.empty();

Review Comment:
   @CalvinConfluent 
   hmm, interesting. this is an inconsistency between the KRaft implementation 
of reassignment completion and the ZK controller implementation of reassignment 
completion.
   
   In the ZK controller, we only allow a reassignment to be completed if the 
ISR contains all of the target replicas. So in your second example, the ISR 
would need to contain [0, 1, 7, 8].
   
   code reference is in `KafkaController.isReassignmentComplete`:
   ```
   val targetReplicas = assignment.targetReplicas.toSet
   targetReplicas.subsetOf(isr)
   ```
   
   I am wondering whether it was intentional to change the criteria? IMO, the 
ZK controller criteria for completing reassignment makes more sense than the 
existing kcontroller criteria, but requiring at least min ISR replicas to be in 
the ISR (your PR) also seems reasonable.
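
   For illustration, the two completion criteria being compared can be 
sketched side by side. The method names and shapes below are hypothetical, 
not the actual controller API; they only capture the predicates discussed in 
this thread.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReassignmentCompletion {
    // ZK-controller style: every target replica must already be in the ISR
    // (the `targetReplicas.subsetOf(isr)` check quoted above).
    static boolean zkStyleComplete(List<Integer> targetReplicas, Set<Integer> isr) {
        return isr.containsAll(targetReplicas);
    }

    // Min-ISR style (this PR): at least min(minIsr, |targetReplicas|) of the
    // target replicas must be in the ISR.
    static boolean minIsrStyleComplete(List<Integer> targetReplicas, Set<Integer> isr, int minIsr) {
        Set<Integer> inIsr = new HashSet<>(targetReplicas);
        inIsr.retainAll(isr);
        return inIsr.size() >= Math.min(minIsr, targetReplicas.size());
    }

    public static void main(String[] args) {
        List<Integer> target = List.of(0, 1, 7, 8); // target replica set from the example
        System.out.println(zkStyleComplete(target, Set.of(7, 8)));          // false
        System.out.println(minIsrStyleComplete(target, Set.of(7, 8), 3));   // false
        System.out.println(zkStyleComplete(target, Set.of(0, 1, 7, 8)));    // true
    }
}
```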








Re: [PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]

2024-02-09 Thread via GitHub


gharris1727 commented on PR #15349:
URL: https://github.com/apache/kafka/pull/15349#issuecomment-1936736732

   Alright, I got 3 consecutive green builds, and I haven't seen any flaky 
failures locally.





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-09 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1484883905


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   A better example is with min ISR=3
   1. originally replicas [0,1,2,3], ISR[0,1,2,3]
   2. Start reassignment. Target replica set [0,1,7,8]. Temporarily, the 
replica set is [0,1,2,3,7,8], ISR[0,1,2,3].
   3. Replication goes on, some brokers fail and come back up, and we could 
experience the following ISR transitions: [0,1,2,3] -> [3,7] -> [7] -> [7,8]
   4. At this point, the reassignment can be completed.
   5. In the current world, we can't do much for the above. But in the ELR 
world, we can have something different in step 3.
   
   ISR/ELR: [0,1,2,3]/[] -> [3,7]/[2] -> [7]/[2,3] -> [7,8]/[2,3]. If we 
complete the reassignment now, we lose the potentially good replicas [2,3] and 
have a higher risk of data loss.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-09 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1936706379

   > Merged the code of 
[KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR.
   
   Why? We are mixing up two tickets if we do this (cf 
https://github.com/apache/kafka/pull/14426#discussion_r1483677544)
   
   Can you remove those changes? Fixing the grace period should be kept 
separate to get different commits for different fixes.





[jira] [Commented] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps

2024-02-09 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816239#comment-17816239
 ] 

Philip Nee commented on KAFKA-16156:


There seems to be a subtle difference in behavior between the async and the 
legacy consumer. The legacy one catches the error without doing anything, while 
the async client doesn't seem to handle the exception. The fix should be easy, 
but I'll run the same test first.
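
A minimal sketch of the difference described above: `OffsetAndTimestamp` 
rejects negative timestamps in its constructor, so a ListOffsets response 
carrying the `-1` sentinel must be filtered out (the legacy behavior) rather 
than passed through. The `OffsetAndTs` record and `buildResult` helper below 
are illustrative stand-ins, not the actual client internals.

```java
import java.util.HashMap;
import java.util.Map;

public class EndOffsetsGuard {
    // Stand-in for OffsetAndTimestamp: the real class throws
    // IllegalArgumentException("Invalid negative timestamp") the same way.
    record OffsetAndTs(long offset, long timestamp) {
        OffsetAndTs {
            if (timestamp < 0)
                throw new IllegalArgumentException("Invalid negative timestamp");
        }
    }

    // Legacy-consumer-like behavior: skip entries whose timestamp is the
    // -1 sentinel instead of letting the constructor throw.
    static Map<String, OffsetAndTs> buildResult(Map<String, long[]> fetched) {
        Map<String, OffsetAndTs> out = new HashMap<>();
        for (Map.Entry<String, long[]> e : fetched.entrySet()) {
            long offset = e.getValue()[0];
            long ts = e.getValue()[1];
            if (ts < 0) continue; // no valid timestamp for this partition
            out.put(e.getKey(), new OffsetAndTs(offset, ts));
        }
        return out;
    }

    public static void main(String[] args) {
        // The failing case from the trace: offset 42804, timestamp -1.
        Map<String, long[]> fetched = Map.of("input-topic-0", new long[]{42804L, -1L});
        System.out.println(buildResult(fetched)); // {}
    }
}
```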

> System test failing for new consumer on endOffsets with negative timestamps
> ---
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer, system tests
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid 
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Received ListOffsetResponse 
> ListOffsetsResponseData(throttleTimeMs=0, 
> topics=[ListOffsetsTopicResponse(name='input-topic', 
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, 
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from 
> broker worker2:9092 (id: 2 rack: null) 
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse 
> response for input-topic-0. Fetched offset 42804, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Updating last stable offset for 
> partition input-topic-0 to 42804 
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] Fetch offsets completed 
> successfully for partitions and timestamps {input-topic-0=-1}. Result 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
>  (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer 
> clientId=consumer-transactions-test-consumer-group-1, 
> groupId=transactions-test-consumer-group] No events to process 
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event 
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
>   at 
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
>   at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
>   at 
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
>   at 
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
>   at 
> 

[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816238#comment-17816238
 ] 

Matthias J. Sax commented on KAFKA-16241:
-

{quote} Detected out-of-order KTable update
{quote}
I would hope that versioned stores by default would fix this, but there is no 
real timeline for that... (we could also add a sensor for it and remove the log 
line – we did discuss this in the past with no resolution... not sure...)
{quote}Detected that shutdown was requested. All clients in this app will now 
begin to shutdown
{quote}
Interesting – I did not look into the logs in detail but just collected them – 
it is on the main processing loop, so no wonder we see this so often; I guess 
`taskManager.rebalanceInProgress()` just returns `true` for a longer period of 
time (I would assume that `isRunning()` is already `false`) – maybe a simple 
static boolean flag so that `maybeSendShutdown();` executes only once would be 
sufficient to address this?
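
The once-only flag suggested above could look roughly like the sketch below. 
The class and field names are hypothetical, not the actual StreamThread 
internals; the point is only that a compare-and-set guard collapses the busy 
loop's repeated sends/logs into one.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownOnce {
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    int sendCount = 0; // stand-in for observable side effects (send + WARN log)

    void maybeSendShutdown() {
        // Only the first call flips the flag; later iterations of the
        // tight poll loop become no-ops instead of re-logging.
        if (shutdownRequested.compareAndSet(false, true)) {
            sendCount++;
        }
    }

    public static void main(String[] args) {
        ShutdownOnce t = new ShutdownOnce();
        for (int i = 0; i < 1000; i++) t.maybeSendShutdown(); // simulated busy loop
        System.out.println(t.sendCount); // 1
    }
}
```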

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816234#comment-17816234
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16241:


Off-topic but I opened the logs for a second and there is a huge amount of 
spam. I know everyone is probably aware of the "Detected out-of-order KTable 
update" logspam by now, but there was over 100MB from that one log message 
alone (for a single node)...yikes

Something that's new, or new to me at least, is this:
{code:java}
[2024-02-09 03:23:55,168] WARN [i-0fede2697f39580f9-StreamThread-1] 
stream-thread [i-0fede2697f39580f9-StreamThread-1] Detected that shutdown was 
requested. All clients in this app will now begin to shutdown 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2024-02-09 03:23:55,168] INFO [i-0fede2697f39580f9-StreamThread-1] [Consumer 
instanceId=ip-172-31-14-207-1, 
clientId=i-0fede2697f39580f9-StreamThread-1-consumer, groupId=stream-soak-test] 
Request joining group due to: Shutdown requested 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) {code}
Those two lines are logged repeatedly (and excessively) in a very tight loop at 
the end of the logs. Filtering those two lines out dropped the file size by 
another 200MB...that's a lot!! And it's all from about 10 seconds of real time. 
So it's a VERY busy loop...

>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-09 Thread via GitHub


CalvinConfluent commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1484850347


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   I think it is more of a design issue/bug. The following example is currently 
valid: an existing replica set is [0,1,2], and the new partition assignment is 
[0,1,3], so the "adding" replica is 3. The controller can finish the 
reassignment as soon as the current ISR includes all the "adding" replicas, even 
if the ISR is just [3]. 
   This PR prevents completing such a reassignment if the min ISR is 2.
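
As a rough, standalone sketch of the condition in the diff above (the class and helper names here are made up for illustration; the real logic lives in `PartitionReassignmentReplicas.maybeCompleteReassignment`):

```java
import java.util.Arrays;
import java.util.List;

public class MinIsrReassignmentCheck {
    // Illustrative stand-in for the check added in the PR: completion is
    // allowed only when the new target ISR reaches min.insync.replicas,
    // capped at the size of the new replica set.
    public static boolean isrLargeEnough(List<Integer> newTargetIsr,
                                         List<Integer> newTargetReplicas,
                                         int minIsrCount) {
        return newTargetIsr.size() >= Math.min(minIsrCount, newTargetReplicas.size());
    }

    public static void main(String[] args) {
        List<Integer> targetReplicas = Arrays.asList(0, 1, 3);
        // The example from the comment: only the "adding" replica 3 is in sync.
        System.out.println(isrLargeEnough(Arrays.asList(3), targetReplicas, 2));    // false
        // With two in-sync replicas the reassignment may complete.
        System.out.println(isrLargeEnough(Arrays.asList(1, 3), targetReplicas, 2)); // true
    }
}
```

Capping at the replica-set size keeps a deliberately under-replicated assignment (fewer target replicas than min ISR) from being blocked forever by this check alone.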



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






Re: [PR] KAFKA-16202: Extra dot in error message in producer [kafka]

2024-02-09 Thread via GitHub


infantlikesprogramming commented on code in PR #15296:
URL: https://github.com/apache/kafka/pull/15296#discussion_r1484843384


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -704,7 +704,7 @@ private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionRespons
 "topic-partition may not exist or the user may not 
have Describe access to it",
 batch.topicPartition);
 } else {
-log.warn("Received invalid metadata error in produce 
request on partition {} due to {}. Going " +
+log.warn("Received invalid metadata error in produce 
request on partition {} due to {} Going " +
 "to request metadata update now", 
batch.topicPartition,

Review Comment:
   Thank you so much for your comment. I have made changes based on your review 
in the latest (4th) commit.






Re: [PR] KAFKA-15665: Enforce min ISR when complete partition reassignment. [kafka]

2024-02-09 Thread via GitHub


splett2 commented on code in PR #14604:
URL: https://github.com/apache/kafka/pull/14604#discussion_r1484842081


##
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java:
##
@@ -124,6 +124,8 @@ Optional 
maybeCompleteReassignment(List targetIs
 if (!newTargetIsr.contains(replica)) return Optional.empty();
 }
 
+if (newTargetIsr.size() < Math.min(minIsrCount, 
newTargetReplicas.size())) return Optional.empty();

Review Comment:
   I haven't been following along the ELR KIP closely so I may be missing some 
of the details. 
   My understanding is that a reassignment is only completed once the target 
replicas are all in the ISR. Isn't it better to just reject the initiation of a 
reassignment where the target replica count is less than the configured min ISR?
   
   From what I can tell, if a user initiates a reassignment where the target 
replica count < min ISR count, the reassignment will just get stuck indefinitely.









Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-09 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1484643624


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
 return cache.cluster();
 }
 
+/**
+ * Get the current metadata cache.
+ */
+public synchronized MetadataCache fetchCache() {

Review Comment:
   Perhaps we could make `cache` volatile and avoid the synchronization?
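
   For what it's worth, the volatile-reference pattern being suggested could look roughly like this (a standalone sketch, not the actual `Metadata`/`MetadataCache` code; all names here are illustrative):

```java
import java.util.Collections;
import java.util.Map;

public class VolatileSnapshotHolder {
    // An immutable snapshot: all state is final and fixed at construction,
    // so it is safe to publish through a volatile reference.
    public static final class Snapshot {
        public final Map<String, Integer> partitionCounts;
        Snapshot(Map<String, Integer> partitionCounts) {
            this.partitionCounts = Collections.unmodifiableMap(partitionCounts);
        }
    }

    // The volatile read replaces a synchronized fetchCache()-style getter:
    // readers always see the latest fully constructed snapshot, lock-free.
    private volatile Snapshot current = new Snapshot(Collections.emptyMap());

    public Snapshot fetchSnapshot() {
        return current; // no synchronization on the read path
    }

    public synchronized void update(Map<String, Integer> counts) {
        current = new Snapshot(counts); // writers may still serialize among themselves
    }

    public static void main(String[] args) {
        VolatileSnapshotHolder holder = new VolatileSnapshotHolder();
        holder.update(Collections.singletonMap("topic-a", 3));
        System.out.println(holder.fetchSnapshot().partitionCounts.get("topic-a")); // 3
    }
}
```

   This only works because the snapshot itself is immutable; a volatile reference to a mutable cache would not give the same safety.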



##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -39,8 +39,9 @@
 import java.util.stream.Collectors;
 
 /**
- * An internal mutable cache of nodes, topics, and partitions in the Kafka 
cluster. This keeps an up-to-date Cluster
+ * An internal immutable cache of nodes, topics, and partitions in the Kafka 
cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
+ * Prefer to extend MetadataCache's API for internal client usage Vs the 
public {@link Cluster}
  */
 public class MetadataCache {

Review Comment:
   We don't have to do it here, but "snapshot" might be a better name since it 
suggests immutability.



##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -113,14 +116,28 @@ Optional nodeById(int id) {
 return Optional.ofNullable(nodes.get(id));
 }
 
-Cluster cluster() {
+public Cluster cluster() {
 if (clusterInstance == null) {
 throw new IllegalStateException("Cached Cluster instance should 
not be null, but was.");
 } else {
 return clusterInstance;
 }
 }
 
+/**
+ * Get leader-epoch for partition.
+ * @param tp partition
+ * @return leader-epoch if known, else return optional.empty()
+ */
+public Optional leaderEpochFor(TopicPartition tp) {

Review Comment:
   Would it make sense to use `OptionalInt`?
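
   A quick sketch of what the `OptionalInt` variant could look like (a standalone illustration, not the real `MetadataCache`; the string-keyed map lookup here is an assumption made for brevity):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

public class LeaderEpochLookup {
    // Illustrative per-partition epoch store keyed by "topic-partition" strings.
    private final Map<String, Integer> epochs = new HashMap<>();

    public void setEpoch(String topicPartition, int epoch) {
        epochs.put(topicPartition, epoch);
    }

    // OptionalInt carries the primitive int directly, avoiding the boxing
    // that Optional<Integer> would incur on every call to this accessor.
    public OptionalInt leaderEpochFor(String topicPartition) {
        Integer epoch = epochs.get(topicPartition);
        return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
    }

    public static void main(String[] args) {
        LeaderEpochLookup cache = new LeaderEpochLookup();
        cache.setEpoch("topic-a-0", 7);
        System.out.println(cache.leaderEpochFor("topic-a-0")); // OptionalInt[7]
        System.out.println(cache.leaderEpochFor("topic-b-0")); // OptionalInt.empty
    }
}
```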



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
  * @param latestLeaderEpoch latest leader's epoch.
  */
 void maybeUpdateLeaderEpoch(Optional latestLeaderEpoch) {
-if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+if (latestLeaderEpoch.isPresent()

Review Comment:
   Hmm, this looks like a change in behavior. Previously we would override a 
known current leader epoch if `latestLeaderEpoch` is not defined. I am not sure 
if that was intentional. I recall we had some logic which assumed that the 
metadata version might be downgraded and would no longer provide a leader epoch. Not 
sure it matters here though since we are not changing the leader, just updating 
the epoch.
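
   The behavioral difference can be sketched with two standalone helpers (these are illustrative reductions, not the actual `ProducerBatch.maybeUpdateLeaderEpoch`):

```java
import java.util.Optional;

public class EpochUpdateSemantics {
    // Old condition: any inequality triggered an update, so an empty latest
    // epoch would wipe out a known current epoch.
    public static Optional<Integer> oldUpdate(Optional<Integer> current, Optional<Integer> latest) {
        return current.equals(latest) ? current : latest;
    }

    // New condition in the PR: only a present, different epoch updates the
    // batch, so a missing latest epoch leaves a known epoch untouched.
    public static Optional<Integer> newUpdate(Optional<Integer> current, Optional<Integer> latest) {
        return (latest.isPresent() && !latest.equals(current)) ? latest : current;
    }

    public static void main(String[] args) {
        Optional<Integer> known = Optional.of(5);
        Optional<Integer> missing = Optional.empty();
        System.out.println(oldUpdate(known, missing)); // Optional.empty
        System.out.println(newUpdate(known, missing)); // Optional[5]
    }
}
```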



##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final 
String topic, final int
 return clusterWith(nodes, Collections.singletonMap(topic, partitions));
 }
 
+public static MetadataCache metadataCacheWith(final int nodes, final 
Map topicPartitionCounts) {

Review Comment:
   Useful to have a brief javadoc to explain replica counts and assignment 
strategy.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -879,17 +884,12 @@ private List 
drainBatchesForOneNode(Metadata metadata, Node node,
 // Only proceed if the partition has no in-flight batches.
 if (isMuted(tp))
 continue;
-Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
-// Although a small chance, but skip this partition if leader has 
changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   I guess this race is no longer possible since we are using the snapshot, 
which is immutable?






[PR] KAFKA-16242: Mark tests with the most flaky failures as integration tests [kafka]

2024-02-09 Thread via GitHub


gharris1727 opened a new pull request, #15349:
URL: https://github.com/apache/kafka/pull/15349

   These tests have had at least one flaky test failure in the last 28 days, 
and aren't already marked as integration tests.
   
   I looked at the top-50 flakiest tests in the project with gradle enterprise, 
and then spot-checked the top-50 flakiest tests in core, connect, tools, and 
streams.
   
   I've also temporarily changed the Jenkinsfile to only run the unit tests to 
get an idea of the runtime and success rate of the unit tests in Jenkins.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-09 Thread via GitHub


OmniaGM commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1484726099


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##
@@ -0,0 +1,1485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.classic.ClassicGroup;
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.opentest4j.AssertionFailedError;
+
+import java.net.InetAddress;
+import 


Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-09 Thread via GitHub


jeqo commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936481385

   @jolshan thanks for catching this! adding it now.





[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Affects Version/s: 3.6.1

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Attachment: streams-3.zip

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-09 Thread via GitHub


jolshan commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1484696012


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##
@@ -0,0 +1,1485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.classic.ClassicGroup;
+import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.opentest4j.AssertionFailedError;
+
+import java.net.InetAddress;
+import 

[jira] [Created] (KAFKA-16242) Mark flaky tests as integration tests

2024-02-09 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16242:
---

 Summary: Mark flaky tests as integration tests
 Key: KAFKA-16242
 URL: https://issues.apache.org/jira/browse/KAFKA-16242
 Project: Kafka
  Issue Type: Test
  Components: unit tests
Reporter: Greg Harris
Assignee: Greg Harris


It would be valuable to have the `unitTest` gradle target run only reliable 
tests that can be used for CI gating. If we push all of the flaky or 
possibly-flaky tests to the integrationTest target, we can start gating CI on 
deterministic failures.
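A split like this is typically implemented by tagging the slow/flaky tests and giving each Gradle target a tag filter. A minimal sketch, assuming JUnit 5 tagging — the task and tag names here are illustrative, not necessarily Kafka's actual build configuration:

```groovy
// build.gradle sketch: tests annotated with @Tag("integration") are excluded
// from the fast, CI-gating unitTest target and run by integrationTest instead.
tasks.register('unitTest', Test) {
    useJUnitPlatform {
        excludeTags 'integration'
    }
}

tasks.register('integrationTest', Test) {
    useJUnitPlatform {
        includeTags 'integration'
    }
}
```

With this arrangement, moving a test between targets is a one-line annotation change on the test class rather than a build-script edit.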





[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Attachment: streams-2.zip

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling active task 1_0
>     at org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





Re: [PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-09 Thread via GitHub


jolshan commented on code in PR #15348:
URL: https://github.com/apache/kafka/pull/15348#discussion_r1484689761


##
checkstyle/suppressions.xml:
##
@@ -336,11 +336,11 @@
 
 
+  files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/>

Review Comment:
   What's your take on in-file suppressions vs suppressions.xml?
   
   On a previous PR I was asked to make the suppression in-file for specific 
methods: https://github.com/apache/kafka/pull/15139#discussion_r1451075038
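For context, the in-file style referenced above looks roughly like the sketch below. This assumes the build enables Checkstyle's `SuppressWarningsHolder` check and `SuppressWarningsFilter` (an assumption about the project's checkstyle setup — the check name is illustrative):

```java
class GroupMetadataManagerTestExample {
    // In-file suppression, scoped to a single method rather than a whole
    // file; requires SuppressWarningsHolder/SuppressWarningsFilter to be
    // configured in checkstyle.xml. Contrast with suppressions.xml, where a
    // <suppress checks="..." files="..."/> entry disables the check for
    // every matching file.
    @SuppressWarnings("checkstyle:MethodLength")
    void veryLongTestHelper() {
        // ...
    }
}
```

The trade-off being discussed: in-file suppressions keep the exemption visible next to the offending code, while suppressions.xml keeps test sources annotation-free but hides the exemption in a central file.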



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Attachment: streams-1.zip

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling active task 1_0
>     at org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.





[jira] [Created] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16241:
---

 Summary: Kafka Streams hits IllegalStateException trying to 
recycle a task
 Key: KAFKA-16241
 URL: https://issues.apache.org/jira/browse/KAFKA-16241
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Running with EOS-v2 (not sure if relevant or not) and hitting:
{code:java}
[2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 cleanly. Attempting to close remaining tasks before re-throwing: (org.apache.kafka.streams.processor.internals.TaskManager)
java.lang.IllegalStateException: Illegal state RESTORING while recycling active task 1_0
    at org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
    at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
    at org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
    at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 {code}
Logs of all three KS instances attached.
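The `IllegalStateException` above comes from a task-lifecycle state guard: recycling an active task is only permitted from certain states, and here the task was still RESTORING. The pattern can be sketched as below — a hypothetical, heavily simplified illustration, not Kafka's actual implementation (state names and the allowed-state set are assumptions):

```java
// Simplified task state machine: recycling an active task into a standby is
// only legal once the task has finished restoring; attempting it from
// RESTORING trips the guard, mirroring the exception in the report above.
enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

class TaskSketch {
    private TaskState state = TaskState.RESTORING;

    void prepareRecycle() {
        if (state != TaskState.RUNNING && state != TaskState.SUSPENDED) {
            throw new IllegalStateException(
                "Illegal state " + state + " while recycling active task");
        }
        state = TaskState.CLOSED;  // recycled cleanly
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch();
        try {
            task.prepareRecycle();  // task is still RESTORING -> guard fires
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The open question in the bug is why the rebalance path reached `prepareRecycle` while the task had not yet left the restoring phase.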





Re: [PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]

2024-02-09 Thread via GitHub


mimaison commented on code in PR #15274:
URL: https://github.com/apache/kafka/pull/15274#discussion_r1484652868


##
tests/kafkatest/services/console_consumer.py:
##
@@ -21,7 +21,7 @@
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
+from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7

Review Comment:
   That seems to work. However, while running the system tests I found an issue 
loading configs via `--consumer.config`. I pushed 
https://github.com/apache/kafka/pull/15274/commits/c6355fe630a7eae8ea55c8806c66b04cf389bfa2
 to fix it.
   
   I've kicked off another run on our system tests CI; hopefully it will be 
clean now.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-09 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1484638827


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -52,7 +52,7 @@ public class MetadataCache {
 private final Map metadataByPartition;
 private final Map topicIds;
 private final Map topicNames;
-private Cluster clusterInstance;
+private InternalCluster clusterInstance;

Review Comment:
   > We should confirm though.
   
   I don't see it being used mutably in the code. Historically it was made 
mutable to support deletions/updates within the cache, but that deletion/update 
code has since been removed. As far as I can see the semantics are read-only, 
so I have treated `MetadataCache` as an immutable cache, made its internal data 
structures unmodifiable, and updated the javadoc.
   
   All clients tests pass locally; hopefully the Jenkins signal is green too.
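Wrapping a cache's internal structures in unmodifiable views is cheap insurance that read-only semantics stay read-only: any later mutation attempt fails fast instead of silently racing with readers. A minimal sketch of the idea (illustrative only, not the actual `MetadataCache` code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Snapshot-style cache: the backing map is copied and wrapped once at
// construction, so the instance can be shared across threads without
// synchronization and mutation attempts throw immediately.
class ImmutableCacheSketch {
    private final Map<String, Integer> byName;

    ImmutableCacheSketch(Map<String, Integer> entries) {
        this.byName = Collections.unmodifiableMap(new HashMap<>(entries));
    }

    Map<String, Integer> entries() {
        return byName;  // safe to hand out: callers cannot modify it
    }

    public static void main(String[] args) {
        Map<String, Integer> source = new HashMap<>();
        source.put("topic-a", 3);
        ImmutableCacheSketch cache = new ImmutableCacheSketch(source);
        try {
            cache.entries().put("topic-b", 1);  // fails fast
        } catch (UnsupportedOperationException e) {
            System.out.println("cache is read-only");
        }
    }
}
```

Updates then happen by building a new cache instance rather than mutating the old one, which is what makes the "immutable cache" reading of `MetadataCache` safe.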






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-09 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1484632678


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -111,37 +233,138 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
 assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @Test
+  def testConsumerGroupTypesFromString(): Unit = {
+var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+assertEquals(Set(GroupType.CONSUMER), result)
+
+result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic")
+assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, 
Classic")
+assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("  bad, generic"))
+
+assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("   ,   ,"))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: 
String): Unit = {
 val simpleGroup = "simple-group"
+val protocolGroup = "protocol-group"
+
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)
+addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 var out = ""
 
 var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
 TestUtils.waitUntilTrue(() => {
   out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-  !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group)
-}, s"Expected to find $simpleGroup, $group and no header, but found $out")
+  !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+}, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
 
 cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
 TestUtils.waitUntilTrue(() => {
   out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-  out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
-}, s"Expected to find $simpleGroup, $group and the header, but found $out")
+  out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+}, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+TestUtils.waitUntilTrue(() => {
+  out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+  out.contains("TYPE") && !out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+}, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "--type")
+TestUtils.waitUntilTrue(() => {
+  out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+  out.contains("TYPE") && out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+}, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
 
 cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "Stable")
 TestUtils.waitUntilTrue(() => {
   out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-  out.contains("STATE") && out.contains(group) && out.contains("Stable")
-}, s"Expected to find $group in state Stable and the header, but found 
$out")
+  out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+}, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
 
 cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "stable")
 TestUtils.waitUntilTrue(() => {
   out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-  out.contains("STATE") && out.contains(group) && out.contains("Stable")
-}, s"Expected to find $group in state Stable and 

Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-09 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1484632200


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -18,74 +18,196 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
 val simpleGroup = "simple-group"
+val protocolGroup = "protocol-group"
+
+createOffsetsTopic()

Review Comment:
   Is it required or is this just for uniformity?
   






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-09 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1484631548


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -18,44 +18,47 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
 val simpleGroup = "simple-group"
+val protocolGroup = "protocol-group"
+
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)
+addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 
 val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
 val service = getConsumerGroupService(cgcArgs)
 
-val expectedGroups = Set(group, simpleGroup)
+val expectedGroups = Set(protocolGroup, group, simpleGroup)
 var foundGroups = Set.empty[String]
 TestUtils.waitUntilTrue(() => {
   foundGroups = service.listConsumerGroups().toSet
   expectedGroups == foundGroups
 }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {
 val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
 assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {
 val simpleGroup = "simple-group"
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)

Review Comment:
   Ah yes, thanks, I understand. I was just asking whether I should make the 
changes related to the state methods in this PR, since I previously got 
comments asking not to put unrelated changes in the same PR. I'll just do it, 
and you can let me know if you want me to revert it.






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-09 Thread via GitHub


rreddy-22 commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1484628868


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2811,6 +2811,72 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
 }
 }
 
+@Test
+public void testListConsumerGroupsWithTypes() throws Exception {
+try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+// Test with a specific state filter but no type filter in list 
consumer group options.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+env.kafkaClient().prepareResponseFrom(
+request -> request instanceof ListGroupsRequest &&
+!((ListGroupsRequest) 
request).data().statesFilter().isEmpty() &&
+((ListGroupsRequest) 
request).data().typesFilter().isEmpty(),

Review Comment:
   haha okay, I actually started off by adding it to the unsupported version 
response method before getting the comments, it's honestly weird that the other 
tests don't validate anything lol, I had this urge to change the state tests 
also 






Re: [PR] KAFKA-16229: Fix slow expired producer id deletion [kafka]

2024-02-09 Thread via GitHub


jolshan commented on PR #15324:
URL: https://github.com/apache/kafka/pull/15324#issuecomment-1936317088

   Hey there @jeqo, looks like checkstyle failed. Do you mind adding the Apache 
license header to your new benchmark?
   
   ```
   /home/jenkins/workspace/Kafka_kafka-pr_PR-15324/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java:1: Line does not match expected header line of '/*'. [Header]
   ```
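The header that the checkstyle `Header` check expects is the standard ASF license block that opens every source file in the repository (it appears verbatim in the `GroupMetadataManagerTestContext` diff earlier in this digest):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
```

Prepending this block to the new benchmark file is what resolves the `[Header]` violation quoted above.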





[jira] [Comment Edited] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper

2024-02-09 Thread Gaurav Salvi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816120#comment-17816120
 ] 

Gaurav Salvi edited comment on KAFKA-16239 at 2/9/24 3:42 PM:
--

[~divijvaidya]: What should the references to 'IntegrationTestHelper' be 
replaced with? Or should we simply remove the lines containing it?

Also, I would like to work on it. Please assign it to me.

 


was (Author: JIRAUSER304146):
[~divijvaidya]: What should the references to 'IntegrationTestHelper' be 
replaced with? Or should we simply remove the lines containing it?

 

> Clean up references to non-existent IntegrationTestHelper
> -
>
> Key: KAFKA-16239
> URL: https://issues.apache.org/jira/browse/KAFKA-16239
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
>
> A bunch of places in the code javadocs and READ docs refer to a class called 
> IntegrationTestHelper. Such a class does not exist. 
> This task will clean up all referenced to IntegrationTestHelper from Kafka 
> code base.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper

2024-02-09 Thread Gaurav Salvi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816120#comment-17816120
 ] 

Gaurav Salvi commented on KAFKA-16239:
--

[~divijvaidya]: What should the references to 'IntegrationTestHelper' be 
replaced with? Or should we simply remove the lines containing it?

 

> Clean up references to non-existent IntegrationTestHelper
> -
>
> Key: KAFKA-16239
> URL: https://issues.apache.org/jira/browse/KAFKA-16239
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
>
> A bunch of places in the code javadocs and READ docs refer to a class called 
> IntegrationTestHelper. Such a class does not exist. 
> This task will clean up all referenced to IntegrationTestHelper from Kafka 
> code base.





Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-09 Thread via GitHub


lianetm commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1936141808

   @Phuc-Hong-Tran regarding this:
   
   > Just for clarification, when we were talking about "implement and test 
everything up to the point where the field is populated", does that mean we're 
not gonna implement and test the part where the client receives the assignment 
from the broker at this stage?
   
   We do need to:
   - `implement and test everything up to the point where the field is 
populated` (from the point the user calls subscribe with the 
SubscriptionPattern, to the point where the supplied regex is available in the 
HB builder, to be included in the HB request). 
   - include the field in the HB request. This is where we do need the RPC to 
be updated to support the new field.
   
   I could be missing something, but I would say we don't need any changes for 
the part where the client receives the assignment from the broker after 
subscribing to a regex. It should be exactly the same logic as when a client 
receives an assignment from the broker after subscribing to a list of topics. 
After sending the HB with the new regex, the client will receive the list of 
partitions assigned to it, and will reconcile them, just as it reconciles all 
assignments received (no matter the subscription type that led to receiving 
that assignment).
   
   Just for the record, the legacy coordinator does have a bit of logic 
([here](https://github.com/apache/kafka/blob/ec4a8aaadbc95cfcf0de2f5e1385373f095298ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L367))
 for validating assignments received after subscribing to a regex, where it 
checks that the assignment received matches the regex. Our initial thought was 
not to include any assignment validation like that in the client, in an attempt 
to simplify it: the broker is the sole responsible for computing the regex and 
target assignment, the client takes and reconciles whatever the broker sends, 
and if the subscription changes from the client side, we have a common logic 
(not specific for regex), to make sure that the new subscription is sent to the 
broker (what the legacy achieved with the rejoin)





Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-09 Thread via GitHub


lianetm commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1936091369

   Hey @Phuc-Hong-Tran , regarding the mixed usage of subscribe with `Pattern` 
and with `SubscriptionPattern`, my opinion is that it is something we should 
live with to provide a smooth transition, while the usage of `Pattern` is 
deprecated. So I would say that we shouldn't restrict it by throwing any new 
exception to the user (which btw, would introduce API level changes not 
included in the KIP, so it would require an updated/new KIP). We could just 
allow subsequent calls to both subscribe with Pattern or SubscriptionPattern, 
and just ensure that the latest prevails. This is the behaviour for subsequent 
calls to `subscribe(Pattern..)`, tested in 
[testSubsequentPatternSubscription](https://github.com/apache/kafka/blob/ec4a8aaadbc95cfcf0de2f5e1385373f095298ca/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L417).
  
   
   Just for the record, there is a restriction (see 
[here](https://github.com/apache/kafka/blob/ec4a8aaadbc95cfcf0de2f5e1385373f095298ca/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L162))
 for not allowing mixed usage of subscribe, but only when mixing different 
subscription types (topics, partitions, pattern). We continue to respect that, 
without introducing any new restriction for the calls that in the end represent 
the same pattern-based subscription type (AUTO_PATTERN).  





[PR] MINOR: Refactor GroupMetadataManagerTest [kafka]

2024-02-09 Thread via GitHub


dajac opened a new pull request, #15348:
URL: https://github.com/apache/kafka/pull/15348

   `GroupMetadataManagerTest` has gotten a little out of control. We have too 
many things defined in it. As a first step, this patch extracts all the inner 
classes. It also extracts all the helper methods. However, the logic is not 
changed at all.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Comment Edited] (KAFKA-16102) about DynamicListenerConfig, the dynamic modification of the listener's port or IP does not take effect.

2024-02-09 Thread Jialun Peng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815470#comment-17815470
 ] 

Jialun Peng edited comment on KAFKA-16102 at 2/9/24 2:06 PM:
-

Hi, can you take a look at this issue? [~cmccabe_impala_fa3f] 


was (Author: JIRAUSER303739):
Hi, can you take a look at this issue?

> about DynamicListenerConfig, the dynamic modification of the listener's port 
> or IP does not take effect.
> 
>
> Key: KAFKA-16102
> URL: https://issues.apache.org/jira/browse/KAFKA-16102
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.6.0
> Environment: Must be present in any environment.
>Reporter: Jialun Peng
>Assignee: Jialun Peng
>Priority: Minor
>  Labels: easyfix
> Fix For: 3.8.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> When I dynamically modify the parameters related to Kafka listeners, such as 
> changing the IP or port value of a listener, the dynamic parameters under the 
> corresponding path in ZooKeeper are updated. However, in reality, the 
> modification of the IP or port for the corresponding listener does not take 
> effect. This phenomenon occurs consistently. As a slight improvement, the 
> error "Security protocol cannot be updated for existing listener" will also 
> be eliminated.





Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-02-09 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1935914902

   @philipnee All done





Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-02-09 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1935913572

   @philipnee Will do





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-09 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935911914

   Merged the code of 
[KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR.
   Everything else is left as it was.
   All the tests still passed.





[jira] [Updated] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft

2024-02-09 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-16240:
--
Description: 
Failed run 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]

Stack trace
{code:java}
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: deleteRecords(api=DELETE_RECORDS) at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
 at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) 
at 
kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)
{code}

  was:
Failed run 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]

Stack trace

```

 org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: deleteRecords(api=DELETE_RECORDS) at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
 at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) 
at 
kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)

```


> Flaky test 
> PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
> -
>
> Key: KAFKA-16240
> URL: https://issues.apache.org/jira/browse/KAFKA-16240
> Project: Kafka
>  Issue Type: Test
>Reporter: Gantigmaa Selenge
>Priority: Minor
>
> Failed run 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]
> Stack trace
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: deleteRecords(api=DELETE_RECORDS) at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>  at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)
> {code}





[jira] [Created] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft

2024-02-09 Thread Gantigmaa Selenge (Jira)
Gantigmaa Selenge created KAFKA-16240:
-

 Summary: Flaky test 
PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
 Key: KAFKA-16240
 URL: https://issues.apache.org/jira/browse/KAFKA-16240
 Project: Kafka
  Issue Type: Test
Reporter: Gantigmaa Selenge


Failed run 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]

Stack trace

```

 org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: deleteRecords(api=DELETE_RECORDS) at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
 at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) 
at 
kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)

```





[jira] [Updated] (KAFKA-16240) Flaky test PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft

2024-02-09 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-16240:
--
Priority: Minor  (was: Major)

> Flaky test 
> PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
> -
>
> Key: KAFKA-16240
> URL: https://issues.apache.org/jira/browse/KAFKA-16240
> Project: Kafka
>  Issue Type: Test
>Reporter: Gantigmaa Selenge
>Priority: Minor
>
> Failed run 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15300/8/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords_String__quorum_kraft_2/]
> Stack trace
> ```
>  org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a 
> node assignment. Call: deleteRecords(api=DELETE_RECORDS) at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>  at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(PlaintextAdminIntegrationTest.scala:860)
> ```





Re: [PR] [MINOR] Fix docker image build [kafka]

2024-02-09 Thread via GitHub


stanislavkozlovski merged PR #15347:
URL: https://github.com/apache/kafka/pull/15347





[jira] [Commented] (KAFKA-13835) Fix two bugs related to dynamic broker configs in KRaft

2024-02-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816049#comment-17816049
 ] 

Divij Vaidya commented on KAFKA-13835:
--

[~cmccabe] the associated PR is merged. Can you please add the correct fix 
version and resolve this if you feel that this Jira is complete.

> Fix two bugs related to dynamic broker configs in KRaft
> ---
>
> Key: KAFKA-13835
> URL: https://issues.apache.org/jira/browse/KAFKA-13835
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Critical
>  Labels: 4.0-blocker
>
> The first bug is that we were calling reloadUpdatedFilesWithoutConfigChange 
> when a topic configuration was changed, but not when a broker configuration 
> was changed. This was backwards -- this function must be called only for 
> BROKER configs, and never for TOPIC configs. (Also, this function is called 
> only for specific broker configs, not for cluster configs.)
> The second bug is that there were several configurations such as 
> `max.connections` which were related to broker listeners, but which did not 
> involve creating or removing new listeners. We can and should support these 
> configurations in KRaft, since no additional work is needed to support them. 
> Only adding or removing listeners is unsupported. This PR adds support for 
> these by fixing the configuration change validation.





[jira] [Updated] (KAFKA-12670) KRaft support for unclean.leader.election.enable

2024-02-09 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-12670:
-
Component/s: kraft

> KRaft support for unclean.leader.election.enable
> 
>
> Key: KAFKA-12670
> URL: https://issues.apache.org/jira/browse/KAFKA-12670
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Reporter: Colin McCabe
>Assignee: Ryan Dielhenn
>Priority: Major
>
> Implement KRaft support for the unclean.leader.election.enable 
> configurations.  These configurations can be set at the topic, broker, or 
> cluster level.





[jira] [Comment Edited] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools

2024-02-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816045#comment-17816045
 ] 

Divij Vaidya edited comment on KAFKA-14349 at 2/9/24 12:52 PM:
---

[~cmccabe] might have forgotten to close this since it's still open. As per 
[https://github.com/apache/kafka/blob/092dc7fc467ed7d354ec504d6939b3fcd7b80632/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L298]
(I might be wrong), we are only reconfiguring the IO thread pool and not the 
network thread pool.

Let's wait for [~cmccabe] to clarify this.


was (Author: divijvaidya):
[~cmccabe] might have forgotten to close this since it's still open. I am closing 
this. I have verified that controller thread pools can be dynamically resized 
as per 
https://github.com/apache/kafka/blob/092dc7fc467ed7d354ec504d6939b3fcd7b80632/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L298

> Support dynamically resizing the KRaft controller's thread pools
> 
>
> Key: KAFKA-14349
> URL: https://issues.apache.org/jira/browse/KAFKA-14349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: 4.0-blocker, kip-500
>
> Support dynamically resizing the KRaft controller's request handler and 
> network handler thread pools. See {{DynamicBrokerConfig.scala}}.





[jira] [Commented] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools

2024-02-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816045#comment-17816045
 ] 

Divij Vaidya commented on KAFKA-14349:
--

[~cmccabe] might have forgotten to close this since it's still open. I am closing 
this. I have verified that controller thread pools can be dynamically resized 
as per 
https://github.com/apache/kafka/blob/092dc7fc467ed7d354ec504d6939b3fcd7b80632/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L298

> Support dynamically resizing the KRaft controller's thread pools
> 
>
> Key: KAFKA-14349
> URL: https://issues.apache.org/jira/browse/KAFKA-14349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: 4.0-blocker, kip-500
>
> Support dynamically resizing the KRaft controller's request handler and 
> network handler thread pools. See {{DynamicBrokerConfig.scala}}.





[PR] Fix docker image build [kafka]

2024-02-09 Thread via GitHub


VedarthConfluent opened a new pull request, #15347:
URL: https://github.com/apache/kafka/pull/15347

   The base image has removed bash, so we need to install it to run the bash 
scripts inside the Docker container.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools

2024-02-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816043#comment-17816043
 ] 

Divij Vaidya commented on KAFKA-14349:
--

[~cmccabe] can we remove "Modifying certain dynamic configurations on the 
standalone KRaft controller" from 
[https://kafka.apache.org/37/documentation.html#kraft_missing] after this JIRA?

> Support dynamically resizing the KRaft controller's thread pools
> 
>
> Key: KAFKA-14349
> URL: https://issues.apache.org/jira/browse/KAFKA-14349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: 4.0-blocker, kip-500
>
> Support dynamically resizing the KRaft controller's request handler and 
> network handler thread pools. See {{DynamicBrokerConfig.scala}}.





[jira] [Resolved] (KAFKA-16238) ConnectRestApiTest broken after KIP-1004

2024-02-09 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-16238.

Fix Version/s: 3.8.0
   Resolution: Fixed

> ConnectRestApiTest broken after KIP-1004
> 
>
> Key: KAFKA-16238
> URL: https://issues.apache.org/jira/browse/KAFKA-16238
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.8.0
>
>
> KIP-1004 introduced a new configuration for connectors: 'tasks.max.enforce'.
> The ConnectRestApiTest system test needs to be updated to expect the new 
> configuration.





Re: [PR] KAFKA-16238: Fix ConnectRestApiTest system test [kafka]

2024-02-09 Thread via GitHub


mimaison merged PR #15346:
URL: https://github.com/apache/kafka/pull/15346





Re: [PR] MINOR: EventAccumulator should signal to one thread when key becomes available [kafka]

2024-02-09 Thread via GitHub


dajac merged PR #15340:
URL: https://github.com/apache/kafka/pull/15340





[jira] [Commented] (KAFKA-14127) KIP-858: Handle JBOD broker disk failure in KRaft

2024-02-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816038#comment-17816038
 ] 

Divij Vaidya commented on KAFKA-14127:
--

Hey folks

3.7's documentation still says that JBOD is a missing feature [1] in KRaft. 
Could we please fix that?

[1] https://kafka.apache.org/37/documentation.html#kraft_missing

> KIP-858: Handle JBOD broker disk failure in KRaft
> -
>
> Key: KAFKA-14127
> URL: https://issues.apache.org/jira/browse/KAFKA-14127
> Project: Kafka
>  Issue Type: Improvement
>  Components: jbod, kraft
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>  Labels: 4.0-blocker, kip-500, kraft
> Fix For: 3.7.0
>
>
> Supporting configurations with multiple storage directories in KRaft mode





[jira] [Resolved] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-02-09 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim resolved KAFKA-16162.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> New created topics are unavailable after upgrading to 3.7
> -
>
> Key: KAFKA-16162
> URL: https://issues.apache.org/jira/browse/KAFKA-16162
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration 
> request will include the `LogDirs` fields with UUID for each log dir in each 
> broker. This info will be stored in the controller and used to identify if 
> the log dir is known and online while handling AssignReplicasToDirsRequest 
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>  
> While upgrading from an old version, the Kafka cluster will run the 3.7 
> binary with the old metadata version, and then upgrade to the newer version 
> using kafka-features.sh. That means that while brokers start up and send the 
> brokerRegistration request, they'll be using an older metadata version without 
> the `LogDirs` fields included. As a result, the controller has no log dir info 
> for any broker. Later, after the upgrade, if a new topic is created, the flow 
> will go like this:
> 1. Controller assign replicas and adds in metadata log
> 2. brokers fetch the metadata and apply it
> 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment
> 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica 
> assignment, controller will think the log dir in current replica is offline, 
> so triggering offline handler, and reassign leader to another replica, and 
> offline, until no more replicas to assign, so assigning leader to -1 (i.e. no 
> leader) 
> So, the results will be that new created topics are unavailable (with no 
> leader) because the controller thinks all log dir are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic 
> quickstart-events3 --bootstrap-server localhost:9092  
> 
> Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: 
> 3   ReplicationFactor: 3Configs: segment.bytes=1073741824
>   Topic: quickstart-events3   Partition: 0Leader: none
> Replicas: 7,2,6 Isr: 6
>   Topic: quickstart-events3   Partition: 1Leader: none
> Replicas: 2,6,7 Isr: 6
>   Topic: quickstart-events3   Partition: 2Leader: none
> Replicas: 6,7,2 Isr: 6
> {code}
> The log snippet in the controller :
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] 
> offline-dir-assignment: changing partition(s): quickstart-events3-0, 
> quickstart-events3-2, quickstart-events3-1 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [7K5JBERyyqFFxIXSXYluJA, AA, AA], 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
> change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
> isr=null, leader=-2, replicas=null, removingReplicas=null, 
> addingReplicas=null, leaderRecoveryState=-1, 
> directories=[7K5JBERyyqFFxIXSXYluJA, AA, 
> AA], eligibleLeaderReplicas=null, lastKnownELR=null) for 
> topic quickstart-events3 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [AA, 

[jira] [Commented] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7

2024-02-09 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816037#comment-17816037
 ] 

Omnia Ibrahim commented on KAFKA-16162:
---

Marking this as resolved as the PR was committed.

> New created topics are unavailable after upgrading to 3.7
> -
>
> Key: KAFKA-16162
> URL: https://issues.apache.org/jira/browse/KAFKA-16162
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration 
> request will include the `LogDirs` fields with UUID for each log dir in each 
> broker. This info will be stored in the controller and used to identify if 
> the log dir is known and online while handling AssignReplicasToDirsRequest 
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>  
> While upgrading from an old version, the Kafka cluster will run the 3.7 binary 
> with the old metadata version, and then upgrade to the newer version using 
> kafka-features.sh. That means that while brokers start up and send the 
> brokerRegistration request, they'll be using the older metadata version without 
> the `LogDirs` fields included, which leaves the controller with no log dir info 
> for any broker. Later, after the upgrade, if a new topic is created, the flow 
> goes like this:
> 1. Controller assign replicas and adds in metadata log
> 2. brokers fetch the metadata and apply it
> 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment
> 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica 
> assignment, the controller will think the log dir of the current replica is 
> offline, so it triggers the offline handler and reassigns the leader to another 
> replica, which is also considered offline, and so on until there are no 
> replicas left to assign, at which point the leader is set to -1 (i.e. no leader).
> So the result is that newly created topics are unavailable (with no leader) 
> because the controller thinks all log dirs are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic 
> quickstart-events3 --bootstrap-server localhost:9092  
> 
> Topic: quickstart-events3 TopicId: s8s6tEQyRvmjKI6ctNTgPg PartitionCount: 
> 3   ReplicationFactor: 3Configs: segment.bytes=1073741824
>   Topic: quickstart-events3   Partition: 0Leader: none
> Replicas: 7,2,6 Isr: 6
>   Topic: quickstart-events3   Partition: 1Leader: none
> Replicas: 2,6,7 Isr: 6
>   Topic: quickstart-events3   Partition: 2Leader: none
> Replicas: 6,7,2 Isr: 6
> {code}
> The log snippet in the controller :
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned 
> partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] 
> offline-dir-assignment: changing partition(s): quickstart-events3-0, 
> quickstart-events3-2, quickstart-events3-1 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, AA] -> 
> [7K5JBERyyqFFxIXSXYluJA, AA, AA], 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition 
> change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, 
> isr=null, leader=-2, replicas=null, removingReplicas=null, 
> addingReplicas=null, leaderRecoveryState=-1, 
> directories=[7K5JBERyyqFFxIXSXYluJA, AA, 
> AA], eligibleLeaderReplicas=null, lastKnownELR=null) for 
> topic quickstart-events3 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for 
> quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: 
> [AA, AA, 

Re: [PR] KAFKA-16215; KAFKA-16178: Fix member not rejoining after error [kafka]

2024-02-09 Thread via GitHub


AndrewJSchofield commented on code in PR #15311:
URL: https://github.com/apache/kafka/pull/15311#discussion_r1484254595


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -397,13 +400,17 @@ private void onErrorResponse(final 
ConsumerGroupHeartbeatResponse response,
 membershipManager.memberId(), 
membershipManager.memberEpoch());
 logInfo(message, response, currentTimeMs);
 membershipManager.transitionToFenced();
+// Skip backoff so that a next HB to rejoin is sent as soon as 
the fenced member releases it assignment

Review Comment:
   nit: "its" assignment. And below also



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14782: Implementation Details Different from Documentation (del… [kafka]

2024-02-09 Thread via GitHub


whiteyesx closed pull request #13721: KAFKA-14782: Implementation Details 
Different from Documentation (del…
URL: https://github.com/apache/kafka/pull/13721





[jira] [Resolved] (KAFKA-14616) Topic recreation with offline broker causes permanent URPs

2024-02-09 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim resolved KAFKA-14616.
---
Fix Version/s: 3.7.0
 Assignee: Colin McCabe
   Resolution: Fixed

> Topic recreation with offline broker causes permanent URPs
> --
>
> Key: KAFKA-14616
> URL: https://issues.apache.org/jira/browse/KAFKA-14616
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Omnia Ibrahim
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.7.0
>
>
> We are facing an odd situation when we delete and recreate a topic while 
> broker is offline in KRAFT mode. 
> Here’s what we saw step by step
>  # Created topic {{foo.test}} with 10 partitions and 4 replicas — Topic 
> {{foo.test}} was created with topic ID {{MfuZbwdmSMaiSa0g6__TPg}}
>  # Took broker 4 offline — which held replicas for partitions  {{0, 3, 4, 5, 
> 7, 8, 9}}
>  # Deleted topic {{foo.test}} — The deletion process was successful, despite 
> the fact that broker 4 still held replicas for partitions {{0, 3, 4, 5, 7, 8, 
> 9}} on local disk.
>  # Recreated topic {{foo.test}} with 10 partitions and 4 replicas. — Topic 
> {{foo.test}} was created with topic ID {{RzalpqQ9Q7ub2M2afHxY4Q}} and 
> partitions {{0, 1, 2, 7, 8, 9}} got assigned to broker 4 (which was still 
> offline). Notice here that partitions {{0, 7, 8, 9}} are common between the 
> assignment of the deleted topic ({{{}topic_id: MfuZbwdmSMaiSa0g6__TPg{}}}) 
> and the recreated topic ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}}).
>  # Brought broker 4 back online.
>  # Broker started to create new partition replicas for the recreated topic 
> {{foo.test}} ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}})
>  # The broker hit the following error {{Tried to assign topic ID 
> RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already 
> contained topic ID MfuZbwdmSMaiSa0g6__TPg}} . As a result of this error the 
> broker decided to rename log dir for partitions {{0, 3, 4, 5, 7, 8, 9}} to 
> {{{}-.-delete{}}}.
>  # Ran {{ls }}
> {code:java}
> foo.test-0.658f87fb9a2e42a590b5d7dcc28862b5-delete/
> foo.test-1/
> foo.test-2/
> foo.test-3.a68f05d05bcc4e579087551b539af311-delete/
> foo.test-4.79ce30a5310d4950ad1b28f226f74895-delete/
> foo.test-5.76ed04da75bf46c3a63342be1eb44450-delete/
> foo.test-6/
> foo.test-7.c2d33db3bf844e9ebbcd9ef22f5270da-delete/
> foo.test-8.33836969ac714b41b69b5334a5068ce0-delete/
> foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/{code}
>       9. Waited until the deletion of the old topic was done and ran {{ls 
> }} again, now we were expecting to see log dir for partitions 
> {{0, 1, 2, 7, 8, 9}} however the result is:
> {code:java}
> foo.test-1/
> foo.test-2/
> foo.test-6/{code}
>      10. Ran {{kafka-topics.sh --command-config cmd.properties 
> --bootstrap-server  --describe --topic foo.test}}
> {code:java}
> Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10 
> ReplicationFactor: 4 Configs: 
> min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=10
> Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
> Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
> Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
> Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
> Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
> Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
> Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
> Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
> Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
> Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6{code}
> Here’s a sample of broker logs
>  
> {code:java}
> {"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  log for partition foo.test-9 in 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  time index 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.timeindex.deleted.","logger":"kafka.log.LogSegment"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  offset index 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.index.deleted.","logger":"kafka.log.LogSegment"}
> 

[jira] [Commented] (KAFKA-14616) Topic recreation with offline broker causes permanent URPs

2024-02-09 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816034#comment-17816034
 ] 

Omnia Ibrahim commented on KAFKA-14616:
---

I'll mark this as resolved as [~cmccabe] committed the PR.

> Topic recreation with offline broker causes permanent URPs
> --
>
> Key: KAFKA-14616
> URL: https://issues.apache.org/jira/browse/KAFKA-14616
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Omnia Ibrahim
>Priority: Major
>
> We are facing an odd situation when we delete and recreate a topic while 
> broker is offline in KRAFT mode. 
> Here’s what we saw step by step
>  # Created topic {{foo.test}} with 10 partitions and 4 replicas — Topic 
> {{foo.test}} was created with topic ID {{MfuZbwdmSMaiSa0g6__TPg}}
>  # Took broker 4 offline — which held replicas for partitions  {{0, 3, 4, 5, 
> 7, 8, 9}}
>  # Deleted topic {{foo.test}} — The deletion process was successful, despite 
> the fact that broker 4 still held replicas for partitions {{0, 3, 4, 5, 7, 8, 
> 9}} on local disk.
>  # Recreated topic {{foo.test}} with 10 partitions and 4 replicas. — Topic 
> {{foo.test}} was created with topic ID {{RzalpqQ9Q7ub2M2afHxY4Q}} and 
> partitions {{0, 1, 2, 7, 8, 9}} got assigned to broker 4 (which was still 
> offline). Notice here that partitions {{0, 7, 8, 9}} are common between the 
> assignment of the deleted topic ({{{}topic_id: MfuZbwdmSMaiSa0g6__TPg{}}}) 
> and the recreated topic ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}}).
>  # Brought broker 4 back online.
>  # Broker started to create new partition replicas for the recreated topic 
> {{foo.test}} ({{{}topic_id: RzalpqQ9Q7ub2M2afHxY4Q{}}})
>  # The broker hit the following error {{Tried to assign topic ID 
> RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already 
> contained topic ID MfuZbwdmSMaiSa0g6__TPg}} . As a result of this error the 
> broker decided to rename log dir for partitions {{0, 3, 4, 5, 7, 8, 9}} to 
> {{{}-.-delete{}}}.
>  # Ran {{ls }}
> {code:java}
> foo.test-0.658f87fb9a2e42a590b5d7dcc28862b5-delete/
> foo.test-1/
> foo.test-2/
> foo.test-3.a68f05d05bcc4e579087551b539af311-delete/
> foo.test-4.79ce30a5310d4950ad1b28f226f74895-delete/
> foo.test-5.76ed04da75bf46c3a63342be1eb44450-delete/
> foo.test-6/
> foo.test-7.c2d33db3bf844e9ebbcd9ef22f5270da-delete/
> foo.test-8.33836969ac714b41b69b5334a5068ce0-delete/
> foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/{code}
>       9. Waited until the deletion of the old topic was done and ran {{ls 
> }} again, now we were expecting to see log dir for partitions 
> {{0, 1, 2, 7, 8, 9}} however the result is:
> {code:java}
> foo.test-1/
> foo.test-2/
> foo.test-6/{code}
>      10. Ran {{kafka-topics.sh --command-config cmd.properties 
> --bootstrap-server  --describe --topic foo.test}}
> {code:java}
> Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10 
> ReplicationFactor: 4 Configs: 
> min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=10
> Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
> Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
> Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
> Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
> Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
> Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
> Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
> Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
> Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
> Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6{code}
> Here’s a sample of broker logs
>  
> {code:java}
> {"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  log for partition foo.test-9 in 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  time index 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.timeindex.deleted.","logger":"kafka.log.LogSegment"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  offset index 
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/.index.deleted.","logger":"kafka.log.LogSegment"}
> {"timestamp":"2023-01-11T15:19:53,615Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
>  log 
> 

Re: [PR] KAFKA-7504 Mitigate sendfile(2) blocking in network threads by warming up fetch data [kafka]

2024-02-09 Thread via GitHub


divijvaidya commented on code in PR #14289:
URL: https://github.com/apache/kafka/pull/14289#discussion_r1484189476


##
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##
@@ -421,6 +446,18 @@ private AbstractIterator 
batchIterator(int start) {
 return new RecordBatchIterator<>(inputStream);
 }
 
+/**
+ * Try populating OS page cache with file content
+ */
+public void prepareForRead() throws IOException {
+if (DEVNULL_PATH != null) {
+long size = Math.min(channel.size(), end) - start;

Review Comment:
   Isn't the `size` member in FileRecords precomputed during construction and 
maintained on every mutation? Hence, we can simply use `this.size` here. No?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##
@@ -39,19 +40,40 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
  Records records,
  boolean firstEntryIncomplete,
  Optional> 
abortedTransactions) {
-this(fetchOffsetMetadata, records, firstEntryIncomplete, 
abortedTransactions, Optional.empty());
+this(fetchOffsetMetadata,
+ records,
+ firstEntryIncomplete,
+ abortedTransactions,
+ Optional.empty(),
+ LogOffsetMetadata.UNKNOWN_OFFSET_METADATA);
 }
 
 public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
  Records records,
  boolean firstEntryIncomplete,
  Optional> 
abortedTransactions,
- Optional 
delayedRemoteStorageFetch) {
+ Optional 
delayedRemoteStorageFetch,
+ LogOffsetMetadata maxOffsetMetadata) {
 this.fetchOffsetMetadata = fetchOffsetMetadata;
 this.records = records;
 this.firstEntryIncomplete = firstEntryIncomplete;
 this.abortedTransactions = abortedTransactions;
 this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
+this.maxOffsetMetadata = maxOffsetMetadata;
+}
+
+public FetchDataInfo withMaxOffsetMetadata(LogOffsetMetadata 
maxOffsetMetadata) {
+return new FetchDataInfo(fetchOffsetMetadata,
+ records,
+ firstEntryIncomplete,
+ abortedTransactions,
+ delayedRemoteStorageFetch,
+ maxOffsetMetadata);
+}
+
+public boolean isLastSegment() {

Review Comment:
   The last segment is always the active segment. Shall we call it 
isActiveSegment()? 
   
   Also, FetchDataInfo is not a property of a segment, hence isActiveSegment() 
sounds counterintuitive. But it also cannot have data spanning across multiple 
segments. So, perhaps rename the function to isFetchDataFromActiveSegment()?



##
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##
@@ -421,6 +446,18 @@ private AbstractIterator 
batchIterator(int start) {
 return new RecordBatchIterator<>(inputStream);
 }
 
+/**
+ * Try populating OS page cache with file content
+ */
+public void prepareForRead() throws IOException {
+if (DEVNULL_PATH != null) {
+long size = Math.min(channel.size(), end) - start;
+try (FileChannel devnullChannel = FileChannel.open(DEVNULL_PATH, 
StandardOpenOption.WRITE)) {
+channel.transferTo(start, size, devnullChannel);

Review Comment:
   Do we want to pre-populate the entire content represented by the FileRecords, 
or just the part that will be read by the writeTo() method? For example, we are 
pre-populating all content represented by FileRecords here, but we might end up 
reading only a part of it in writeTo() (see the length param in writeTo()). Can 
we instead stick to pre-populating a smaller chunk, perhaps 32KB, which is the 
chunk size used by the SSL transport layer while reading from the file?
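   For context, the page-cache warm-up trick under review can be sketched 
outside Kafka roughly as follows. This is a minimal sketch, not the PR's actual 
code: the `PageCacheWarmup` class, the `warm` method, and the zero-progress 
guard are all illustrative, and a POSIX `/dev/null` is assumed.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PageCacheWarmup {
    // POSIX-only sink; a real implementation should probe for it once
    // and skip the warm-up entirely when it is absent (e.g. on Windows).
    private static final Path DEVNULL = Paths.get("/dev/null");

    /**
     * Copies [start, start + size) of {@code source} into /dev/null.
     * The kernel has to read those pages to serve transferTo(), so the
     * file content lands in the OS page cache as a side effect, and a
     * later sendfile(2) on the same range is less likely to block on disk.
     */
    static void warm(FileChannel source, long start, long size) throws IOException {
        try (FileChannel sink = FileChannel.open(DEVNULL, StandardOpenOption.WRITE)) {
            long done = 0;
            while (done < size) {
                long n = source.transferTo(start + done, size - done, sink);
                if (n <= 0) break; // nothing left to transfer
                done += n;
            }
        }
    }
}
```

   Warming the whole range is the behaviour the comment above questions; 
limiting `size` to the chunk that writeTo() will actually send (e.g. 32KB) 
would be the suggested alternative.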



##
storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java:
##
@@ -39,19 +40,40 @@ public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
  Records records,
  boolean firstEntryIncomplete,
  Optional> 
abortedTransactions) {
-this(fetchOffsetMetadata, records, firstEntryIncomplete, 
abortedTransactions, Optional.empty());
+this(fetchOffsetMetadata,
+ records,
+ firstEntryIncomplete,
+ abortedTransactions,
+ Optional.empty(),
+ LogOffsetMetadata.UNKNOWN_OFFSET_METADATA);
 }
 
 public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
  Records records,
  boolean firstEntryIncomplete,
   

Re: [PR] KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion [kafka]

2024-02-09 Thread via GitHub


divijvaidya commented on PR #11438:
URL: https://github.com/apache/kafka/pull/11438#issuecomment-1935743682

   We have a test failing for this PR which has not failed in trunk for the 
last 28 days [2]. @arunmathew88, can you please debug why this PR is causing 
that test to fail?
   
   [1] https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11438/6/
   [2] 
https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=trunk=Europe%2FBerlin=kafka.log.remote.RemoteIndexCacheTest=testIndexFileAlreadyExistOnDiskButNotInCache()





Re: [PR] MINOR: Improve Code Style [kafka]

2024-02-09 Thread via GitHub


divijvaidya merged PR #15319:
URL: https://github.com/apache/kafka/pull/15319





Re: [PR] MINOR: ZKAcl Migration Client Test Problem failing in Scala version 2.12 [kafka]

2024-02-09 Thread via GitHub


divijvaidya closed pull request #15342: MINOR:  ZKAcl Migration Client Test 
Problem failing in Scala version 2.12
URL: https://github.com/apache/kafka/pull/15342





Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]

2024-02-09 Thread via GitHub


divijvaidya merged PR #15239:
URL: https://github.com/apache/kafka/pull/15239





Re: [PR] MINOR: set test global timeout as 10 mins [kafka]

2024-02-09 Thread via GitHub


gaurav-narula commented on code in PR #15065:
URL: https://github.com/apache/kafka/pull/15065#discussion_r1484162204


##
clients/src/test/resources/junit-platform.properties:
##
@@ -13,3 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}"
+junit.jupiter.execution.timeout.default = 10 m

Review Comment:
   Also stumbled upon [Thread 
Mode](https://github.com/junit-team/junit5/pull/2949) which avoids pitfalls 
where the test code [doesn't 
interrupt](https://github.com/junit-team/junit5/issues/2087).
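   For reference, the separate-thread timeout mode mentioned above (added in 
JUnit 5.9) can also be enabled declaratively next to the default timeout. A 
hedged sketch of what the properties file might then look like; the 
thread-mode key and value should be double-checked against the JUnit version 
actually in use:

```properties
junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}"
junit.jupiter.execution.timeout.default = 10 m
# Run the timeout watchdog on a separate thread so that test code which
# swallows interrupts is still terminated (JUnit 5.9+).
junit.jupiter.execution.timeout.thread.mode.default = SEPARATE_THREAD
```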






Re: [PR] MINOR: ZKAcl Migration Client Test Problem failing in Scala version 2.12 [kafka]

2024-02-09 Thread via GitHub


divijvaidya commented on PR #15342:
URL: https://github.com/apache/kafka/pull/15342#issuecomment-1935712826

   Thank you for looking into this @highluck, but this seems to have already 
been fixed by https://github.com/apache/kafka/pull/15343





[jira] [Created] (KAFKA-16239) Clean up references to non-existent IntegrationTestHelper

2024-02-09 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-16239:


 Summary: Clean up references to non-existent IntegrationTestHelper
 Key: KAFKA-16239
 URL: https://issues.apache.org/jira/browse/KAFKA-16239
 Project: Kafka
  Issue Type: Improvement
Reporter: Divij Vaidya


A number of places in the code javadocs and README docs refer to a class called 
IntegrationTestHelper. Such a class does not exist. 

This task will clean up all references to IntegrationTestHelper from the Kafka 
code base.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]

2024-02-09 Thread via GitHub


jlprat commented on PR #15239:
URL: https://github.com/apache/kafka/pull/15239#issuecomment-1935630910

   
https://issues.apache.org/jira/browse/KAFKA-12895?focusedCommentId=17365016=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17365016
   This could be done when KIP-751 is done.





Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]

2024-02-09 Thread via GitHub


jlprat commented on PR #15239:
URL: https://github.com/apache/kafka/pull/15239#issuecomment-1935623714

   > I've tried that and thought it worked because of a silly mistake that I 
made. But core actually needs it, so it needs to stay.
   
   Correct, `core` needs the `compat` library as it works for both 2.12 and 
2.13. Once 2.12 support is removed (in 4.0) we can refactor the 2.13 core and 
remove the `compat` library.





[PR] KAFKA-16238: Fix ConnectRestApiTest system test [kafka]

2024-02-09 Thread via GitHub


mimaison opened a new pull request, #15346:
URL: https://github.com/apache/kafka/pull/15346

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16238) ConnectRestApiTest broken after KIP-1004

2024-02-09 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16238:
--

 Summary: ConnectRestApiTest broken after KIP-1004
 Key: KAFKA-16238
 URL: https://issues.apache.org/jira/browse/KAFKA-16238
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Mickael Maison
Assignee: Mickael Maison


KIP-1004 introduced a new configuration for connectors: 'tasks.max.enforce'.

The ConnectRestApiTest system test needs to be updated to expect the new 
configuration.





Re: [PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]

2024-02-09 Thread via GitHub


dajac commented on code in PR #15275:
URL: https://github.com/apache/kafka/pull/15275#discussion_r1484072806


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1392,4 +1356,16 @@ public void registerStateListener(MemberStateListener 
listener) {
 }
 this.stateUpdatesListeners.add(listener);
 }
+
+/**
+ * If either a new target assignment or new metadata is available that we 
have not yet attempted
+ * to reconcile, and we are currently in state RECONCILING, trigger 
reconciliation.
+ */
+@Override
+public PollResult poll(final long currentTimeMs) {
+if (state == MemberState.RECONCILING) {
+maybeReconcile();

Review Comment:
   I have one remaining question for my understanding. If there are unresolved 
topic ids, is it going to request a metadata update (in 
`findResolvableAssignmentAndTriggerMetadataUpdate`) every time that `poll` is 
called? I believe that it won't be the case because the `Metadata` class will 
request a full update only once. Is my understanding correct?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1521,6 +1660,18 @@ private ConsumerRebalanceListenerInvoker 
consumerRebalanceListenerInvoker() {
 );
 }
 
+private SortedSet topicPartitions(Map> topicIdMap, Map topicIdNames) {

Review Comment:
   nit: Could it be static?






Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-02-09 Thread via GitHub


dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1484046178


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2811,6 +2811,72 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
 }
 }
 
+@Test
+public void testListConsumerGroupsWithTypes() throws Exception {
+try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+// Test with a specific state filter but no type filter in list 
consumer group options.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+env.kafkaClient().prepareResponseFrom(
+request -> request instanceof ListGroupsRequest &&
+!((ListGroupsRequest) 
request).data().statesFilter().isEmpty() &&
+((ListGroupsRequest) 
request).data().typesFilter().isEmpty(),

Review Comment:
   nit: It would be better to be precise here and validate that the states are 
actually the correct ones. This also applies to the other matchers. 
`expectCreateTopicsRequestWithTopics` may be a good inspiration for the 
structure.
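   As a self-contained illustration of the loose-vs-precise matcher 
distinction (the `ListGroupsData` record and predicate names here are invented 
for the sketch; the real test would inspect `ListGroupsRequest.data()` 
directly):

```java
import java.util.List;
import java.util.function.Predicate;

public class MatcherSketch {
    // Stand-in carrying only the two fields the matcher inspects.
    record ListGroupsData(List<String> statesFilter, List<String> typesFilter) {}

    // Loose matcher, as in the current test: any non-empty states filter passes.
    static final Predicate<ListGroupsData> LOOSE =
        d -> !d.statesFilter().isEmpty() && d.typesFilter().isEmpty();

    // Precise matcher: pins the exact expected filter contents, so a request
    // built with the wrong state can no longer satisfy the expectation.
    static final Predicate<ListGroupsData> PRECISE =
        d -> d.statesFilter().equals(List.of("Stable")) && d.typesFilter().isEmpty();

    public static void main(String[] args) {
        ListGroupsData wrongState = new ListGroupsData(List.of("Empty"), List.of());
        System.out.println(LOOSE.test(wrongState));   // true  -> a wrong state slips through
        System.out.println(PRECISE.test(wrongState)); // false -> caught
    }
}
```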



##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -18,74 +18,196 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: String): Unit = {
 val simpleGroup = "simple-group"
+val protocolGroup = "protocol-group"
+
+createOffsetsTopic()
+
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)
+addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol)
 
 val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
 val service = getConsumerGroupService(cgcArgs)
 
-val expectedGroups = Set(group, simpleGroup)
+val expectedGroups = Set(protocolGroup, group, simpleGroup)
 var foundGroups = Set.empty[String]
 TestUtils.waitUntilTrue(() => {
   foundGroups = service.listConsumerGroups().toSet
   expectedGroups == foundGroups
 }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
+  @Test
   def testListWithUnrecognizedNewConsumerOption(): Unit = {
 val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
 assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): Unit = {
 val simpleGroup = "simple-group"
 addSimpleGroupExecutor(group = simpleGroup)
 addConsumerGroupExecutor(numConsumers = 1)
 
 val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
 val service = getConsumerGroupService(cgcArgs)
 
+var expectedListing = Set(
+  new ConsumerGroupListing(
+simpleGroup,
+true,
+Optional.of(ConsumerGroupState.EMPTY),
+Optional.of(GroupType.CLASSIC)
+  ),
+  new ConsumerGroupListing(
+group,
+false,
+Optional.of(ConsumerGroupState.STABLE),
+Optional.of(GroupType.CLASSIC)
+  )
+)
+
+testWaitUntilTrue(Set.empty, ConsumerGroupState.values.toSet, expectedListing, service)

Review Comment:
   I like it, but the name could be better. How about `assertGroupListing` or something similar?
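   The shape of such an assertion helper, polling until the listing matches or a timeout elapses, might look like the following. This is a hypothetical sketch, not the project's actual helper; the method name follows the suggestion above and the group names and timeout are assumptions:

   ```java
   import java.util.Set;
   import java.util.function.Supplier;

   public class AssertGroupListingSketch {
       // Hypothetical polling assertion: retry the listing until it matches
       // the expectation, failing with a descriptive message on timeout.
       static void assertGroupListing(Supplier<Set<String>> listGroups,
                                      Set<String> expected,
                                      long timeoutMs) throws InterruptedException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           Set<String> found = listGroups.get();
           while (!found.equals(expected)) {
               if (System.currentTimeMillis() >= deadline) {
                   throw new AssertionError(
                       "Expected listing " + expected + " but found " + found);
               }
               Thread.sleep(10);
               found = listGroups.get();
           }
       }

       public static void main(String[] args) throws InterruptedException {
           // Stand-in for service.listConsumerGroups(); matches immediately here.
           Supplier<Set<String>> listGroups = () -> Set.of("simple-group", "protocol-group");
           assertGroupListing(listGroups, Set.of("simple-group", "protocol-group"), 1000);
           System.out.println("listing matched");
       }
   }
   ```

   An assertion-style name makes the failure semantics explicit at the call site, unlike the generic `testWaitUntilTrue`.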



##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -189,16 +197,65 @@ object 

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-09 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935587072

   Accidentally closed the PR, reopening again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR fix scala compile issue [kafka]

2024-02-09 Thread via GitHub


mberndt123 commented on PR #15343:
URL: https://github.com/apache/kafka/pull/15343#issuecomment-1935582325

   odd 🤷‍♂️

