[jira] [Resolved] (KAFKA-15490) Invalid path provided to the log failure channel upon I/O error when writing broker metadata checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-15490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-15490.
-------------------------------
    Fix Version/s: 3.6.2
       Resolution: Fixed

> Invalid path provided to the log failure channel upon I/O error when writing broker metadata checkpoint
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15490
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15490
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.4.0, 3.4.1, 3.5.1, 3.6.1
>            Reporter: Alexandre Dupriez
>            Assignee: Divij Vaidya
>            Priority: Minor
>             Fix For: 3.6.2
>
>
> There is a small bug/typo in the handling of I/O errors when writing the broker
> metadata checkpoint in {{KafkaServer}}. The path provided to the log dir
> failure channel is the full path of the checkpoint file, whereas only the log
> directory is expected
> ([source|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/server/KafkaServer.scala#L958C8-L961C8]).
> {code:java}
> case e: IOException =>
>   val dirPath = checkpoint.file.getAbsolutePath
>   logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e){code}
> As a result, after an {{IOException}} is captured and enqueued in the log dir
> failure channel (in the paths below, the omitted prefix before {{/meta.properties}}
> is the actual path of the log directory):
> {code:java}
> [2023-09-22 17:07:32,052] ERROR Error while writing meta.properties to /meta.properties (kafka.server.LogDirFailureChannel)
> java.io.IOException{code}
> The log dir failure handler cannot look up the log directory:
> {code:java}
> [2023-09-22 17:07:32,053] ERROR [LogDirFailureHandler]: Error due to (kafka.server.ReplicaManager$LogDirFailureHandler)
> org.apache.kafka.common.errors.LogDirNotFoundException: Log dir /meta.properties is not found in the config.{code}
> An immediate fix for this is to use the {{logDir}} provided to the
> checkpointing method instead of the path of the metadata file.
> For brokers with only one log directory, this bug prevents
> the broker from shutting down as expected.
> The {{LogDirNotFoundException}} then kills the log dir failure handler
> thread, subsequent {{IOException}}s are not handled, and the broker never
> stops.
> {code:java}
> [2024-02-27 02:13:13,564] INFO [LogDirFailureHandler]: Stopped (kafka.server.ReplicaManager$LogDirFailureHandler){code}
> Another consideration here is whether the {{LogDirNotFoundException}} should
> terminate the log dir failure handler thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
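The path mismatch described in KAFKA-15490 can be illustrated with a minimal Java sketch. This is not the Kafka code: `LogDirLookupSketch`, `isConfiguredLogDir`, and the `/tmp/kafka-logs` directory are hypothetical names used only for illustration, and the sketch assumes that the failure handler matches the reported path against the set of configured log directories.

```java
import java.io.File;
import java.util.Set;

// Hypothetical sketch, not Kafka's implementation: it only shows why reporting
// the checkpoint file path instead of the log directory fails the lookup.
final class LogDirLookupSketch {

    // Assumption: only paths that are configured log directories are accepted.
    static boolean isConfiguredLogDir(Set<String> configuredLogDirs, String reportedPath) {
        return configuredLogDirs.contains(reportedPath);
    }

    public static void main(String[] args) {
        // Illustrative directory; any configured log directory behaves the same way.
        String logDir = new File("/tmp/kafka-logs").getAbsolutePath();
        Set<String> configured = Set.of(logDir);
        File checkpoint = new File(logDir, "meta.properties");

        // Buggy variant: the full path of the checkpoint file is reported.
        String reportedFile = checkpoint.getAbsolutePath();
        // Fixed variant: the log directory passed to the checkpointing step is reported.
        String reportedDir = logDir;

        System.out.println(reportedFile + " is a log dir: " + isConfiguredLogDir(configured, reportedFile));
        System.out.println(reportedDir + " is a log dir: " + isConfiguredLogDir(configured, reportedDir));
    }
}
```

Under these assumptions, only the second lookup succeeds, which matches the proposed fix of passing the log directory rather than the metadata file path.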
Re: [PR] KAFKA-15490: Fix dir path when marking offline [kafka]
showuon commented on PR #15490: URL: https://github.com/apache/kafka/pull/15490#issuecomment-1999010980 Sorry that I missed the ping. Thanks Satish for helping review. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update javadocs and exception string in "deprecated" ProcessorRecordContext#hashcode [kafka]
ableegoldman merged PR #15508: URL: https://github.com/apache/kafka/pull/15508
Re: [PR] MINOR: Remove deprecation and exception throw in ProcessorRecordContext#hashcode [kafka]
ableegoldman commented on PR #15508: URL: https://github.com/apache/kafka/pull/15508#issuecomment-1998996811 Well, 1 of the 4 JDK builds was aborted for some unknown reason, but the other 3 builds completed with only unrelated test failures. Given this is literally only touching javadocs and error message strings, I'm going to make a judgement call and say it's probably safe to merge and definitely not the cause of the one build being aborted early. Thanks for the extra pair of eyes to unravel this "mystery" for me lol. Should always check the original PR for comments when something doesn't make sense... my bad for being lazy here in the initial PR.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on PR #15507: URL: https://github.com/apache/kafka/pull/15507#issuecomment-1998993124 @ChrisAHolland thanks for double-checking the failed tests. Will merge it later!
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on PR #15507: URL: https://github.com/apache/kafka/pull/15507#issuecomment-1998991332 @chia7712 I believe the test failures are orthogonal; it seems all recently merged PRs have been having test failures.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
sjhajharia commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1525774714

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ##
@@ -42,14 +42,26 @@ object AddPartitionsToTxnManager {
   val VerificationTimeMsMetricName = "VerificationTimeMs"
 }
 
+/**
+ * This is an enum which handles the Partition Response based on the Produce Request Version and the exact operation
+ *    defaultOperation: This is the default workflow which maps to cases when the Produce Request Version was lower than expected or when exercising the offset commit request path

Review Comment:
Done
[jira] [Updated] (KAFKA-16370) offline rollback procedure from kraft mode to zookeeper mode.
[ https://issues.apache.org/jira/browse/KAFKA-16370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kaushik srinivas updated KAFKA-16370:
-------------------------------------
    Issue Type: Bug  (was: Improvement)

> offline rollback procedure from kraft mode to zookeeper mode.
> -------------------------------------------------------------
>
>                 Key: KAFKA-16370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16370
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: kaushik srinivas
>            Priority: Major
>
> From the KIP,
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]:
>
> h2. Finalizing the Migration
> Once the cluster has been fully upgraded to KRaft mode, the controller will
> still be running in migration mode and making dual writes to KRaft and ZK.
> Since the data in ZK is still consistent with that of the KRaft metadata log,
> it is still possible to revert back to ZK.
> *_The time that the cluster is running all KRaft brokers/controllers, but
> still running in migration mode, is effectively unbounded._*
> Once the operator has decided to commit to KRaft mode, the final step is to
> restart the controller quorum and take it out of migration mode by setting
> _zookeeper.metadata.migration.enable_ to "false" (or unsetting it). The
> active controller will only finalize the migration once it detects that all
> members of the quorum have signaled that they are finalizing the migration
> (again, using the tagged field in ApiVersionsResponse). Once the controller
> leaves migration mode, it will write a ZkMigrationStateRecord to the log and
> no longer perform writes to ZK. It will also disable its special handling of
> ZK RPCs.
> *At this point, the cluster is fully migrated and is running in KRaft mode. A
> rollback to ZK is still possible after finalizing the migration, but it must
> be done offline and it will cause metadata loss (which can also cause
> partition data loss).*
>
> We tried this out on a Kafka cluster that was migrated from ZooKeeper to
> KRaft mode. We observe that the rollback is possible by deleting the
> "/controller" node in ZooKeeper before the rollback from KRaft mode to
> ZooKeeper mode is done.
> The above snippet indicates that a rollback from KRaft to ZK after the
> migration is finalized is still possible using an offline method. Are there
> any known steps to be performed as part of this offline rollback method?
> From our experience, we currently know of one step: "deletion of the
> /controller node in ZooKeeper to force ZooKeeper-based brokers to be elected
> as the new controller after the rollback is done". Are there any additional
> steps or actions apart from this?
Re: [PR] KAFKA-15490: Fix dir path when marking offline [kafka]
satishd merged PR #15490: URL: https://github.com/apache/kafka/pull/15490
Re: [PR] KAFKA-15490: Fix dir path when marking offline [kafka]
satishd commented on PR #15490: URL: https://github.com/apache/kafka/pull/15490#issuecomment-1998903881 Failing tests are unrelated to this change; merging it to the 3.6 branch.
Re: [PR] KAFKA-15347: implement lazy deserialization for segment [kafka]
github-actions[bot] commented on PR #14957: URL: https://github.com/apache/kafka/pull/14957#issuecomment-1998877709 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[PR] MINOR:Add hint for `resumeDeletions` in `TopicDeletionManager` [kafka]
hudeqi opened a new pull request, #15543: URL: https://github.com/apache/kafka/pull/15543 The `resumeDeletions` method is called from multiple places and is the core logic responsible for executing the deletion. Adding comments that indicate where this method is called makes this part of the logic easier to understand.
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1525699817

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1456,7 +1456,7 @@ public CompletableFuture scheduleWriteOperation(
      * @param producerEpoch The producer epoch.
      * @param timeout       The write operation timeout.
      * @param op            The write operation.
-     *
+     * @param apiVersion    The Version of the Txn_Offset_Commit request

Review Comment:
Can we keep the new line below the last argument?
Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]
jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1525699580

## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ##
@@ -44,7 +44,7 @@ object AddPartitionsToTxnManager {
 
 /**
  * This is an enum which handles the Partition Response based on the Produce Request Version and the exact operation
- *    defaultOperation: This is the default workflow which maps to cases when the Produce Request Version was lower than expected or when exercising the offset commit request path
+ *    defaultOperation: This is the default workflow which maps to cases when the Produce Request Version or the Txn_offset_commit request was lower than expected

Review Comment:
nit: replace "lower than expected" with "lower than the first version supporting the new error code"
[jira] [Updated] (KAFKA-15206) Flaky test RemoteIndexCacheTest.testClose()
[ https://issues.apache.org/jira/browse/KAFKA-15206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-15206:
------------------------------
    Fix Version/s: 3.7.1

> Flaky test RemoteIndexCacheTest.testClose()
> -------------------------------------------
>
>                 Key: KAFKA-15206
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15206
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Divij Vaidya
>            Assignee: Kamal Chandraprakash
>            Priority: Minor
>              Labels: flaky-test
>             Fix For: 3.8.0, 3.7.1
>
>
> Test fails 2% of the time.
> [https://ge.apache.org/scans/tests?search.timeZoneId=Europe/Berlin&tests.container=kafka.log.remote.RemoteIndexCacheTest&tests.test=testClose()]
>
> This test should be modified to test
> assertTrue(cache.cleanerThread.isShutdownComplete) in a
> TestUtils.waitUntilTrue condition which will catch the InterruptedException
> and exit successfully on it.
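The polling approach suggested in KAFKA-15206 can be sketched in plain Java as follows. This is an illustration only: `WaitUntilSketch` and this `waitUntilTrue` signature are hypothetical and are not Kafka's `TestUtils` API, and treating an interrupt as "stop waiting and report the current state of the condition" is an assumption about what "exit successfully" means here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper, not Kafka's TestUtils.
final class WaitUntilSketch {

    // Re-checks the condition until it holds or the timeout elapses.
    // Returns true if the condition was observed to hold.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                // Assumption: an interrupt ends the wait without failing;
                // restore the flag and report the condition's current state.
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a cleaner thread that completes its shutdown asynchronously.
        AtomicBoolean shutdownComplete = new AtomicBoolean(false);
        new Thread(() -> shutdownComplete.set(true)).start();
        System.out.println(waitUntilTrue(shutdownComplete::get, 5_000, 10));
    }
}
```

Compared with a single immediate assertion, the repeated check tolerates the short delay between requesting shutdown and the thread actually reporting completion.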
Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
showuon merged PR #15523: URL: https://github.com/apache/kafka/pull/15523
Re: [PR] KAFKA-15206: Fix the flaky RemoteIndexCacheTest.testClose test [kafka]
showuon commented on PR #15523: URL: https://github.com/apache/kafka/pull/15523#issuecomment-1998835444 Failed tests are unrelated.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998834117 Thanks all for the review! I've backported this to v3.7. For v3.6, the code differs more, so I'd like to run CI before pushing the change. PR: https://github.com/apache/kafka/pull/15542.
[PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon opened a new pull request, #15542:
URL: https://github.com/apache/kafka/pull/15542

Backported https://github.com/apache/kafka/pull/15474 to the v3.6 branch. Since the code differs more there, I'd like to make sure the backport doesn't break any tests.

Fix getOffsetByMaxTimestamp for compressed records. This PR:
1. For the inPlaceAssignment case, computes the correct offset for maxTimestamp while traversing the batch records and sets it in the ValidationResult at the end, instead of always setting it to the last offset.
2. For the non-inPlaceAssignment case, sets offsetOfMaxTimestamp for the log create time, as in the non-compressed and inPlaceAssignment cases, instead of always setting it to the last offset.
3. Adds tests to verify the fix.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
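The idea in item 1 of the PR description, tracking the offset of the maximum timestamp while traversing the batch rather than reporting the last offset, can be sketched as below. The sketch is not the Kafka validation code: `MaxTimestampOffsetSketch` and `Rec` are hypothetical stand-ins, and resolving ties in favour of the earliest record is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Kafka's log validation code.
final class MaxTimestampOffsetSketch {

    // Minimal stand-in for a record: only an offset and a timestamp.
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Returns the offset of the record with the largest timestamp, or -1 for an
    // empty batch. Assumption: on ties, the earliest record wins.
    static long offsetOfMaxTimestamp(List<Rec> batch) {
        boolean seen = false;
        long maxTimestamp = -1L;
        long offsetOfMax = -1L;
        for (Rec rec : batch) {
            if (!seen || rec.timestamp > maxTimestamp) {
                seen = true;
                maxTimestamp = rec.timestamp;
                offsetOfMax = rec.offset;
            }
        }
        return offsetOfMax;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(new Rec(10, 100), new Rec(11, 300), new Rec(12, 200));
        // The maximum timestamp (300) sits at offset 11, while the last offset is 12.
        System.out.println(offsetOfMaxTimestamp(batch));
    }
}
```

Always returning the last offset would give 12 for this batch, which is the kind of mismatch the PR description says it corrects.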
[jira] [Updated] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space
[ https://issues.apache.org/jira/browse/KAFKA-16378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jianbin Chen updated KAFKA-16378:
---------------------------------
    Component/s: Tiered-Storage

> Under tiered storage, deleting local logs does not free disk space
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16378
>             Project: Kafka
>          Issue Type: Bug
>          Components: Tiered-Storage
>    Affects Versions: 3.7.0
>            Reporter: Jianbin Chen
>            Priority: Major
>         Attachments: image-2024-03-15-09-33-13-903.png
>
>
> This happens only occasionally: whenever a tiered storage topic triggers the
> deletion of local logs, residual references to the deleted files may remain,
> even though the files themselves can no longer be found on the local disk.
> The implementation I use is: [Aiven-Open/tiered-storage-for-apache-kafka:
> RemoteStorageManager for Apache Kafka® Tiered Storage
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]
> I also filed an issue with that project, which contains a full
> description of the problem:
> [Disk space not released · Issue #513 ·
> Aiven-Open/tiered-storage-for-apache-kafka
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]
> !image-2024-03-15-09-33-13-903.png!
> The figure clearly shows that the Kafka log has already recorded the
> operation that deleted the log, but the log is still referenced
> and the disk space has not been released.
[jira] [Updated] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space
[ https://issues.apache.org/jira/browse/KAFKA-16378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jianbin Chen updated KAFKA-16378:
---------------------------------
    Issue Type: Bug  (was: Wish)

> Under tiered storage, deleting local logs does not free disk space
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16378
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0
>            Reporter: Jianbin Chen
>            Priority: Major
>         Attachments: image-2024-03-15-09-33-13-903.png
>
>
> This happens only occasionally: whenever a tiered storage topic triggers the
> deletion of local logs, residual references to the deleted files may remain,
> even though the files themselves can no longer be found on the local disk.
> The implementation I use is: [Aiven-Open/tiered-storage-for-apache-kafka:
> RemoteStorageManager for Apache Kafka® Tiered Storage
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]
> I also filed an issue with that project, which contains a full
> description of the problem:
> [Disk space not released · Issue #513 ·
> Aiven-Open/tiered-storage-for-apache-kafka
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]
> !image-2024-03-15-09-33-13-903.png!
> The figure clearly shows that the Kafka log has already recorded the
> operation that deleted the log, but the log is still referenced
> and the disk space has not been released.
[jira] [Created] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space
Jianbin Chen created KAFKA-16378:
------------------------------------

             Summary: Under tiered storage, deleting local logs does not free disk space
                 Key: KAFKA-16378
                 URL: https://issues.apache.org/jira/browse/KAFKA-16378
             Project: Kafka
          Issue Type: Wish
    Affects Versions: 3.7.0
            Reporter: Jianbin Chen
         Attachments: image-2024-03-15-09-33-13-903.png


This happens only occasionally: whenever a tiered storage topic triggers the deletion of local logs, residual references to the deleted files may remain, even though the files themselves can no longer be found on the local disk.

The implementation I use is: [Aiven-Open/tiered-storage-for-apache-kafka: RemoteStorageManager for Apache Kafka® Tiered Storage (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]

I also filed an issue with that project, which contains a full description of the problem: [Disk space not released · Issue #513 · Aiven-Open/tiered-storage-for-apache-kafka (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]

!image-2024-03-15-09-33-13-903.png!

The figure clearly shows that the Kafka log has already recorded the operation that deleted the log, but the log is still referenced and the disk space has not been released.
Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]
chia7712 commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1525644188

## tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java: ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvUtils {
+    private final CsvMapper mapper = new CsvMapper();
+
+    ObjectReader readerFor(Class clazz) {
+        return mapper.readerFor(clazz).with(getSchema(clazz));
+    }
+
+    ObjectWriter writerFor(Class clazz) {
+        return mapper.writerFor(clazz).with(getSchema(clazz));
+    }
+
+    private CsvSchema getSchema(Class clazz) {
+        String[] fields;
+        if (CsvRecordWithGroup.class == clazz)
+            fields = CsvRecordWithGroup.FIELDS;
+        else if (CsvRecordNoGroup.class == clazz)
+            fields = CsvRecordNoGroup.FIELDS;
+        else
+            throw new IllegalStateException("Unhandled class " + clazz);
+
+        return mapper.schemaFor(clazz).sortedBy(fields);
+    }
+
+    public interface CsvRecord {
+    }
+
+    public static class CsvRecordWithGroup extends CsvRecordNoGroup {

Review Comment:
This naming is weird to me. It should be fine to decouple them by adding all fields to `CsvRecordWithGroup`.

## tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java: ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvUtils {
+    private final CsvMapper mapper = new CsvMapper();
+
+    ObjectReader readerFor(Class clazz) {
+        return mapper.readerFor(clazz).with(getSchema(clazz));
+    }
+
+    ObjectWriter writerFor(Class clazz) {
+        return mapper.writerFor(clazz).with(getSchema(clazz));
+    }
+
+    private CsvSchema getSchema(Class clazz) {
+        String[] fields;
+        if (CsvRecordWithGroup.class == clazz)
+            fields = CsvRecordWithGroup.FIELDS;
+        else if (CsvRecordNoGroup.class == clazz)
+            fields = CsvRecordNoGroup.FIELDS;
+        else
+            throw new IllegalStateException("Unhandled class " + clazz);
+
+        return mapper.schemaFor(clazz).sortedBy(fields);
+    }
+
+    public interface CsvRecord {

Review Comment:
Do we need this dumb interface? We use `ObjectWriter` to output strings and it can take `Object` directly.

## tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java: ##
@@ -155,4 +157,17 @@ public static Set minus(Set set, T...toRemove) {
         return res;
     }
 
+    /**
+     * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
+     * It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
+     * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]]
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1525616031

## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -40,18 +40,22 @@ public static BoundedList newArrayBacked(int maxLength) {
     }
 
     public static BoundedList newArrayBacked(int maxLength, int initialCapacity) {
+        if (initialCapacity <= 0) {
+            throw new IllegalArgumentException("Invalid non-positive initialCapacity of " + initialCapacity);
+        }
         return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
     }
 
-    public BoundedList(int maxLength, List underlying) {
+    private BoundedList(int maxLength, List underlying) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
-        this.maxLength = maxLength;
+
         if (underlying.size() > maxLength) {
             throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
+                "the maximum length " + maxLength);

Review Comment:
Done, thanks!
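The pattern under review for BoundedList, a private constructor reached only through static factories that validate their arguments, can be sketched in isolation as follows. This is a simplified illustration and not the Kafka class: `BoundedListSketch` is a hypothetical name, the generic signatures are assumptions, and `IllegalStateException` stands in for Kafka's `BoundedListTooLongException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of the pattern; not Kafka's BoundedList.
final class BoundedListSketch<E> {
    private final int maxLength;
    private final List<E> underlying;

    // Factory: rejects a non-positive initial capacity before allocating.
    static <E> BoundedListSketch<E> newArrayBacked(int maxLength, int initialCapacity) {
        if (initialCapacity <= 0) {
            throw new IllegalArgumentException("Invalid non-positive initialCapacity of " + initialCapacity);
        }
        return new BoundedListSketch<>(maxLength, new ArrayList<>(initialCapacity));
    }

    // Private: callers cannot bypass the factory's checks or hand in a list
    // that they keep modifying behind the wrapper's back.
    private BoundedListSketch(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
        }
        if (underlying.size() > maxLength) {
            throw new IllegalStateException("Cannot wrap list, because it is longer than " +
                "the maximum length " + maxLength);
        }
        this.maxLength = maxLength;
        this.underlying = underlying;
    }

    // Adds an element unless the bound has already been reached.
    boolean add(E element) {
        if (underlying.size() >= maxLength) {
            throw new IllegalStateException("Cannot add another element: the maximum length is " + maxLength);
        }
        return underlying.add(element);
    }

    int size() {
        return underlying.size();
    }

    public static void main(String[] args) {
        BoundedListSketch<String> list = BoundedListSketch.newArrayBacked(2, 1);
        list.add("a");
        list.add("b");
        System.out.println(list.size());
    }
}
```

Keeping the constructor private means every instance passes through the same validation, which appears to be the safety goal named in the PR title.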
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
jolshan commented on code in PR #15541:
URL: https://github.com/apache/kafka/pull/15541#discussion_r1525612166

## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ##
@@ -270,14 +270,7 @@ public void run() {
         while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
             if (!transactionManager.isCompleting()) {
                 log.info("Aborting incomplete transaction due to shutdown");
-
-                try {
-                    // It is possible for the transaction manager to throw errors when aborting. Catch these
-                    // so as not to interfere with the rest of the shutdown logic.
-                    transactionManager.beginAbort();
-                } catch (Exception e) {
-                    log.error("Error in kafka producer I/O thread while aborting transaction: ", e);
-                }
+                transactionManager.beginAbort();

Review Comment:
I wonder if we could keep the try/catch block but, rather than just logging the error, set forceClose to true. cc: @kirktrue, what do you think?
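The alternative raised in this review comment, keeping the try/catch but setting forceClose when the abort fails, can be sketched outside Kafka as follows. Everything here is hypothetical: `AbortOnCloseSketch`, the `TxnManager` interface, and `drainOnClose` are illustrative stand-ins rather than the producer's `Sender` or `TransactionManager`, and the `maxIterations` guard exists only to keep the sketch bounded.

```java
// Hypothetical sketch of the suggested variant; not Kafka's Sender.
final class AbortOnCloseSketch {

    // Stand-in for the parts of a transaction manager that the loop consults.
    interface TxnManager {
        boolean hasOngoingTransaction();
        boolean isCompleting();
        void beginAbort();
    }

    // Runs the shutdown loop and returns true if the close had to be forced
    // because aborting the transaction threw an exception.
    static boolean drainOnClose(TxnManager txn, int maxIterations) {
        boolean forceClose = false;
        for (int i = 0; i < maxIterations && !forceClose && txn.hasOngoingTransaction(); i++) {
            if (!txn.isCompleting()) {
                try {
                    txn.beginAbort();
                } catch (Exception e) {
                    // Suggested variant: stop retrying instead of only logging,
                    // so the loop cannot spin on the same failure.
                    forceClose = true;
                }
            }
            // A real sender would run one I/O iteration here to make progress.
        }
        return forceClose;
    }

    public static void main(String[] args) {
        TxnManager alwaysFailing = new TxnManager() {
            public boolean hasOngoingTransaction() { return true; }
            public boolean isCompleting() { return false; }
            public void beginAbort() { throw new IllegalStateException("abort failed"); }
        };
        System.out.println(drainOnClose(alwaysFailing, 100));
    }
}
```

With only logging in the catch block, a manager whose abort always fails would keep the loop running until the iteration guard, which is the kind of retry loop the PR title refers to.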
[jira] [Updated] (KAFKA-16377) Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-16377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-16377:
------------------------------------
    Component/s: streams
                 unit tests

> Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16377
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16377
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Priority: Major
>
> {quote}
> [2024-03-13T16:07:11.125Z] Gradle Test Run :streams:test > Gradle Test Executor 95 > HighAvailabilityTaskAssignorIntegrationTest > shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo) > "shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo).balance_subtopology" FAILED
> [2024-03-13T16:07:11.125Z] java.lang.AssertionError: the first assignment after adding a node should be unstable while we warm up the state.
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.assertFalseNoRetry(HighAvailabilityTaskAssignorIntegrationTest.java:310)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$7(HighAvailabilityTaskAssignorIntegrationTest.java:237)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:395)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:443)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:392)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:232)
> [2024-03-13T16:07:11.125Z]     at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:130)
> {quote}
> https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-15474/runs/12/nodes/9/steps/88/log/?start=0
[PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
CalvinConfluent opened a new pull request, #15541: URL: https://github.com/apache/kafka/pull/15541 This is a mitigation fix for https://issues.apache.org/jira/browse/KAFKA-16217. Exceptions should not block closing the producers. This PR reverts part of the change in https://github.com/apache/kafka/pull/13591 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1525585193

## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -40,18 +40,22 @@ public static BoundedList newArrayBacked(int maxLength) {
     }

     public static BoundedList newArrayBacked(int maxLength, int initialCapacity) {
+        if (initialCapacity <= 0) {
+            throw new IllegalArgumentException("Invalid non-positive initialCapacity of " + initialCapacity);
+        }
         return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
     }

-    public BoundedList(int maxLength, List underlying) {
+    private BoundedList(int maxLength, List underlying) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
-        this.maxLength = maxLength;
+
         if (underlying.size() > maxLength) {
             throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
-                "the maximum length " + maxLength);
+                "the maximum length " + maxLength);

Review Comment:
please revert this one, thanks :)
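The pattern under review here (a private constructor reachable only through validating static factories) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the Kafka class: `BoundedListSketch` and its `add`/`size` methods are hypothetical, and plain JDK exceptions stand in for `BoundedListTooLongException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for illustration; not the actual Kafka BoundedList.
final class BoundedListSketch<E> {
    private final int maxLength;
    private final List<E> underlying;

    // Private constructor: callers must go through the validating factories below.
    private BoundedListSketch(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
        }
        if (underlying.size() > maxLength) {
            // Assumption: IllegalArgumentException replaces the project-specific exception type.
            throw new IllegalArgumentException("Cannot wrap list, because it is longer than " +
                "the maximum length " + maxLength);
        }
        this.maxLength = maxLength;
        this.underlying = underlying;
    }

    static <E> BoundedListSketch<E> newArrayBacked(int maxLength) {
        return new BoundedListSketch<>(maxLength, new ArrayList<>());
    }

    static <E> BoundedListSketch<E> newArrayBacked(int maxLength, int initialCapacity) {
        // Reject a non-positive capacity before any list is allocated.
        if (initialCapacity <= 0) {
            throw new IllegalArgumentException("Invalid non-positive initialCapacity of " + initialCapacity);
        }
        return new BoundedListSketch<>(maxLength, new ArrayList<>(initialCapacity));
    }

    // Refuses to grow past maxLength (IllegalStateException is a stand-in exception type).
    boolean add(E element) {
        if (underlying.size() >= maxLength) {
            throw new IllegalStateException("Cannot add another element: maximum length is " + maxLength);
        }
        return underlying.add(element);
    }

    int size() {
        return underlying.size();
    }
}
```

In this sketch, making the constructor private means no caller can hand in an arbitrary pre-filled list; every instance starts from an empty `ArrayList` created by one of the factories.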
Re: [PR] KAFKA-7663: Reprocessing on user added global stores restore [kafka]
wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1525585101

## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java: ##
@@ -76,6 +76,8 @@ public void init(final ProcessorContext context,
                 false
             );
             // register the store
+            open = true;

Review Comment:
The store needs to be open for the processor to write to it when starting up. This actually matches the RocksDB store, where it is opened right before registering instead of afterwards.
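The ordering described in this comment can be shown with a small sketch. Everything here is an assumption made for illustration: `SketchStore` and `SketchContext` are hypothetical classes, not the Kafka Streams `StateStore` or `ProcessorContext` types, and the "restore" is reduced to a single write triggered by registration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state store; not the Kafka Streams interface.
final class SketchStore {
    private final Map<String, String> data = new HashMap<>();
    private boolean open = false;

    void init(SketchContext context) {
        // Mark the store open first, so that writes triggered by registration succeed.
        open = true;
        context.register(this);
    }

    void put(String key, String value) {
        if (!open) {
            throw new IllegalStateException("store is not open");
        }
        data.put(key, value);
    }

    String get(String key) {
        return data.get(key);
    }
}

// Hypothetical stand-in for a processor context.
final class SketchContext {
    // Registration immediately writes a record into the store, as a restore-time processor might.
    void register(SketchStore store) {
        store.put("restored-key", "restored-value");
    }
}
```

If `open = true` were moved after `context.register(this)` in this sketch, the write performed during registration would fail with `IllegalStateException`.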
[jira] [Assigned] (KAFKA-16274) Update replica_scale_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16274: - Assignee: (was: Philip Nee) > Update replica_scale_test.py to support KIP-848’s group protocol config > --- > > Key: KAFKA-16274 > URL: https://issues.apache.org/jira/browse/KAFKA-16274 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{replica_scale_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Assigned] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16275: - Assignee: (was: Philip Nee) > Update kraft_upgrade_test.py to support KIP-848’s group protocol config > --- > > Key: KAFKA-16275 > URL: https://issues.apache.org/jira/browse/KAFKA-16275 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{kraft_upgrade_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Assigned] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16271: - Assignee: (was: Philip Nee) > Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol > config > -- > > Key: KAFKA-16271 > URL: https://issues.apache.org/jira/browse/KAFKA-16271 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in > {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} > configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. > The tricky wrinkle here is that the existing test relies on client-side > assignment strategies that aren't applicable with the new KIP-848-enabled > consumer.
[jira] [Assigned] (KAFKA-16276) Update transactions_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16276: - Assignee: (was: Philip Nee) > Update transactions_test.py to support KIP-848’s group protocol config > -- > > Key: KAFKA-16276 > URL: https://issues.apache.org/jira/browse/KAFKA-16276 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{transactions_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. > The wrinkle here is that {{transactions_test.py}} was not able to run as-is. > That might deprioritize this until whatever is causing that is resolved.
[jira] [Assigned] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16272: - Assignee: (was: Philip Nee) > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1998592089 Rebased this due to a merge conflict. @gaurav-narula I also made what I believe are some improvements to `BrokerLifecycleManagerTest.testKraftJBODMetadataVersionUpdateEvent`. Can you please take a look?
Re: [PR] KAFKA-16352: Txn may get get stuck in PrepareCommit or PrepareAbort s… [kafka]
jolshan commented on code in PR #15524:
URL: https://github.com/apache/kafka/pull/15524#discussion_r1525559114

## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala: ##
@@ -177,6 +178,86 @@ class TransactionMarkerChannelManagerTest {
       any())
   }

+  @Test
+  def shouldNotLoseTxnCompletionAfterLoad(): Unit = {
+    mockCache()
+
+    val expectedTransition = txnMetadata2.prepareComplete(time.milliseconds())
+
+    when(metadataCache.getPartitionLeaderEndpoint(
+      ArgumentMatchers.eq(partition1.topic),
+      ArgumentMatchers.eq(partition1.partition),
+      any())
+    ).thenReturn(Some(broker1))
+
+    // Build a successful client response.
+    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1)
+    val successfulResponse = new WriteTxnMarkersResponse(
+      Collections.singletonMap(producerId2: java.lang.Long, Collections.singletonMap(partition1, Errors.NONE)))
+    val successfulClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), false, null, null,
+      successfulResponse)
+
+    // Build a disconnected client response.
+    val disconnectedClientResponse = new ClientResponse(header, null, null,
+      time.milliseconds(), time.milliseconds(), true, null, null,
+      null)
+
+    // Test matrix to cover various scenarios:
+    val clientResponses = Seq(successfulClientResponse, disconnectedClientResponse)
+    val getTransactionStateResponses = Seq(
+      // NOT_COORDINATOR error case
+      Left(Errors.NOT_COORDINATOR),
+      // COORDINATOR_LOAD_IN_PROGRESS
+      Left(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+      // "Newly loaded" transaction state with the new epoch.
+      Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch2, txnMetadata2)))
+    )
+
+    clientResponses.foreach { clientResponse =>
+      getTransactionStateResponses.foreach { getTransactionStateResponse =>
+        // Reset data from previous iteration.
+        txnMetadata2.topicPartitions.add(partition1)
+        clearInvocations(txnStateManager)
+        // Send out markers for a transaction before load.
+        channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult,
+          txnMetadata2, expectedTransition)
+
+        // Drain the marker to make it "in-flight".
+        val requests1 = channelManager.generateRequests().asScala
+        assertEquals(1, requests1.size)
+
+        // Simulate a partition load:

Review Comment:
To confirm my understanding, this test simulates sending out marker requests with various responses. We want to send out the requests and then unload and load the pending markers so that we bump the epoch and send out the requests again. This test ensures that when we do these operations, we still correctly complete the transaction when we get a successful response after reloading.

Is there ever a case of `getTransactionStateResponse` where we have the old epoch? I noticed we have the case with the new epoch. Also in the case with the new epoch, do we want to confirm the transaction doesn't complete?
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998576991 Thanks for all the reviews, and thanks @showuon. This is not only an important fix for Kafka but also a great experience for me :) @showuon Could you please backport this fix? Thanks!
Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]
chia7712 commented on PR #15476: URL: https://github.com/apache/kafka/pull/15476#issuecomment-1998570706
@johnnychhsu Could you please rebase the code and address the following comments?
1. Update KIP-734 according to https://github.com/apache/kafka/pull/15476#discussion_r1523677272
2. Complete the tests according to https://github.com/apache/kafka/pull/15474#discussion_r1524067155
3. Remove the shallow part in RecordsInfo.shallowOffsetOfMaxTimestamp according to https://github.com/apache/kafka/pull/15474#discussion_r1520262997
Sorry that we are bringing some extra work to this PR, and thank you for taking it over :)
[jira] [Resolved] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
[ https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-14971. Resolution: Duplicate KAFKA-15945 has filed PR so close this ticket > Flaky Test > org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs > -- > > Key: KAFKA-14971 > URL: https://issues.apache.org/jira/browse/KAFKA-14971 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Priority: Major > Labels: flaky-test, mirror-maker > > The test testSyncTopicConfigs in > `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs` > seems to be flaky. Found here : > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests] > Ran on local against the [same PR > |https://github.com/apache/kafka/pull/13594]and it passed. > > > {code:java} > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! ==> expected: <2000> but > was: <8640> > Stacktrace > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! 
==> expected: <2000> but > was: <8640> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:2
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 merged PR #15474: URL: https://github.com/apache/kafka/pull/15474
[jira] [Resolved] (KAFKA-15523) Flaky test org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs
[ https://issues.apache.org/jira/browse/KAFKA-15523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15523. Resolution: Duplicate KAFKA-15945 has filed PR so close this ticket > Flaky test > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs > --- > > Key: KAFKA-15523 > URL: https://issues.apache.org/jira/browse/KAFKA-15523 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.6.0, 3.5.1 >Reporter: Josep Prat >Priority: Major > Labels: flaky, flaky-test > > Last seen: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_17_and_Scala_2_13___testSyncTopicConfigs__/] > > h3. Error Message > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. > Topic: mm2-status.backup.internal didn't get created in the cluster ==> > expected: but was: {code} > h3. Stacktrace > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Topic: mm2-status.backup.internal didn't get created in the cluster ==> > expected: but was: at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at > app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitForTopicCreated(MirrorConnectorsIntegrationBaseTest.java:1041) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:224) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at > 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoke
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1998564199
The failed tests pass on my local machine.
```sh
./gradlew cleanTest core:test --tests LogDirFailureTest --tests ReplicaManagerTest connect:mirror:test --tests MirrorConnectorsIntegrationTransactionsTest streams:test --tests HighAvailabilityTaskAssignorIntegrationTest
```
The links for those flaky tests are shown below:
https://issues.apache.org/jira/browse/KAFKA-16225
https://issues.apache.org/jira/browse/KAFKA-16376
https://issues.apache.org/jira/browse/KAFKA-16377
https://issues.apache.org/jira/browse/KAFKA-15945
[jira] [Created] (KAFKA-16377) Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
Chia-Ping Tsai created KAFKA-16377: -- Summary: Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores Key: KAFKA-16377 URL: https://issues.apache.org/jira/browse/KAFKA-16377 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai {quote} [2024-03-13T16:07:11.125Z] Gradle Test Run :streams:test > Gradle Test Executor 95 > HighAvailabilityTaskAssignorIntegrationTest > shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo) > "shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo).balance_subtopology" FAILED [2024-03-13T16:07:11.125Z] java.lang.AssertionError: the first assignment after adding a node should be unstable while we warm up the state. [2024-03-13T16:07:11.125Z] at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.assertFalseNoRetry(HighAvailabilityTaskAssignorIntegrationTest.java:310) [2024-03-13T16:07:11.125Z] at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$7(HighAvailabilityTaskAssignorIntegrationTest.java:237) [2024-03-13T16:07:11.125Z] at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:395) [2024-03-13T16:07:11.125Z] at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:443) [2024-03-13T16:07:11.125Z] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:392) [2024-03-13T16:07:11.125Z] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) [2024-03-13T16:07:11.125Z] at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) [2024-03-13T16:07:11.125Z] at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:232) [2024-03-13T16:07:11.125Z] at 
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:130) {quote} https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-15474/runs/12/nodes/9/steps/88/log/?start=0
[jira] [Created] (KAFKA-16376) Fix flaky ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric
Chia-Ping Tsai created KAFKA-16376: -- Summary: Fix flaky ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric Key: KAFKA-16376 URL: https://issues.apache.org/jira/browse/KAFKA-16376 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai {quote} [2024-03-13T17:22:47.835Z] > Task :core:test [2024-03-13T17:22:47.835Z] kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric() failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15474/core/build/reports/testOutput/kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric().test.stdout [2024-03-13T17:22:47.835Z] [2024-03-13T17:22:49.409Z] Gradle Test Run :core:test > Gradle Test Executor 97 > ReplicaManagerTest > testRemoteFetchExpiresPerSecMetric() FAILED [2024-03-13T17:22:49.409Z] org.opentest4j.AssertionFailedError: The ExpiresPerSec value is not incremented. Current value is: 0 [2024-03-13T17:22:49.409Z] at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) [2024-03-13T17:22:49.409Z] at org.junit.jupiter.api.Assertions.fail(Assertions.java:138) [2024-03-13T17:22:49.409Z] at kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric(ReplicaManagerTest.scala:4174) {quote} https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-15474/runs/12/nodes/9/steps/88/log/?start=0
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1525523190

## server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java: ##
@@ -165,18 +199,24 @@ public void testIterator() {
     @Test
     public void testIteratorIsImmutable() {
-        BoundedList list = new BoundedList<>(3, new ArrayList<>(Arrays.asList(1, 2, 3)));
+        BoundedList list = BoundedList.newArrayBacked(3);
+        list.add(1);
+        list.add(2);
+        list.add(3);
         assertThrows(UnsupportedOperationException.class,
-            () -> list.iterator().remove());
+            () -> list.iterator().remove());

Review Comment:
@chia7712 Resolved indenting changes
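The behaviour asserted by `testIteratorIsImmutable` can be reproduced with plain JDK collections. This is a minimal sketch assuming the iterator comes from a read-only view; `ReadOnlyIteratorSketch` is a hypothetical helper, and the thread does not say whether `BoundedList` is implemented this way.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration; not part of Kafka.
final class ReadOnlyIteratorSketch {
    // Hands out an iterator over an unmodifiable view of the given list.
    static <E> Iterator<E> readOnlyIterator(List<E> underlying) {
        return Collections.unmodifiableList(underlying).iterator();
    }

    // Returns true if remove() on the read-only iterator is rejected.
    static boolean removeIsRejected(List<Integer> list) {
        Iterator<Integer> iterator = readOnlyIterator(list);
        iterator.next();
        try {
            iterator.remove();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

Because the view is read-only, the underlying list keeps all of its elements after the rejected `remove()` call.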
[PR] KAFKA-16269: Update reassign_partitions_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15540: URL: https://github.com/apache/kafka/pull/15540 Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding `group_protocol=["classic", "consumer"]` to the existing blocks 😢 Note: this requires #15330. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16268: Update fetch_from_follower_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15539:
URL: https://github.com/apache/kafka/pull/15539

Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢

Note: this requires #15330.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16270: Update snapshot_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15538:
URL: https://github.com/apache/kafka/pull/15538

Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢

Note: this requires #15330.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config [kafka]
kirktrue opened a new pull request, #15537:
URL: https://github.com/apache/kafka/pull/15537

Added a new optional `group_protocol` parameter to the test methods, then passed that down to the `setup_consumer` method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new `@matrix` block instead of adding the `group_protocol=["classic", "consumer"]` to the existing blocks 😢

Note: this requires #15330.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-16276) Update transactions_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16276:
-
Assignee: Philip Nee (was: Kirk True)

> Update transactions_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16276
> URL: https://issues.apache.org/jira/browse/KAFKA-16276
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Philip Nee
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{transactions_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The wrinkle here is that {{transactions_test.py}} was not able to run as-is.
> That might deprioritize this until whatever is causing that is resolved.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (KAFKA-16273) Update consume_bench_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16273:
-
Assignee: Philip Nee (was: Kirk True)

> Update consume_bench_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16273
> URL: https://issues.apache.org/jira/browse/KAFKA-16273
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Philip Nee
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{consume_bench_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Assigned] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16271:
-
Assignee: Philip Nee (was: Kirk True)

> Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol
> config
> --
>
> Key: KAFKA-16271
> URL: https://issues.apache.org/jira/browse/KAFKA-16271
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Philip Nee
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in
> {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}}
> configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The tricky wrinkle here is that the existing test relies on client-side
> assignment strategies that aren't applicable with the new KIP-848-enabled
> consumer.
[jira] [Assigned] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16272:
-
Assignee: Philip Nee (was: Kirk True)

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Philip Nee
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}}
> to support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Assigned] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16275:
-
Assignee: Philip Nee (was: Kirk True)

> Update kraft_upgrade_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16275
> URL: https://issues.apache.org/jira/browse/KAFKA-16275
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Philip Nee
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{kraft_upgrade_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Assigned] (KAFKA-16274) Update replica_scale_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-16274:
-
Assignee: Philip Nee (was: Kirk True)

> Update replica_scale_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16274
> URL: https://issues.apache.org/jira/browse/KAFKA-16274
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Philip Nee
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{replica_scale_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1525490485

## server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java: ##
@@ -165,18 +199,24 @@ public void testIterator() {
 @Test
 public void testIteratorIsImmutable() {
-BoundedList list = new BoundedList<>(3, new ArrayList<>(Arrays.asList(1, 2, 3)));
+BoundedList list = BoundedList.newArrayBacked(3);
+list.add(1);
+list.add(2);
+list.add(3);
 assertThrows(UnsupportedOperationException.class,
-() -> list.iterator().remove());
+() -> list.iterator().remove());

Review Comment:
ditto

## server-common/src/test/java/org/apache/kafka/server/mutable/BoundedListTest.java: ##
@@ -165,18 +199,24 @@ public void testIterator() {
 @Test
 public void testIteratorIsImmutable() {
-BoundedList list = new BoundedList<>(3, new ArrayList<>(Arrays.asList(1, 2, 3)));
+BoundedList list = BoundedList.newArrayBacked(3);
+list.add(1);
+list.add(2);
+list.add(3);
 assertThrows(UnsupportedOperationException.class,
-() -> list.iterator().remove());
+() -> list.iterator().remove());
 assertThrows(UnsupportedOperationException.class,
-() -> list.listIterator().remove());
+() -> list.listIterator().remove());
 }

 @Test
 public void testSubList() {
-BoundedList list = new BoundedList<>(3, new ArrayList<>(Arrays.asList(1, 2, 3)));
+BoundedList list = BoundedList.newArrayBacked(3);
+list.add(1);
+list.add(2);
+list.add(3);
 assertEquals(Arrays.asList(2), list.subList(1, 2));
 assertThrows(UnsupportedOperationException.class,
-() -> list.subList(1, 2).remove(2));
+() -> list.subList(1, 2).remove(2));

Review Comment:
Could you please remove the indentation?
[jira] [Updated] (KAFKA-15691) Update system tests to use AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15691:
--
Summary: Update system tests to use AsyncKafkaConsumer (was: Update system tests to use new consumer)

> Update system tests to use AsyncKafkaConsumer
> -
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
[jira] [Updated] (KAFKA-15691) Add new system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15691:
--
Priority: Blocker (was: Major)

> Add new system tests to use new consumer
>
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
[jira] [Reopened] (KAFKA-15691) Add new system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reopened KAFKA-15691:
---

> Add new system tests to use new consumer
>
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Major
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
[jira] [Updated] (KAFKA-15691) Update system tests to use new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15691:
--
Summary: Update system tests to use new consumer (was: Add new system tests to use new consumer)

> Update system tests to use new consumer
> ---
>
> Key: KAFKA-15691
> URL: https://issues.apache.org/jira/browse/KAFKA-15691
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
[jira] [Updated] (KAFKA-16269) Update reassign_partitions_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16269:
--
Description: This task is to update the test method(s) in {{reassign_partitions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{reassign_partitions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update reassign_partitions_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16269
> URL: https://issues.apache.org/jira/browse/KAFKA-16269
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{reassign_partitions_test.py}}
> to support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16270) Update snapshot_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16270:
--
Description: This task is to update the test method(s) in {{snapshot_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{snapshot_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update snapshot_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16270
> URL: https://issues.apache.org/jira/browse/KAFKA-16270
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{snapshot_test.py}} to support
> the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16275:
--
Description: This task is to update the test method(s) in {{kraft_upgrade_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{kraft_upgrade_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update kraft_upgrade_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16275
> URL: https://issues.apache.org/jira/browse/KAFKA-16275
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{kraft_upgrade_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16273) Update consume_bench_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16273:
--
Description: This task is to update the test method(s) in {{consume_bench_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{consume_bench_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update consume_bench_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16273
> URL: https://issues.apache.org/jira/browse/KAFKA-16273
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{consume_bench_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16267) Update consumer_group_command_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16267:
--
Description: This task is to update the test method(s) in {{consumer_group_command_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{consumer_group_command_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update consumer_group_command_test.py to support KIP-848’s group protocol
> config
>
>
> Key: KAFKA-16267
> URL: https://issues.apache.org/jira/browse/KAFKA-16267
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in
> {{consumer_group_command_test.py}} to support the {{group.protocol}}
> configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16269) Update reassign_partitions_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16269:
--
Description: This task is to update the test method(s) in {{reassign_partitions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

was: This task is to update the test method(s) in {{reassign_partitions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update reassign_partitions_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16269
> URL: https://issues.apache.org/jira/browse/KAFKA-16269
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{reassign_partitions_test.py}}
> to support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
[jira] [Updated] (KAFKA-16268) Update fetch_from_follower_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16268:
--
Description: This task is to update the test method(s) in {{fetch_from_follower_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{fetch_from_follower_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update fetch_from_follower_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16268
> URL: https://issues.apache.org/jira/browse/KAFKA-16268
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{fetch_from_follower_test.py}}
> to support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16272:
--
Description: This task is to update the test method(s) in {{connect_distributed_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{connect_distributed_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}}
> to support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16274) Update replica_scale_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16274:
--
Description: This task is to update the test method(s) in {{replica_scale_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes. See KAFKA-16231 as an example of how the test parameters can be changed.

was: This task is to update the test method(s) in {{replica_scale_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

> Update replica_scale_test.py to support KIP-848’s group protocol config
> ---
>
> Key: KAFKA-16274
> URL: https://issues.apache.org/jira/browse/KAFKA-16274
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{replica_scale_test.py}} to
> support the {{group.protocol}} configuration introduced in
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
> by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
[jira] [Updated] (KAFKA-16271) Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16271:
--
Description:
This task is to update the test method(s) in {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

The tricky wrinkle here is that the existing test relies on client-side assignment strategies that aren't applicable with the new KIP-848-enabled consumer.

was:
This task is to update the test method(s) in {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

The tricky wrinkle here is that the existing test relies on client-side assignment strategies that aren't applicable with the new KIP-848-enabled consumer.

> Update consumer_rolling_upgrade_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16271
> URL: https://issues.apache.org/jira/browse/KAFKA-16271
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> This task is to update the test method(s) in {{consumer_rolling_upgrade_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The tricky wrinkle here is that the existing test relies on client-side assignment strategies that aren't applicable with the new KIP-848-enabled consumer.
[jira] [Updated] (KAFKA-16231) Update consumer_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16231:
--
Description:
This task is to update {{consumer_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

For example, here's how it would look to add the new group_protocol parameter to the parameterized tests:
{code:python}
@cluster(num_nodes=6)
@matrix(
    assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                         "org.apache.kafka.clients.consumer.RoundRobinAssignor",
                         "org.apache.kafka.clients.consumer.StickyAssignor"],
    metadata_quorum=[quorum.zk, quorum.isolated_kraft],
    use_new_coordinator=[False]
)
@matrix(
    metadata_quorum=[quorum.isolated_kraft],
    use_new_coordinator=[True],
    group_protocol=["classic", "consumer"]
)
def test_the_consumer(self, assignment_strategy, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol="classic"):
    consumer = self.setup_consumer("my_topic", group_protocol=group_protocol)
{code}
The {{group_protocol}} parameter will default to {{{}classic{}}}.

{*}Note{*}: we only test the new group protocol setting when {{use_new_coordinator}} is {{{}True{}}}, as that is the only supported mode.

was:
This task is to update {{consumer_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

For example, here's how it would look to add the new group_protocol parameter to the parameterized tests:
{code:python}
@cluster(num_nodes=6)
@matrix(
    assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                         "org.apache.kafka.clients.consumer.RoundRobinAssignor",
                         "org.apache.kafka.clients.consumer.StickyAssignor"],
    metadata_quorum=[quorum.zk],
    use_new_coordinator=[False]
)
@matrix(
    assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                         "org.apache.kafka.clients.consumer.RoundRobinAssignor",
                         "org.apache.kafka.clients.consumer.StickyAssignor"],
    metadata_quorum=[quorum.isolated_kraft],
    use_new_coordinator=[False]
)
@matrix(
    metadata_quorum=[quorum.isolated_kraft],
    use_new_coordinator=[True],
    group_protocol=["classic", "consumer"]
)
def test_the_consumer(self, assignment_strategy, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol="classic"):
    consumer = self.setup_consumer("my_topic", group_protocol=group_protocol)
{code}
The {{group_protocol}} parameter will default to {{{}classic{}}}.

{*}Note{*}: we only test the new group protocol setting when {{use_new_coordinator}} is {{{}True{}}}, as that is the only supported mode.

> Update consumer_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16231
> URL: https://issues.apache.org/jira/browse/KAFKA-16231
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> This task is to update {{consumer_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.
> For example, here's how it would look to add the new group_protocol parameter to the parameterized tests:
> {code:python}
> @cluster(num_nodes=6)
> @matrix(
>     assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
>                          "org.apache.kafka.clients.consumer.RoundRobinAssignor",
>                          "org.apache.kafka.clients.consumer.StickyAssignor"],
>     metadata_quorum=[quorum.zk, quorum.isolated_kraft],
>     use_new_coordinator=[False]
> )
> @matrix(
>     metadata_quorum=[quorum.isolated_kraft],
>     use_new_coordinator=[True],
>     group_protocol=["classic", "consumer"]
> )
> def test_the_consumer(self,
[jira] [Updated] (KAFKA-16276) Update transactions_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16276:
--
Description:
This task is to update the test method(s) in {{transactions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

See KAFKA-16231 as an example of how the test parameters can be changed.

The wrinkle here is that {{transactions_test.py}} was not able to run as-is. That might deprioritize this until whatever is causing that is resolved.

was:
This task is to update the test method(s) in {{transactions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.

The wrinkle here is that {{transactions_test.py}} was not able to run as-is. That might deprioritize this until whatever is causing that is resolved.

> Update transactions_test.py to support KIP-848’s group protocol config
> --
>
> Key: KAFKA-16276
> URL: https://issues.apache.org/jira/browse/KAFKA-16276
> Project: Kafka
> Issue Type: Test
> Components: clients, consumer, system tests
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> This task is to update the test method(s) in {{transactions_test.py}} to support the {{group.protocol}} configuration introduced in [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
> The wrinkle here is that {{transactions_test.py}} was not able to run as-is. That might deprioritize this until whatever is causing that is resolved.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on PR #15511: URL: https://github.com/apache/kafka/pull/15511#issuecomment-1998471082 Thanks for the updates @lucasbru, left some minor comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16375:
---
Description: The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining.

(was: The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining.

As a potential improvement, consider if the member could keep the reconciliation if it rejoined but got the same assignment.)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Critical
> Labels: kip-848-client-support
>
> The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining.
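The difference between the two discard checks described in KAFKA-16375 can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual `MembershipManagerImpl` code: the class `RejoinDetectionSketch`, its fields, and its method names are all hypothetical, and resetting the epoch to 0 on a rejoin is an assumption made for the example.

```java
/**
 * Illustrative only: hypothetical names, not the real MembershipManagerImpl.
 * Contrasts an epoch-comparison discard check with a flag-based one.
 */
final class RejoinDetectionSketch {
    private int memberEpoch = 5;
    // Set only when the member actually goes through a transition to joining.
    private boolean rejoinedSinceReconciliationStart = false;

    /** The epoch is bumped (for example by a heartbeat response) without any rejoin. */
    void onEpochBumped(int newEpoch) {
        memberEpoch = newEpoch;
    }

    /** The member transitions to joining; epoch reset to 0 is an assumption of this sketch. */
    void onTransitionToJoining() {
        memberEpoch = 0;
        rejoinedSinceReconciliationStart = true;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    /** Called when a reconciliation begins, clearing any earlier rejoin marker. */
    void markReconciliationStarted() {
        rejoinedSinceReconciliationStart = false;
    }

    /** Epoch-based check: discards whenever the epoch moved at all. */
    boolean shouldDiscardByEpoch(int epochAtStart) {
        return epochAtStart != memberEpoch;
    }

    /** Flag-based check: discards only if the member really rejoined. */
    boolean shouldDiscardByFlag() {
        return rejoinedSinceReconciliationStart;
    }
}
```

With an epoch bump and no rejoin, the epoch-based check would discard the reconciliation while the flag-based check would keep it, which is the misfire the ticket describes.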
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525452774

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -976,6 +974,10 @@ void maybeReconcile() {
     }
     revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+}).whenComplete((__, error) -> {
+    if (error != null) {
+        log.error("Reconciliation failed.", error);

Review Comment:
Is there a reason why we want this log here? We already have the same but inside the `revokeAndAssign`, when reconciliation completes.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
ChrisAHolland commented on PR #15507: URL: https://github.com/apache/kafka/pull/15507#issuecomment-1998410642 @chia7712 Thank you, updated
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525446624

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -1028,9 +1028,9 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
     verifyReconciliationNotTriggered(membershipManager);
     membershipManager.poll(time.milliseconds());
+    membershipManager.onHeartbeatRequestSent();

Review Comment:
I would suggest we add the check that a reconciliation was triggered here, just adding `verifyReconciliationTriggeredAndCompleted(membershipManager, Collections.emptyList());` right after poll. It's part of what this PR is introducing and it completes the picture of what's happening when getting the first (empty) assignment that can be reconciled.
Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]
chia7712 commented on code in PR #15507:
URL: https://github.com/apache/kafka/pull/15507#discussion_r1525419354

## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -40,18 +40,17 @@ public static BoundedList newArrayBacked(int maxLength) {
     }

     public static BoundedList newArrayBacked(int maxLength, int initialCapacity) {
+        if (initialCapacity <= 0) {
+            throw new IllegalArgumentException("Invalid non-positive initialCapacity of " + initialCapacity);
+        }
         return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
     }

-    public BoundedList(int maxLength, List underlying) {
+    private BoundedList(int maxLength, List underlying) {
         if (maxLength <= 0) {
             throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
         }
         this.maxLength = maxLength;
-        if (underlying.size() > maxLength) {

Review Comment:
this check is still useful to me since all helpers are based on this constructor, and so it is a good safety catch.

## server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java: ##
@@ -40,18 +40,17 @@ public static BoundedList newArrayBacked(int maxLength) {
     }

     public static BoundedList newArrayBacked(int maxLength, int initialCapacity) {
+        if (initialCapacity <= 0) {

Review Comment:
nice check!

## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ##
@@ -172,8 +172,7 @@ ControllerResult> incrementalAlterConfigs( Map>> configChanges, boolean newlyCreatedResource ) {
-List outputRecords =

Review Comment:
Could you please revert those changes? the simpler the better :)
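The review above argues for keeping the size check in the now-private constructor as a safety net, alongside the new `initialCapacity` check in the factory. A minimal sketch of that shape, assuming a simplified stand-in class: `BoundedListSketch` is hypothetical, is not the Kafka `BoundedList`, and the exception types and messages on the size checks are choices made for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a bounded list; not the Kafka class itself. */
final class BoundedListSketch<E> {
    private final int maxLength;
    private final List<E> underlying;

    /** Factory: validates the capacity hint before building the backing list. */
    static <E> BoundedListSketch<E> newArrayBacked(int maxLength, int initialCapacity) {
        if (initialCapacity <= 0) {
            throw new IllegalArgumentException(
                "Invalid non-positive initialCapacity of " + initialCapacity);
        }
        return new BoundedListSketch<>(maxLength, new ArrayList<>(initialCapacity));
    }

    // Private: every instance comes from a factory. The size check is kept
    // anyway, so a future factory that passes a pre-populated list is caught.
    private BoundedListSketch(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException(
                "Invalid non-positive maxLength of " + maxLength);
        }
        if (underlying.size() > maxLength) {
            throw new IllegalArgumentException(
                "Underlying list of size " + underlying.size()
                    + " exceeds maxLength of " + maxLength);
        }
        this.maxLength = maxLength;
        this.underlying = underlying;
    }

    /** Appends an element, refusing to grow past maxLength. */
    boolean add(E element) {
        if (underlying.size() >= maxLength) {
            throw new IllegalStateException(
                "Cannot grow beyond maxLength of " + maxLength);
        }
        return underlying.add(element);
    }

    int size() {
        return underlying.size();
    }
}
```

Because all construction funnels through one private constructor, the invariants only need to be stated once, which is the reviewer's point about the check being a cheap catch-all.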
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525418611

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2279,22 +2277,23 @@ private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscri
         return mockJoinAndReceiveAssignment(expectSubscriptionUpdated, createAssignment(expectSubscriptionUpdated));
     }

-    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean expectSubscriptionUpdated,
+    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean triggerReconciliation,

Review Comment:
Agree with the renamed param, but could we update it also in the same overloaded method above, just for consistency
Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]
chia7712 merged PR #15506: URL: https://github.com/apache/kafka/pull/15506
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525386165

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -911,9 +911,13 @@ void maybeReconcile() {
         SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
         final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

-        if (resolvedAssignment.equals(currentAssignment)) {
-            log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
-                "is equal to the member current assignment.", resolvedAssignment);
+        if (currentAssignment != LocalAssignmentImpl.NONE &&
+            resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
+            resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
+            log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +

Review Comment:
Just to consider simplifying the log and clarifying the situation: isn't the message here simply that we're ignoring the reconciliation because the resolved target is equal to the current assignment? I get the point about intermediate assignments, but an intermediate one would have updated the current assignment, so it wouldn't be equal to the resolved target (or it would leave a reconciliation in progress, so it wouldn't even make it to this check).
[jira] [Updated] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16375:
---
Priority: Critical (was: Major)

> Fix logic for discarding reconciliation if member rejoined
> --
>
> Key: KAFKA-16375
> URL: https://issues.apache.org/jira/browse/KAFKA-16375
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Critical
> Labels: kip-848-client-support
>
> The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining.
> As a potential improvement, consider if the member could keep the reconciliation if it rejoined but got the same assignment.
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1525360383

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##
@@ -988,7 +989,8 @@ long getExpirationTimeForTimeout(final long timeoutMs) {
      * then complete the reconciliation by updating the assignment and making the appropriate state
      * transition. Note that if any of the 2 callbacks fails, the reconciliation should fail.
      */
-    private void revokeAndAssign(SortedSet assignedTopicIdPartitions,
+    private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment,

Review Comment:
It was this same ticket, [KAFKA-16185](https://issues.apache.org/jira/browse/KAFKA-16185), that was intended to handle the situation around discarding reconciliations (which is what that rejoin check is for). But I had missed it in the review too. I just created [KAFKA-16375](https://issues.apache.org/jira/browse/KAFKA-16375) to handle it in a follow-up PR (probably moving away from epochs altogether to cover that edge case, and identifying the rejoin simply by the member going through a transition to join).
[jira] [Created] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
Lianet Magrans created KAFKA-16375:
--
Summary: Fix logic for discarding reconciliation if member rejoined
Key: KAFKA-16375
URL: https://issues.apache.org/jira/browse/KAFKA-16375
Project: Kafka
Issue Type: Sub-task
Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans

The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining.

As a potential improvement, consider if the member could keep the reconciliation if it rejoined but got the same assignment.
Re: [PR] MINOR: AddPartitionsToTxnManager performance optimizations [kafka]
jolshan commented on PR #15454: URL: https://github.com/apache/kafka/pull/15454#issuecomment-1998091660 Hmm. I was rerunning just to confirm.
Re: [PR] MINOR: AddPartitionsToTxnManager performance optimizations [kafka]
mimaison merged PR #15454: URL: https://github.com/apache/kafka/pull/15454
[PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
kphelps opened a new pull request, #15536: URL: https://github.com/apache/kafka/pull/15536 When there is a commit for multiple topic partitions and some, but not all, exceed the offset metadata limit, the pending commit is not properly cleaned up leading to `UNSTABLE_OFFSET_COMMIT` errors when trying to fetch the offsets with `read_committed`. This change makes it so the invalid commits are not added to the `pendingOffsetCommits` set.
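The fix described in this PR can be illustrated with a small sketch: partitions whose metadata exceeds the limit are rejected up front and never enter the pending set, so nothing is left behind for them once the commit completes. Everything here is hypothetical, including the class `PendingCommitSketch`, the string-keyed partitions, and the use of `String.length()` as the size measure; it is not the group coordinator's actual code.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: tracks pending commits for valid partitions alone. */
final class PendingCommitSketch {
    private final Set<String> pendingOffsetCommits = new HashSet<>();
    private final int maxMetadataSize;

    PendingCommitSketch(int maxMetadataSize) {
        this.maxMetadataSize = maxMetadataSize;
    }

    /**
     * Registers a multi-partition commit and returns the rejected partitions.
     * Only partitions with acceptable metadata become pending.
     */
    Set<String> prepareCommit(Map<String, String> metadataByPartition) {
        Set<String> rejected = new HashSet<>();
        for (Map.Entry<String, String> entry : metadataByPartition.entrySet()) {
            if (entry.getValue().length() > maxMetadataSize) {
                // Would map to OFFSET_METADATA_TOO_LARGE; never marked pending.
                rejected.add(entry.getKey());
            } else {
                pendingOffsetCommits.add(entry.getKey());
            }
        }
        return rejected;
    }

    /** Clears the pending marker once the commit for these partitions is done. */
    void completeCommit(Set<String> committedPartitions) {
        pendingOffsetCommits.removeAll(committedPartitions);
    }

    /** A pending partition is what would surface as an unstable offset to readers. */
    boolean hasPendingCommit(String partition) {
        return pendingOffsetCommits.contains(partition);
    }
}
```

If the rejected partition were added to the pending set and only the accepted partitions were later completed, its marker would linger, which matches the lingering pending commit named in the PR title.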
Re: [PR] MINOR: AddPartitionsToTxnManager performance optimizations [kafka]
mimaison commented on PR #15454: URL: https://github.com/apache/kafka/pull/15454#issuecomment-1998016477 Test failures don't seem related (all passed locally), merging to trunk.
[PR] [WIP] Splitting consumer tests [kafka]
lianetm opened a new pull request, #15535: URL: https://github.com/apache/kafka/pull/15535
Re: [PR] MINOR: Change link to deprecated class to substitute class [kafka]
cadonna closed pull request #15531: MINOR: Change link to deprecated class to substitute class URL: https://github.com/apache/kafka/pull/15531
Re: [PR] MINOR: Change link to deprecated class to substitute class [kafka]
cadonna commented on PR #15531: URL: https://github.com/apache/kafka/pull/15531#issuecomment-1997917052 Closing because it does not make sense to only change a link when the rest of the content is out-of-date.
[jira] [Commented] (KAFKA-16373) Docker Official Image for Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827162#comment-17827162 ]

Kenneth Eversole commented on KAFKA-16373:
--
This already seems to have been done in a previous KIP
https://issues.apache.org/jira/browse/KAFKA-15445

> Docker Official Image for Apache Kafka
> --
>
> Key: KAFKA-16373
> URL: https://issues.apache.org/jira/browse/KAFKA-16373
> Project: Kafka
> Issue Type: New Feature
> Affects Versions: 3.8.0
> Reporter: Krish Vora
> Assignee: Krish Vora
> Priority: Major
> Labels: KIP-1028
>
> KIP-1028: Docker Official Image for Apache Kafka:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1028%3A+Docker+Official+Image+for+Apache+Kafka]
Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]
VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1525144034

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
         try (final KeyValueIterator, LeftOrRightValue> it = store.all()) {
             TimestampedKeyAndJoinSide prevKey = null;

+            boolean outerJoinLeftBreak = false;
+            boolean outerJoinRightBreak = false;
             while (it.hasNext()) {
-                boolean outerJoinLeftBreak = false;
-                boolean outerJoinRightBreak = false;

Review Comment:
Modified the `emitNonJoinedOuterRecords` method for asymmetric windowing. Added a unit test for asymmetric windows in KStreamKStreamOuterJoinTest.java. The test fails if we break at the first timestamp that is too large.
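Why the two flags need to live outside the loop, and why the scan cannot stop at the first too-recent timestamp when the windows are asymmetric, can be sketched with a simplified scan. The class `OuterJoinScanSketch`, its `Rec` type, and the per-side cutoff parameters are hypothetical simplifications for illustration and do not mirror the real `KStreamKStreamJoin` logic.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a per-side early-exit scan over timestamp-ordered records. */
final class OuterJoinScanSketch {
    /** A stored record: which join side it came from and its timestamp. */
    static final class Rec {
        final boolean left;
        final long ts;

        Rec(boolean left, long ts) {
            this.left = left;
            this.ts = ts;
        }
    }

    /** Number of records inspected by the most recent scan. */
    int inspected = 0;

    /** Emits records at or below their side's cutoff; input must be sorted by timestamp. */
    List<Rec> emitExpired(List<Rec> sortedByTimestamp, long leftCutoff, long rightCutoff) {
        List<Rec> emitted = new ArrayList<>();
        inspected = 0;
        // Declared once, outside the loop, so a side stays "done" after its first
        // too-recent record. Declared inside the loop they would reset on every
        // iteration and the early exit below could never fire.
        boolean leftBreak = false;
        boolean rightBreak = false;
        for (Rec rec : sortedByTimestamp) {
            if (leftBreak && rightBreak) {
                break; // neither side can have further expired records
            }
            inspected++;
            if (rec.left) {
                if (leftBreak) {
                    continue;
                }
                if (rec.ts > leftCutoff) {
                    leftBreak = true;
                    continue;
                }
            } else {
                if (rightBreak) {
                    continue;
                }
                if (rec.ts > rightCutoff) {
                    rightBreak = true;
                    continue;
                }
            }
            emitted.add(rec);
        }
        return emitted;
    }
}
```

With different cutoffs for the two sides, one side can still have expired records after the other side has hit its first too-recent timestamp, so the scan may only stop once both flags are set.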
[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827134#comment-17827134 ]

BDeus edited comment on KAFKA-16361 at 3/14/24 3:47 PM:
Is it a regression related to this feature https://issues.apache.org/jira/browse/KAFKA-14450 ?

was (Author: baz33):
Is it a regression related to this feature [https://issues.apache.org/jira/browse/KAFKA-14450|http://example.com] ?

> Rack aware sticky assignor minQuota violations
> --
>
> Key: KAFKA-16361
> URL: https://issues.apache.org/jira/browse/KAFKA-16361
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.5.1, 3.7.0, 3.6.1
> Reporter: Luke D
> Priority: Major
> Attachments: illegalstateexception.log
>
> In some low topic replication scenarios the rack aware assignment in the StickyAssignor fails to balance consumers to its own expectations and throws an IllegalStateException, commonly crashing the application (depending on application implementation). While uncommon the error is deterministic, and so persists until the replication state changes.
>
> We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely would also be reproducible there)
>
> Here is the error and stack from our test case against 3.7.0
> {code:java}
> We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
> java.lang.IllegalStateException: We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
>     at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
> {code}
> Here is a specific test case from 3.7.0 that fails when passed to StickyAssignor.assign:
> {code:java}
> Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1],
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], > offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, parti
[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827134#comment-17827134 ] BDeus edited comment on KAFKA-16361 at 3/14/24 3:46 PM: Is it a regression related to this feature [https://issues.apache.org/jira/browse/KAFKA-14450|http://example.com] ? was (Author: baz33): Is it a regression related to this ticket https://issues.apache.org/jira/browse/KAFKA-14867 ? > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], > offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partit
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827134#comment-17827134 ] BDeus commented on KAFKA-16361: --- Is it a regression related to this ticket https://issues.apache.org/jira/browse/KAFKA-14867 ? > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], > offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 66, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 4, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Part
Re: [PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]
msn-tldr commented on PR #15498: URL: https://github.com/apache/kafka/pull/15498#issuecomment-1997688081 @ijuma thanks!
Re: [PR] KAFKA-16226; Reduce synchronization between producer threads (#15323) [kafka]
msn-tldr commented on PR #15493: URL: https://github.com/apache/kafka/pull/15493#issuecomment-199768 @ijuma thanks!
Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]
lucasbru commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1524575611

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ##

@@ -1497,4 +1502,93 @@ public PollResult poll(final long currentTimeMs) {
     List stateListeners() {
         return unmodifiableList(stateUpdatesListeners);
     }
+
+    private final static class LocalAssignmentImpl implements LocalAssignment {
+
+        private static final long NONE_EPOCH = -1;
+
+        private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap());
+
+        private final long localEpoch;
+
+        private final Map> partitions;
+
+        public LocalAssignmentImpl(long localEpoch, Map> partitions) {
+            this.localEpoch = localEpoch;
+            this.partitions = partitions;
+            if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
+                throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+            }
+        }
+
+        public LocalAssignmentImpl(long localEpoch, SortedSet topicIdPartitions) {
+            this.localEpoch = localEpoch;
+            this.partitions = new HashMap<>();
+            if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
+                throw new IllegalArgumentException("Local epoch must be set if there are partitions");
+            }
+            topicIdPartitions.forEach(topicIdPartition -> {
+                Uuid topicId = topicIdPartition.topicId();
+                partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
+            });
+        }
+
+        @Override
+        public String toString() {
+            return "{" +
+                "localEpoch=" + localEpoch +
+                ", partitions=" + partitions +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final LocalAssignmentImpl that = (LocalAssignmentImpl) o;
+            return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(localEpoch, partitions);
+        }
+
+        @Override
+        public Map> getPartitions() {
+            return partitions;
+        }
+
+        @Override
+        public boolean isNone() {
+            return localEpoch == NONE_EPOCH;
+        }
+
+        Optional updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+
+            // Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
+            if (localEpoch != NONE_EPOCH) {
+                // check if the new assignment is different from the current target assignment

Review Comment:
Done

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##

@@ -529,25 +530,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
         // MemberEpoch - always sent
         data.setMemberEpoch(membershipManager.memberEpoch());
-        // InstanceId - only sent if has changed since the last heartbeat
-        // Always send when leaving the group as a static member
-        membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-            if (!groupInstanceId.equals(sentFields.instanceId) || membershipManager.memberEpoch() == ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-                data.setInstanceId(groupInstanceId);
-                sentFields.instanceId = groupInstanceId;
-            }
-        });
+        // InstanceId - always send when leaving the group as a static member
+        membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
-        // RebalanceTimeoutMs - only sent if has changed since the last heartbeat
-        if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+        // RebalanceTimeoutMs - only sent when joining
+        if (membershipManager.memberEpoch() == 0) {

Review Comment:
Done

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##

@@ -1952,19 +1942,22 @@ private void assertStaleMemberLeavesGroupAndClearsAssignment(MembershipManagerIm
         // Should reset epoch to leave the group and release the assignment (right away because
         // there is no onPartitionsLost callback defined)
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
-assertTrue(membershipManager.currentAssignment().isEmpty()); +