Re: [PR] KAFKA-16045 Fix flaky testMigrateTopicDeletions [kafka]
chia7712 commented on PR #15082: URL: https://github.com/apache/kafka/pull/15082#issuecomment-2084345952 @mumrah Are you still working on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]
mjsax commented on PR #15615: URL: https://github.com/apache/kafka/pull/15615#issuecomment-2084340094 @AyoubOm -- Thanks for the PR! I did not forget about it. Just did not find time yet to take a look. Still plan to get this merged for 3.8 release... Sorry for the long wait, and thanks for being patient!
Re: [PR] KAFKA-12317: fix documentation [kafka]
mjsax commented on PR #15689: URL: https://github.com/apache/kafka/pull/15689#issuecomment-2084338816

Thanks for the PR @florin-akermann. I agree with @AyoubOm that both statements you intend to remove are still correct. It seems the stream-stream join section does not talk about null keys, so the left/outer join rows do not need an update? It might be worth updating the stream-stream inner join row to point out that null-key records are dropped.

I also just checked the FK left join, and it still says (this statement is missing for the inner join, though):

> Records for which the foreignKeyExtractor produces null are ignored and do not trigger a join. If you want to join with null foreign keys, use a suitable sentinel value to do so (i.e. "NULL" for a String field, or -1 for an auto-incrementing integer field).

It seems https://github.com/apache/kafka/pull/14107 incorrectly removed it for the inner join instead of the left join? It might be worth going over the docs once more to double-check (I made a pass, but might have missed something).
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1584100563

## core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java: ##
@@ -106,6 +106,7 @@ public List getAdditionalExtensions() {
 },
 (AfterTestExecutionCallback) context -> clusterShim.stop(),
 new ClusterInstanceParameterResolver(clusterShim),
+new GenericParameterResolver<>(clusterShim, ZkClusterInstance.class),

Review Comment: This change is involved in the conflicts. Also, it seems to me this could encourage developers to depend on a specific type of `ClusterInstance`. I agree that some test cases require a specific cluster type, but they can cast the `ClusterInstance` to either the `zk` or the `Raft` type, so it seems to me that enabling injection is overkill.
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
chia7712 commented on code in PR #15808: URL: https://github.com/apache/kafka/pull/15808#discussion_r1584095279

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ##
@@ -1086,4 +1087,10 @@ class RemoteIndexCacheTest {
 case e @ (_ : NoSuchFileException | _ : UncheckedIOException) => Optional.empty()
 }
 }
+
+ private def numThreadsRunning(threadNamePrefix: String, isDaemon: Boolean): mutable.Set[Thread] = {

Review Comment: We can fix the build error by rewriting this function using a Java lambda. For example:

```scala
private def numThreadsRunning(threadNamePrefix: String, isDaemon: Boolean): java.util.Set[Thread] = {
  Thread.getAllStackTraces.keySet.stream().filter { t =>
    isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
  }.collect(Collectors.toSet)
}
```

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ##
@@ -23,25 +23,26 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache._
+import org.apache.kafka.storage.internals.log._
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
-import org.mockito.invocation.InvocationOnMock
 import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
 import org.slf4j.{Logger, LoggerFactory}
-import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException}
+import java.io._

Review Comment: We should avoid wildcard (`_`) imports since they can cause naming conflicts.
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
mjsax closed pull request #14446: KIP-759: new DSL operation on KStreams interface URL: https://github.com/apache/kafka/pull/14446
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
mjsax commented on PR #14446: URL: https://github.com/apache/kafka/pull/14446#issuecomment-2084319053 Closing this PR. Replaced by #15740.
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842229#comment-17842229 ]

Matthias J. Sax commented on KAFKA-16514:

Cool. You can find details on the wiki: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] – if you don't get to it, that's also totally ok; Sophie mentioned that she might also be able to pick it up (but most likely not for the 3.8 release...)

(Btw: the wiki account creation via self-service is currently broken, but we can create an account manually if you don't have one.)

Would you be interested in doing a PR to update the JavaDocs in the meantime to fix them?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated consumer group, the supplied `leaveGroup` option seems to have no effect.
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave the group, immediately triggering a consumer group rebalance. In practice, the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any associated StateStores would be persisted to disk and that in the case of a rolling restart/deployment, the rebalance delay may be preferable. However, in our application we are using in-memory state stores and standby replicas. There is no benefit in delaying the rebalance in this setup and we are in need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-2084303833 @ashmeet13 -- any updates on this PR?
[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16644: Description: We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the tests use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] We should update all test cases to use `List` when reading from the output topic, and of course fix the introduced bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] was: We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the test use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] 
We should update all test cases to use `List` when reading from the output topic, and of course fix the introduces bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
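Editor's note on KAFKA-16644: the description says the duplicate tombstone went unnoticed because the tests collect output records into a `Map` rather than a `List`. The following is a minimal, self-contained sketch of that effect using plain JDK collections only. The class name, method names, and record values are hypothetical and are not taken from the Kafka test suite; a `null` value stands in for a tombstone.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Kafka test code.
final class DuplicateTombstoneSketch {

    // Simulated output topic: one join result followed by two tombstones
    // for the same key (the second tombstone is the regression).
    static List<Map.Entry<String, String>> outputWithDuplicateTombstone() {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("lhs1", "joined-value"));
        records.add(new SimpleEntry<>("lhs1", null)); // expected tombstone
        records.add(new SimpleEntry<>("lhs1", null)); // duplicate tombstone
        return records;
    }

    // Map-based view: later records overwrite earlier ones per key,
    // so the number of tombstones per key is lost.
    static Map<String, String> asMap(List<Map.Entry<String, String>> records) {
        Map<String, String> latest = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            latest.put(r.getKey(), r.getValue());
        }
        return latest;
    }

    // List-based view: every record is kept, so duplicates can be counted.
    static long tombstoneCount(List<Map.Entry<String, String>> records, String key) {
        return records.stream()
                .filter(r -> r.getKey().equals(key) && r.getValue() == null)
                .count();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = outputWithDuplicateTombstone();
        System.out.println("map entries: " + asMap(records).size());
        System.out.println("tombstones in list: " + tombstoneCount(records, "lhs1"));
    }
}
```

With the map view, one tombstone and two tombstones produce the same final state, which is why an assertion on the map cannot distinguish the correct output from the buggy one.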
[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16644: Summary: FK join emits duplicate tombstone on left-side delete (was: FK join emit duplicate tombstone on left-side delete) > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the test use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduces bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16644) FK join emit duplicate tombstone on left-side delete
Matthias J. Sax created KAFKA-16644: --- Summary: FK join emit duplicate tombstone on left-side delete Key: KAFKA-16644 URL: https://issues.apache.org/jira/browse/KAFKA-16644 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.7.0 Reporter: Matthias J. Sax We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the test use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] We should update all test cases to use `List` when reading from the output topic, and of course fix the introduces bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
mjsax commented on code in PR #14107: URL: https://github.com/apache/kafka/pull/14107#discussion_r1584066077

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java: ##
@@ -109,108 +111,102 @@ public void init(final ProcessorContext> context) {
 @Override
 public void process(final Record> record) {
+// clear cashed hash from previous record
+recordHash = null;
 // drop out-of-order records from versioned tables (cf. KIP-914)
 if (useVersionedSemantics && !record.value().isLatest) {
 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
 droppedRecordsSensor.record();
 return;
 }
+if (leftJoin) {
+leftJoinInstructions(record);
+} else {
+defaultJoinInstructions(record);
+}
+}
-final long[] currentHash = record.value().newValue == null ?
-null :
-Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-final int partition = context().recordMetadata().get().partition();
+private void leftJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+}
+forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);

Review Comment: @florin-akermann @wcarlson5 -- Seems we introduced a bug here. Filed: https://issues.apache.org/jira/browse/KAFKA-16644
[jira] [Resolved] (KAFKA-16467) Add README to docs folder
[ https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16467. --- Fix Version/s: 3.8.0 Resolution: Fixed > Add README to docs folder > - > > Key: KAFKA-16467 > URL: https://issues.apache.org/jira/browse/KAFKA-16467 > Project: Kafka > Issue Type: Improvement >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Fix For: 3.8.0 > > > We don't have a guide in project root folder or docs folder to show how to > run local website. It's good to provide a way to run document with kafka-site > repository. > > Option 1: Add links to wiki page > [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes] > and > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. > Option 2: Show how to run the document within container. For example: moving > `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16467: Add README to docs folder [kafka]
showuon merged PR #15664: URL: https://github.com/apache/kafka/pull/15664
[jira] [Resolved] (KAFKA-16627) Remove ClusterConfig parameter in BeforeEach and AfterEach
[ https://issues.apache.org/jira/browse/KAFKA-16627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16627.
Fix Version/s: 3.8.0
Resolution: Fixed

> Remove ClusterConfig parameter in BeforeEach and AfterEach
> --
>
> Key: KAFKA-16627
> URL: https://issues.apache.org/jira/browse/KAFKA-16627
> Project: Kafka
> Issue Type: Improvement
> Reporter: Kuan Po Tseng
> Assignee: Kuan Po Tseng
> Priority: Minor
> Fix For: 3.8.0
>
> In the past, we modified configs such as the server broker properties by modifying the ClusterConfig reference passed to BeforeEach and AfterEach, based on the requirements of the tests.
> Since KAFKA-16560, however, ClusterConfig has been immutable, so modifying the ClusterConfig reference no longer changes the test cluster. Passing ClusterConfig to BeforeEach and AfterEach has therefore become redundant, and we should remove this behavior.
Re: [PR] KAFKA-16627: Remove ClusterConfig parameter in BeforeEach and AfterEach [kafka]
chia7712 merged PR #15824: URL: https://github.com/apache/kafka/pull/15824
Re: [PR] MINOR: Replaced Utils.join() with JDK API. [kafka]
chia7712 commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1583986944

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ##
@@ -1051,13 +1051,13 @@ public static ValidString in(String... validStrings) {
 public void ensureValid(String name, Object o) {
 String s = (String) o;
 if (!validStrings.contains(s)) {
-throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+throw new ConfigException(name, o, "String must be one of: " + String.join(",", validStrings));

Review Comment: `,` -> `, `

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ##
@@ -1079,12 +1079,12 @@ public static CaseInsensitiveValidString in(String... validStrings) {
 public void ensureValid(String name, Object o) {
 String s = (String) o;
 if (s == null || !validStrings.contains(s.toUpperCase(Locale.ROOT))) {
-throw new ConfigException(name, o, "String must be one of (case insensitive): " + Utils.join(validStrings, ", "));
+throw new ConfigException(name, o, "String must be one of (case insensitive): " + String.join(",", validStrings));
 }
 }
 public String toString() {
-return "(case insensitive) [" + Utils.join(validStrings, ", ") + "]";
+return "(case insensitive) [" + String.join(",", validStrings) + "]";

Review Comment: ditto

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ##
@@ -1051,13 +1051,13 @@ public static ValidString in(String... validStrings) {
 public void ensureValid(String name, Object o) {
 String s = (String) o;
 if (!validStrings.contains(s)) {
-throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+throw new ConfigException(name, o, "String must be one of: " + String.join(",", validStrings));
 }
 }
 public String toString() {
-return "[" + Utils.join(validStrings, ", ") + "]";
+return "[" + String.join(",", validStrings) + "]";

Review Comment: ditto

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ##
@@ -1205,7 +1205,8 @@ public void ensureValid(String name, Object value) {
 }
 if (!foundIllegalCharacters.isEmpty()) {
-throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", "));
+throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " +
+ foundIllegalCharacters.stream().map(Object::toString).collect(Collectors.joining(",")));

Review Comment: ditto

## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ##
@@ -1079,12 +1079,12 @@ public static CaseInsensitiveValidString in(String... validStrings) {
 public void ensureValid(String name, Object o) {
 String s = (String) o;
 if (s == null || !validStrings.contains(s.toUpperCase(Locale.ROOT))) {
-throw new ConfigException(name, o, "String must be one of (case insensitive): " + Utils.join(validStrings, ", "));

Review Comment: ditto
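Editor's note on the review above: the comments ask that the JDK replacements keep the `", "` delimiter (comma plus space) that the removed `Utils.join(..., ", ")` calls used, so the error messages stay unchanged. A minimal sketch of both JDK variants follows, assuming only the standard library; the class and method names are hypothetical and the sample values are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration, not the ConfigDef implementation.
final class JoinDelimiterSketch {

    // For collections of strings, String.join takes the delimiter first.
    static String joinStrings(List<String> values) {
        return String.join(", ", values);
    }

    // For non-string elements, map to String and use Collectors.joining.
    static String joinObjects(List<?> values) {
        return values.stream()
                .map(Object::toString)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println("String must be one of: "
                + joinStrings(Arrays.asList("none", "gzip", "snappy")));
        System.out.println("Had the following ASCII chars: "
                + joinObjects(Arrays.asList(7, 27)));
    }
}
```

Note that the argument order differs from the removed helper: `String.join` takes the delimiter first and the elements second, which makes it easy to drop the space by accident during a mechanical replacement.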
Re: [PR] KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter [kafka]
chia7712 commented on PR #10428: URL: https://github.com/apache/kafka/pull/10428#issuecomment-2083932644 @dongjinleekr Do you have free time to fix the conflicts? I'd like to review it!
[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule
[ https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842195#comment-17842195 ] Chia-Ping Tsai commented on KAFKA-16643: there is a jira for it https://issues.apache.org/jira/browse/KAFKA-10787 maybe we can discuss that in the jira > Add ModifierOrder checkstyle rule > - > > Key: KAFKA-16643 > URL: https://issues.apache.org/jira/browse/KAFKA-16643 > Project: Kafka > Issue Type: Task > Components: build >Reporter: Greg Harris >Priority: Minor > > Checkstyle offers the ModifierOrder rule: > [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that > Kafka violates in a lot of places. We should decide if this is a checkstyle > rule we should be following or not, and potentially enable it moving forward. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842192#comment-17842192 ]

Chia-Ping Tsai commented on KAFKA-16639:

{quote}
I took this because I wasn't sure if you wanted me to look into it. Please go ahead and assign it back. We would love to review it. Thanks. Chia-Ping Tsai
{quote}

Sorry, I just wanted to make sure there is someone who can provide a fix for it, so that we won't have duplicate PRs. Thanks for your response (and reviews in the future). We will prepare the patch ASAP.

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Chia-Ping Tsai
> Assignee: Philip Nee
> Priority: Major
> Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer that has just been created.
> The root cause is that we skip the new heartbeat used to leave the group when there is an in-flight heartbeat request ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is to create a heartbeat request when we encounter the above situation and then send it via pollOnClose ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>
[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule
[ https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842190#comment-17842190 ] Greg Harris commented on KAFKA-16643: - I think that might be a different checkstyle rule: [https://checkstyle.sourceforge.io/checks/imports/importorder.html] this is for the modifiers on variables/methods etc (static final abstract etc) > Add ModifierOrder checkstyle rule > - > > Key: KAFKA-16643 > URL: https://issues.apache.org/jira/browse/KAFKA-16643 > Project: Kafka > Issue Type: Task > Components: build >Reporter: Greg Harris >Priority: Minor > > Checkstyle offers the ModifierOrder rule: > [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that > Kafka violates in a lot of places. We should decide if this is a checkstyle > rule we should be following or not, and potentially enable it moving forward. -- This message was sent by Atlassian Jira (v8.20.10#820010)
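Editor's note on KAFKA-16643: the comment above distinguishes modifier order (the order of keywords such as `public`, `static`, `final` on a declaration) from import order. A small sketch of what "modifier order" means, assuming only the JDK: `java.lang.reflect.Modifier.toString` renders a set of modifiers in the order suggested by the Java Language Specification, which is the ordering a ModifierOrder-style rule is understood to enforce. The class and field names below are hypothetical.

```java
import java.lang.reflect.Modifier;

// Hypothetical illustration of modifier ordering; not Kafka code.
final class ModifierOrderSketch {

    // Compiles fine, but the keywords are not in the JLS-suggested order;
    // this is the kind of declaration a modifier-order check would flag.
    final static public int OUT_OF_ORDER = 1;

    // Same modifiers in the JLS-suggested order.
    public static final int IN_ORDER = 2;

    // Renders a modifier bit set in the JLS-suggested order,
    // regardless of the order in which the flags were combined.
    static String canonical(int modifiers) {
        return Modifier.toString(modifiers);
    }

    public static void main(String[] args) {
        int flags = Modifier.FINAL | Modifier.STATIC | Modifier.PUBLIC;
        System.out.println(canonical(flags)); // prints "public static final"
    }
}
```

Both fields above have identical semantics; the rule under discussion concerns source-level consistency only.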
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
chia7712 merged PR #15788: URL: https://github.com/apache/kafka/pull/15788
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
chia7712 commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-2083898472

```
./gradlew cleanTest :streams:test --tests SlidingWindowedKStreamIntegrationTest.shouldRestoreAfterJoinRestart :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful --tests ListConsumerGroupTest.testListGroupCommandConsumerProtocol :storage:test --tests TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout :metadata:test --tests QuorumControllerTest.testBootstrapZkMigrationRecord --tests QuorumControllerTest.testNoOpRecordWriteAfterTimeout :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test --tests IdentityReplicationIntegrationTest.testSyncTopicConfigs --tests MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs --tests MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs :core:test --tests ConsumerBounceTest.testClose --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests PlaintextConsumerTest.testSimpleConsumption --tests PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords --tests PlaintextAdminIntegrationTest.testDescribeConfigsForLog4jLogLevels --tests PlaintextConsumerTest.testPartitionsForAutoCreate --tests SaslMultiMechanismConsumerTest.testCoordinatorFailover --tests ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart --tests ZooKeeperClientTest.testBlockOnRequestCompletionFromStateChangeHandler --tests ReplicationQuotasTest.shouldBootstrapTwoBrokersWithFollowerThrottle
```

I don't see any related errors, and the tests pass locally. Will merge it.
[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule
[ https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842189#comment-17842189 ] Chia-Ping Tsai commented on KAFKA-16643: I have seen a lot of conflicts caused by inconsistent import order... > Add ModifierOrder checkstyle rule > - > > Key: KAFKA-16643 > URL: https://issues.apache.org/jira/browse/KAFKA-16643 > Project: Kafka > Issue Type: Task > Components: build >Reporter: Greg Harris >Priority: Minor > > Checkstyle offers the ModifierOrder rule: > [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that > Kafka violates in a lot of places. We should decide if this is a checkstyle > rule we should be following or not, and potentially enable it moving forward. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16643) Add ModifierOrder checkstyle rule
[ https://issues.apache.org/jira/browse/KAFKA-16643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842188#comment-17842188 ] Chia-Ping Tsai commented on KAFKA-16643: +10 to this jira!
[jira] [Comment Edited] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842187#comment-17842187 ] Philip Nee edited comment on KAFKA-16639 at 4/29/24 11:55 PM: -- hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead and assign it back. We would love to review it. Thanks. [~chia7712] was (Author: JIRAUSER283568): hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead assign it back. We would love to review it. THanks. > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by closing a consumer immediately after it is > created. > The root cause is that we skip the new heartbeat used to leave the group when > there is an in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212]). > It seems to me the simple solution is to create a heartbeat request in that > situation and then send it via pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62]).
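The root cause described in KAFKA-16639 can be pictured with a small toy model. This is only a sketch under stated assumptions: `ToyHeartbeatManager`, its fields, and the string "requests" are hypothetical and are not Kafka's `HeartbeatRequestManager`; only the idea (regular polling skips while a heartbeat is in flight, while a close-time poll would build the leave heartbeat anyway) comes from the issue text.

```java
import java.util.Optional;

// Toy model only: hypothetical names, not Kafka's HeartbeatRequestManager.
class ToyHeartbeatManager {
    private boolean inFlight = false; // true while a heartbeat awaits its response
    private boolean leaving = false;  // set once close() has asked to leave the group

    /** Regular poll: builds nothing while a request is in flight (the reported behaviour). */
    Optional<String> poll() {
        if (inFlight) {
            return Optional.empty();
        }
        inFlight = true;
        return Optional.of(leaving ? "LEAVE_GROUP_HEARTBEAT" : "HEARTBEAT");
    }

    /** Marks the member as leaving, as a close() call would. */
    void requestLeave() {
        leaving = true;
    }

    /** Suggested close path: build the leave heartbeat even if one is in flight. */
    Optional<String> pollOnClose() {
        return leaving ? Optional.of("LEAVE_GROUP_HEARTBEAT") : Optional.empty();
    }

    public static void main(String[] args) {
        ToyHeartbeatManager manager = new ToyHeartbeatManager();
        System.out.println(manager.poll());        // first heartbeat is built and is now in flight
        manager.requestLeave();                    // consumer is closed right after creation
        System.out.println(manager.poll());        // nothing is built: the leave heartbeat is skipped
        System.out.println(manager.pollOnClose()); // close path still produces the leave heartbeat
    }
}
```

In this model the second `poll()` is where the leave request gets lost, which matches the reproduction step of closing a consumer right after creating it.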
[jira] [Resolved] (KAFKA-15897) Flaky Test: testWrongIncarnationId() – kafka.server.ControllerRegistrationManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-15897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15897. Fix Version/s: 3.8.0 Resolution: Fixed > Flaky Test: testWrongIncarnationId() – > kafka.server.ControllerRegistrationManagerTest > - > > Key: KAFKA-15897 > URL: https://issues.apache.org/jira/browse/KAFKA-15897 > Project: Kafka > Issue Type: Test >Reporter: Apoorv Mittal >Assignee: Chia-Ping Tsai >Priority: Major > Labels: flaky-test > Fix For: 3.8.0 > > > Build run: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/ > > {code:java} > org.opentest4j.AssertionFailedError: expected: <(false,1,0)> but was: > <(true,0,0)>Stacktraceorg.opentest4j.AssertionFailedError: expected: > <(false,1,0)> but was: <(true,0,0)>at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) >at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) >at > app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) > at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) > at > app//kafka.server.ControllerRegistrationManagerTest.$anonfun$testWrongIncarnationId$3(ControllerRegistrationManagerTest.scala:228) >at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at > app//kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId(ControllerRegistrationManagerTest.scala:226) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method)at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) >at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) >at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) >at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) >at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at >
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842187#comment-17842187 ] Philip Nee commented on KAFKA-16639: hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead and assign it back. We would love to review it. Thanks.
Re: [PR] KAFKA-15897: fix ControllerRegistrationManagerTest [kafka]
chia7712 merged PR #15828: URL: https://github.com/apache/kafka/pull/15828
Re: [PR] KAFKA-15897: fix ControllerRegistrationManagerTest [kafka]
chia7712 commented on PR #15828: URL: https://github.com/apache/kafka/pull/15828#issuecomment-2083890768 I looped the tests 300 times, and the flakiness is gone. I will merge it to see what happens to our CI :)
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842186#comment-17842186 ] Chia-Ping Tsai commented on KAFKA-16639: [~pnee] I just noticed that you have taken over this jira. Do you plan to offer a patch for it? If you have no free cycles, we can file a PR for it and then wait for your review :)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842185#comment-17842185 ] Sal Sorrentino commented on KAFKA-16514: Bug, feature gap; /təˈmeɪ.t̬oʊ/ , /təˈmɑː.təʊ/. I only know I followed the programmatic interface provided but could not achieve the advertised results without perusing source code to find the undocumented secret sauce ;) Your point IS taken however. I am happy to create a KIP if that is the best path forward, but am unfamiliar with the process and will need to do some reading. I don't think there is an urgent need to get this in 3.8, but I think documenting the issue and the workaround is appropriate in the short term. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
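For readers who want to try the workaround from the report without pulling in Kafka jars, here is a minimal sketch that builds the same properties with plain string keys. The class and method names are hypothetical, and the string keys are assumed to match the `StreamsConfig` constants quoted above; `internal.leave.group.on.close` is the undocumented internal setting named in the report, so it may change or disappear without notice.

```java
import java.util.Properties;

// Hypothetical helper; the keys are assumed to equal the StreamsConfig constants in the report.
class LeaveGroupWorkaround {

    /** Builds Streams properties that include the internal leave-on-close workaround. */
    static Properties buildProps(int numStreamThreads) {
        Properties props = new Properties();
        props.put("application.id", "someApplicationId");
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("num.standby.replicas", 1);
        props.put("num.stream.threads", numStreamThreads);
        // Undocumented internal setting quoted in the report; not part of the public API.
        props.put("internal.leave.group.on.close", true);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps(Runtime.getRuntime().availableProcessors());
        System.out.println(props.get("internal.leave.group.on.close"));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor as usual; only the last `put` differs from a standard configuration.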
[jira] [Commented] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842184#comment-17842184 ] sanghyeok An commented on KAFKA-16637: -- [~kirktrue] Thanks for your comments. I had missed the issue you mentioned; I will check it out! However, it still does not work when I provide a non-zero duration: !image-2024-04-30-08-33-06-367.png|width=839,height=289! I changed my code to use Duration.ofSeconds(1), but the same logs are printed. !image-2024-04-30-08-33-50-435.png|width=1116,height=139! Is there any workaround? > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: image-2024-04-30-08-33-06-367.png, > image-2024-04-30-08-33-50-435.png > > > I want to test the next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes]) > > However, it does not work well. > You can check my setup. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16637: - Attachment: image-2024-04-30-08-33-06-367.png
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16637: - Attachment: image-2024-04-30-08-33-50-435.png
Re: [PR] a slight change. [kafka]
gharris1727 commented on PR #15812: URL: https://github.com/apache/kafka/pull/15812#issuecomment-2083856829 Hi @gongxuanzhang and thank you for the contribution! This is actually an issue in a lot of places, I checked with this checkstyle.xml rule: ``` ``` and `./gradlew checkstyleMain checkstyleTest --continue`. While fixing this in one place is good, it would make sense to try and fix this everywhere if we decide to address it at all. I've created https://issues.apache.org/jira/browse/KAFKA-16643 for this, and you can take this on if you're interested. If so, please see the contributing guide: https://kafka.apache.org/contributing.html and join the mailing list and JIRA.

| module | violations |
|---|---|
| `:streams:upgrade-system-tests-24:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-22:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-23:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-26:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-28:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-30:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-25:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-31:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-27:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-32:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-36:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-35:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-34:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-33:checkstyleTest` | 1 |
| `:streams:upgrade-system-tests-37:checkstyleTest` | 1 |
| `:generator:checkstyleMain` | 2 |
| `:connect:mirror-client:checkstyleMain` | 1 |
| `:connect:api:checkstyleMain` | 1 |
| `:connect:json:checkstyleMain` | 1 |
| `:server-common:checkstyleMain` | 10 |
| `:raft:checkstyleMain` | 11 |
| `:storage:checkstyleMain` | 2 |
| `:trogdor:checkstyleMain` | 17 |
| `:server:checkstyleMain` | 122 |
| `:connect:mirror:checkstyleMain` | 3 |
| `:connect:test-plugins:checkstyleMain` | 2 |
| `:tools:checkstyleMain` | 9 |
| `:storage:storage-api:checkstyleMain` | 20 |
| `:streams:examples:checkstyleMain` | 5 |
| `:streams:test-utils:checkstyleMain` | 2 |
| `:group-coordinator:checkstyleMain` | 46 |
| `:metadata:checkstyleMain` | 72 |
| `:raft:checkstyleTest` | 5 |
| `:connect:runtime:checkstyleMain` | 2 |
| `:group-coordinator:checkstyleTest` | 10 |
| `:server-common:checkstyleTest` | 4 |
| `:trogdor:checkstyleTest` | 3 |
| `:metadata:checkstyleTest` | 73 |
| `:streams:test-utils:checkstyleTest` | 12 |
| `:streams:checkstyleMain` | 57 |
| `:clients:checkstyleTest` | 87 |
| `:clients:checkstyleMain` | 122 |
| `:core:checkstyleMain` | 1 |
| `:shell:checkstyleMain` | 10 |
| `:core:checkstyleTest` | 12 |
| `:shell:checkstyleTest` | 2 |
| `:jmh-benchmarks:checkstyleMain` | 1 |
| `:connect:mirror:checkstyleTest` | 4 |
| `:connect:runtime:checkstyleTest` | 15 |
| `:streams:checkstyleTest` | 201 |
[jira] [Created] (KAFKA-16643) Add ModifierOrder checkstyle rule
Greg Harris created KAFKA-16643: --- Summary: Add ModifierOrder checkstyle rule Key: KAFKA-16643 URL: https://issues.apache.org/jira/browse/KAFKA-16643 Project: Kafka Issue Type: Task Components: build Reporter: Greg Harris Checkstyle offers the ModifierOrder rule: [https://checkstyle.sourceforge.io/checks/modifier/modifierorder.html] that Kafka violates in a lot of places. We should decide if this is a checkstyle rule we should be following or not, and potentially enable it moving forward. -- This message was sent by Atlassian Jira (v8.20.10#820010)
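To make the rule concrete, the following is a simplified sketch of the kind of ordering that ModifierOrder enforces, namely the modifier order suggested by the Java Language Specification. `ModifierOrderSketch` and `isOrdered` are hypothetical names for illustration and are not Checkstyle's implementation; the real check also covers annotation placement, which is ignored here.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration only; not Checkstyle's implementation.
class ModifierOrderSketch {

    // Modifier order suggested by the Java Language Specification.
    static final List<String> JLS_ORDER = Arrays.asList(
        "public", "protected", "private", "abstract", "default", "static",
        "sealed", "non-sealed", "final", "transient", "volatile",
        "synchronized", "native", "strictfp");

    /** Returns true when the given modifiers appear in the JLS-suggested order. */
    static boolean isOrdered(String... modifiers) {
        int previous = -1;
        for (String modifier : modifiers) {
            int index = JLS_ORDER.indexOf(modifier);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown modifier: " + modifier);
            }
            if (index < previous) {
                return false; // this modifier should have come earlier
            }
            previous = index;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isOrdered("private", "static", "final")); // conforming declaration
        System.out.println(isOrdered("final", "static", "private")); // would be flagged
    }
}
```

Under this ordering, a declaration such as `static public final int X` would be reported, while `public static final int X` would pass.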
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842183#comment-17842183 ] Matthias J. Sax commented on KAFKA-16514: - I would not call it a bug, but more like a feature gap. Historically, consumer.close() sends a leave group request. For KS, we only added an internal config to the consumer to disable sending the leave group request. Independently, static group membership was added, and a new admin API was added to remove a static member from a group. The purpose of this admin feature was (IIRC) to allow triggering a rebalance for static groups (which usually run with high session timeouts) for the case of a crashed member that won't come back again. Thus, these are two independent features that just don't play nice with each other. In addition, we combine both features with KafkaStreams#close(CloseOptions), but given how the APIs are built, it only works for static members. Thus, there is not really a reason; it just happens that it all was implemented this way given historic context etc. I am in favor of doing a KIP to add something similar to "CloseOption" to Consumer#close() (independent of static membership or not). [~sal.sorrentino] Would you be interested to pick it up and write a KIP? We might still be able to get it into the 3.8 release if we hurry up. [~lianetm], what is the purpose of the -2 code? In the end, when not sending any request, with a large enough session timeout, no rebalance would be triggered anyway? What changes if we send -2 instead of just not sending any leave group request on close()?
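The interaction described in the comment above can be summarised as a small decision function. This is one reading of the thread written as a toy model, not Kafka Streams code: `CloseBehaviourModel`, `leavesGroupOnClose`, and its parameters are hypothetical names, and the logic only encodes the two statements made in the discussion (the internal config controls whether the consumer sends a leave group request, and the `CloseOptions` path only works for static members).

```java
// Toy model of the behaviour discussed in KAFKA-16514; hypothetical names, not Kafka code.
class CloseBehaviourModel {

    /**
     * @param leaveGroupOption          value passed to CloseOptions.leaveGroup(...)
     * @param staticMember              whether the member uses static group membership
     * @param internalLeaveGroupOnClose value of the internal "internal.leave.group.on.close" config
     * @return true if the member is expected to leave the group promptly on close
     */
    static boolean leavesGroupOnClose(boolean leaveGroupOption,
                                      boolean staticMember,
                                      boolean internalLeaveGroupOnClose) {
        if (internalLeaveGroupOnClose) {
            return true; // the consumer itself sends the leave group request
        }
        // Otherwise only the admin-API removal remains, which applies to static members.
        return leaveGroupOption && staticMember;
    }

    public static void main(String[] args) {
        // Reported case: dynamic member, leaveGroup(true), internal config left at its Streams default.
        System.out.println(leavesGroupOnClose(true, false, false));
        // Workaround from the report: internal config set to true.
        System.out.println(leavesGroupOnClose(true, false, true));
    }
}
```

The first call models the reported surprise (the option is set, yet the rebalance waits for the timeout), and the second models the workaround with the internal config.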
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16637: -- Fix Version/s: 3.8.0 > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > Fix For: 3.8.0 > > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. > > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16637: -- Component/s: clients consumer > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. > > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16637: -- Labels: kip-848-client-support (was: ) > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > Labels: kip-848-client-support > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. > > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16637: - Assignee: Kirk True > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Assignee: Kirk True >Priority: Minor > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. > > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. 
> (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik opened a new pull request, #15830: URL: https://github.com/apache/kafka/pull/15830 - Updating `AclCommandTest` to support KRaft - Tests which are using `AclAuthorizer` are not updated, as they are expected to be removed after full migration to KRaft - Changed `AclCommand` console output for adding and removing ACLs. In the original implementation, the `listAcls()` method was called directly from the `addAcls()` and `removeAcls()` methods, which caused a race condition in KRaft mode and made the test flaky - As I understand it, this is quite a big change for our UI. Should it go through the KIP process? ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842160#comment-17842160 ] Lianet Magrans commented on KAFKA-16637: Hey [~chickenchickenlove], thanks for reporting this. I wonder if you're hitting a known issue with the consumer API and timeouts (being fixed with https://issues.apache.org/jira/browse/KAFKA-16200 and https://issues.apache.org/jira/browse/KAFKA-15974). I tried your code, changing only the poll call from the `kafkaConsumer.poll(Duration.ZERO)` you had to one with a non-zero duration, and it all worked as expected. So I guess it could be related to the timeout enforcement issues being fixed on the consumer side. > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Minor > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. 
> > *Docker-compose.yaml* > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > *Consumer Code* > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > *Consumer logs* > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > *Broker logs* > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
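As a loose analogy for the zero versus non-zero poll duration mentioned in the comment above, the sketch below uses only JDK classes. It does not use or reproduce the Kafka consumer, and whether the reported hang has the same cause is only the commenter's guess; all names here are hypothetical. It merely shows that a zero-duration wait can return before work handed to a background thread has completed, while a bounded non-zero wait gives that work time to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for zero vs. non-zero poll timeouts; hypothetical, not the Kafka consumer. */
final class ZeroTimeoutPollSketch {

    /** Returns {result of the zero-timeout poll, result of the non-zero poll}. */
    static String[] run() {
        LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();
        CountDownLatch mayFinish = new CountDownLatch(1);

        // Background thread that produces a result only once it is allowed to finish.
        Thread background = new Thread(() -> {
            try {
                mayFinish.await();
                results.put("record-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-thread");
        background.start();

        try {
            // Zero timeout: returns immediately, and the background work is not done yet.
            String zero = results.poll(0, TimeUnit.MILLISECONDS);

            // Let the background work complete, then wait a bounded, non-zero time for it.
            mayFinish.countDown();
            String nonZero = results.poll(5, TimeUnit.SECONDS);

            background.join();
            return new String[] {zero, nonZero};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }
}
```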
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on PR #15798: URL: https://github.com/apache/kafka/pull/15798#issuecomment-2083638319 TODO: This patch is waiting on https://github.com/apache/kafka/pull/15818 to make sure the response future is not completed until the records are successfully replayed and persisted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1583740908 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1169,6 +1173,64 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } +/** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ +private void throwIfClassicProtocolIsNotSupported( +ConsumerGroup group, +String memberId, +String protocolType, +JoinGroupRequestProtocolCollection protocols +) { +if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { +throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); +} +} + +/** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. + */ +private static ConsumerPartitionAssignor.Subscription deserializeSubscription( +JoinGroupRequestProtocolCollection protocols +) { +try { +return ConsumerProtocol.deserializeSubscription( Review Comment: Yeah it makes sense. I can open another pr once this one is merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
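For readers who do not have the rest of the patch at hand, here is a minimal, self-contained sketch of the kind of check `throwIfClassicProtocolIsNotSupported` performs. The matching rule is an assumption made for illustration (protocol type must be `consumer` and at least one protocol name must be shared with the group); the real `ConsumerGroup#supportsClassicProtocols` in the PR may apply a different rule, and `IllegalStateException` stands in for the `INCONSISTENT_GROUP_PROTOCOL` error.

```java
import java.util.Set;

/** Simplified stand-in for the protocol check in the diff; the matching rule is an assumption. */
final class ClassicProtocolCheckSketch {

    static final String CONSUMER_PROTOCOL_TYPE = "consumer";

    /** Assumed rule: the type must be "consumer" and at least one protocol name must overlap. */
    static boolean supportsClassicProtocols(Set<String> groupProtocols,
                                            String protocolType,
                                            Set<String> memberProtocols) {
        if (!CONSUMER_PROTOCOL_TYPE.equals(protocolType)) {
            return false;
        }
        for (String protocol : memberProtocols) {
            if (groupProtocols.contains(protocol)) {
                return true;
            }
        }
        return false;
    }

    /** Mirrors the shape of the method in the diff, with a plain JDK exception. */
    static void throwIfClassicProtocolIsNotSupported(Set<String> groupProtocols,
                                                     String memberId,
                                                     String protocolType,
                                                     Set<String> memberProtocols) {
        if (!supportsClassicProtocols(groupProtocols, protocolType, memberProtocols)) {
            throw new IllegalStateException("Member " + memberId + "'s protocols are not supported.");
        }
    }
}
```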
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1583698116 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -240,27 +185,33 @@ private void emitNonJoinedOuterRecords( // There might be an outer record for the other joinSide which window has not closed yet // We rely on the ordering of KeyValueIterator final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); -if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs +>= sharedTimeTracker.streamTime) { if (timestampedKeyAndJoinSide.isLeftSide()) { -outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side +outerJoinLeftWindowOpen = +true; // there are no more candidates to emit on left-outerJoin-side } else { -outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side +outerJoinRightWindowOpen = +true; // there are no more candidates to emit on right-outerJoin-side } // We continue with the next outer record continue; } - + final K key = timestampedKeyAndJoinSide.getKey(); -final LeftOrRightValue leftOrRightValue = next.value; -final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); +final LeftOrRightValue leftOrRightValue = next.value; +final VThis thisValue = getThisValue(leftOrRightValue); +final VOther otherValue = getOtherValue(leftOrRightValue); +final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue); context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) + record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) ); Review Comment: nit: This could be brought out to a method which depends only on `record` and `next`. 
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## Review Comment: > No more un safe type casting to be found in this class. Not that much has changed in this abstract class. I have moved some code in methods. Very nice! I really like the new types, and the abstract methods are very minimal. > Side Note: I don't have the correct formatter configured :/ I couldn't find any contribution notes to set the correct one. I'm not aware of a standard formatter either. I remember tweaking the IntelliJ default rules slightly, but taking a look at them now I'm not seeing evidence of what I changed. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ## Review Comment: The left and right classes are perfect in my opinion. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -43,161 +42,106 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin implements ProcessorSupplier { +abstract class KStreamKStreamJoin implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); - -private final String otherWindowName; +private final boolean outer; +private final ValueJoinerWithKey joiner; private final long joinBeforeMs; private final long joinAfterMs; private final long joinGraceMs; +private final String otherWindowName; +private final TimeTrackerSupplier sharedTimeTrackerSupplier; private final boolean enableSpuriousResultFix; +private final Optional outerJoinWindowName; private final long windowsBeforeMs; private final long windowsAfterMs; -private final boolean outer; -private final boolean isLeftSide; -private final Optional outerJoinWindowName; -private final ValueJoinerWithKey joiner; - -private 
final TimeTrackerSupplier sharedTimeTrackerSupplier; - -KStreamKStreamJoin(final boolean isLeftSide, - final String
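The refactoring under review replaces an `isLeftSide` flag with an abstract class plus left and right subclasses. A minimal sketch of that pattern follows, assuming only what the diff shows (`getThisValue`, `getOtherValue`, and a joiner applied to both). The types here are simplified stand-ins, the record key is omitted for brevity, and none of this is the actual Kafka Streams code.

```java
import java.util.function.BiFunction;

/** Simplified illustration of the left/right subclass pattern; hypothetical types, not Kafka Streams. */
final class JoinSideSketch {

    /** Holds exactly one of a left or a right value; the other field is null. */
    static final class LeftOrRight<L, R> {
        final L left;
        final R right;

        private LeftOrRight(L left, R right) {
            this.left = left;
            this.right = right;
        }

        static <L, R> LeftOrRight<L, R> left(L value) {
            return new LeftOrRight<>(value, null);
        }

        static <L, R> LeftOrRight<L, R> right(R value) {
            return new LeftOrRight<>(null, value);
        }
    }

    /** The side-specific knowledge lives in two small abstract methods instead of a boolean. */
    abstract static class Join<L, R, VThis, VOther, VOut> {
        private final BiFunction<VThis, VOther, VOut> joiner;

        Join(BiFunction<VThis, VOther, VOut> joiner) {
            this.joiner = joiner;
        }

        abstract VThis getThisValue(LeftOrRight<L, R> value);

        abstract VOther getOtherValue(LeftOrRight<L, R> value);

        /** No casts needed: the compiler checks that the joiner arguments have the right types. */
        final VOut nullJoinedValue(LeftOrRight<L, R> value) {
            return joiner.apply(getThisValue(value), getOtherValue(value));
        }
    }

    static final class LeftJoin<L, R, VOut> extends Join<L, R, L, R, VOut> {
        LeftJoin(BiFunction<L, R, VOut> joiner) {
            super(joiner);
        }

        @Override
        L getThisValue(LeftOrRight<L, R> value) {
            return value.left;
        }

        @Override
        R getOtherValue(LeftOrRight<L, R> value) {
            return value.right;
        }
    }

    static final class RightJoin<L, R, VOut> extends Join<L, R, R, L, VOut> {
        RightJoin(BiFunction<R, L, VOut> joiner) {
            super(joiner);
        }

        @Override
        R getThisValue(LeftOrRight<L, R> value) {
            return value.right;
        }

        @Override
        L getOtherValue(LeftOrRight<L, R> value) {
            return value.left;
        }
    }
}
```

The point of the design is that a boolean flag forces an unchecked cast at the call site, whereas the two subclasses let each side bind `VThis` and `VOther` to concrete types at compile time.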
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1583691897 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -10921,6 +10855,544 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } +@Test +public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setState(MemberState.STABLE) +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.build())) +.withConsumerGroupMaxSize(1) +.build(); + +JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); + +Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); +assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); +} + +@Test +public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception { +int minSessionTimeout = 50; +int maxSessionTimeout = 100; +String groupId = "group-id"; + +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withClassicGroupMinSessionTimeoutMs(minSessionTimeout) +.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) +.build(); + +JoinGroupRequestData requestWithSmallSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withSessionTimeoutMs(minSessionTimeout - 1) +.build(); 
+assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithSmallSessionTimeout)); + +JoinGroupRequestData requestWithLargeSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withSessionTimeoutMs(maxSessionTimeout + 1) +.build(); +assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithLargeSessionTimeout)); +} + +@Test +public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setState(MemberState.STABLE) +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) +.build())) +.build(); + +JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.withDefaultProtocolTypeAndProtocols() +.build(); +assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + +JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withProtocolType("connect") +.withDefaultProtocolTypeAndProtocols() +.build(); +assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); +} + +@Test +public void testConsumerGroupJoinWithNewDynamicMember() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +Uuid fooTopicId 
= Uuid.randomUuid(); +String fooTopicName = "foo"; +Uuid barTopicId = Uuid.randomUuid(); +String barTopicName = "bar"; + +
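The session timeout test in the diff above exercises a simple range check. A self-contained sketch of such a check is shown below, assuming inclusive bounds (the test only shows that `min - 1` and `max + 1` are rejected); the class and the nested exception are hypothetical stand-ins, not the group coordinator's code.

```java
/** Stand-alone sketch of a session timeout range check; hypothetical, not the coordinator's code. */
final class SessionTimeoutCheckSketch {

    /** Local stand-in for Kafka's exception of the same simple name. */
    static final class InvalidSessionTimeoutException extends RuntimeException {
        InvalidSessionTimeoutException(String message) {
            super(message);
        }
    }

    /** Assumption: both bounds are inclusive, so min and max themselves are accepted. */
    static void validate(int sessionTimeoutMs, int minSessionTimeoutMs, int maxSessionTimeoutMs) {
        if (sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs) {
            throw new InvalidSessionTimeoutException(
                "Session timeout " + sessionTimeoutMs + " ms is outside ["
                    + minSessionTimeoutMs + ", " + maxSessionTimeoutMs + "] ms");
        }
    }
}
```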
[PR] WIP [kafka]
cadonna opened a new pull request, #15829: URL: https://github.com/apache/kafka/pull/15829 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists
Kirk True created KAFKA-16642: - Summary: Update KafkaConsumerTest to show parameters in test lists Key: KAFKA-16642 URL: https://issues.apache.org/jira/browse/KAFKA-16642 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 {{KafkaConsumerTest}} was recently updated to make many of its tests parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group protocols. However, in some of the tools in which [lists of tests are provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY], say, for analysis, the group protocol information is not exposed. For example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but it's difficult to know at a glance which group protocol is causing the problem because the list simply shows: {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}} {quote} Ideally, it would expose more information, such as: {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}} {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists
[ https://issues.apache.org/jira/browse/KAFKA-16642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16642: -- Priority: Minor (was: Major) > Update KafkaConsumerTest to show parameters in test lists > - > > Key: KAFKA-16642 > URL: https://issues.apache.org/jira/browse/KAFKA-16642 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > {{KafkaConsumerTest}} was recently updated to make many of its tests > parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group > protocols. However, in some of the tools in which [lists of tests are > provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY], > say, for analysis, the group protocol information is not exposed. For > example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but > it's difficult to know at a glance which group protocol is causing the > problem because the list simply shows: > {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}} > {quote} > Ideally, it would expose more information, such as: > {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
lianetm commented on PR #15803: URL: https://github.com/apache/kafka/pull/15803#issuecomment-2083525470 Thanks for the catch and the fix @AndrewJSchofield, left a nit, but LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16608: Honour interrupted thread state on KafkaConsumer.poll [kafka]
lianetm commented on code in PR #15803:
URL: https://github.com/apache/kafka/pull/15803#discussion_r1583641482

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -1789,6 +1791,33 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
 }
 }
+/**
+ * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)}
+ * causes {@link InterruptException} to be thrown.
+ */
+@Test
+public void testPollThrowsInterruptExceptionIfInterrupted() {
+consumer = newConsumer();
+final String topicName = "foo";
+final int partition = 3;
+final TopicPartition tp = new TopicPartition(topicName, partition);
+doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+consumer.assign(singleton(tp));
+
+// interrupt the thread and call poll
+try {
+Thread.currentThread().interrupt();
+assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
+} finally {
+// clear interrupted state again since this thread may be reused by JUnit

Review Comment:
Just to make the comment accurate, I expect that we need to flip the interrupted flag here so that the assertion down below polling again does not throw.
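The flag handling discussed in the review comment above can be illustrated in isolation. This is a minimal sketch, not the Kafka test itself: `pollLike()` is a hypothetical stand-in for a blocking call that honours a pending interrupt, and `IllegalStateException` stands in for Kafka's `InterruptException`. Only `Thread.interrupt()`, `Thread.isInterrupted()` and `Thread.interrupted()` are real JDK calls.

```java
final class InterruptFlagSketch {

    /** Hypothetical stand-in for a call such as poll(): throws if an interrupt is pending. */
    static void pollLike() {
        if (Thread.currentThread().isInterrupted()) {
            throw new IllegalStateException("interrupted");
        }
    }

    /** Returns true if pollLike() threw while the flag was set; always leaves the flag cleared. */
    static boolean throwsWhenInterrupted() {
        try {
            Thread.currentThread().interrupt(); // set the interrupted flag on this thread
            try {
                pollLike();
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } finally {
            // Thread.interrupted() reads AND clears the flag, so later calls on this
            // thread (or a test runner reusing it) do not see a stale interrupt.
            Thread.interrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println("threw while interrupted: " + throwsWhenInterrupted());
        System.out.println("flag still set: " + Thread.currentThread().isInterrupted());
        pollLike(); // does not throw, because the flag was cleared in the finally block
    }
}
```

Without the `Thread.interrupted()` call in the `finally` block, the second `pollLike()` would throw as well, which is the situation the review comment describes for the later assertion in the same test.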
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1583634669

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1288,25 +1353,15 @@ private CoordinatorResult consumerGr
 .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
 .setClientId(clientId)
 .setClientHost(clientHost)
+.setClassicMemberMetadata(null)

Review Comment:
Our default value is already null, but this call is still necessary: `updatedMemberBuilder = new ConsumerGroupMember.Builder(member)` can have non-null classicMemberMetadata, and we need to reset it when updating members that have changed from the classic protocol to the consumer protocol.
[jira] [Commented] (KAFKA-15342) Considering upgrading to Mockito 5.4.1 or later
[ https://issues.apache.org/jira/browse/KAFKA-15342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842126#comment-17842126 ]

Greg Harris commented on KAFKA-15342:
-

[~chia7712] I think there may be more value to the integration test suite in the future: KAFKA-16242, as the unitTest target can be used for strict CI gating to prevent obviously broken builds.

> Considering upgrading to Mockito 5.4.1 or later
> ---
>
> Key: KAFKA-15342
> URL: https://issues.apache.org/jira/browse/KAFKA-15342
> Project: Kafka
> Issue Type: Task
> Components: unit tests
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Blocker
> Fix For: 4.0.0
>
> We're currently stuck on Mockito 4.x.y because the 5.x.y line requires Java
> 11 and, until we begin to work on Kafka 4.0.0, we continue to support Java 8.
> Either directly before, or after releasing Kafka 4.0.0, we should try to
> upgrade to a version of Mockito on the 5.x.y line.
> If we're able to use a version that includes
> https://github.com/mockito/mockito/pull/3078
> (which should be included in either a 5.4.1 or 5.5.0 release), we should
> also revert the change made for
> https://issues.apache.org/jira/browse/KAFKA-14682, which is just a temporary
> workaround. Care should be taken that, after reverting that change, unused
> stubbings are still correctly reported during our CI builds.
> If the effort required to upgrade our Mockito version is too high, we can
> either downgrade the severity of this ticket, or split it out into separate
> subtasks for each to-be-upgraded module.
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dongnuo123 commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583572562

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1560,13 +1561,15 @@ private List consumerGroupStaticMemberGroupLeave(
 * @param records The list of records to be applied to the state.
 * @return The append future to be applied.
 */
-private CompletableFuture consumerGroupFenceMember(
+private CoordinatorResult consumerGroupFenceMember(
 ConsumerGroup group,
 ConsumerGroupMember member,
-List records
+List records,
+T response
 ) {
 if (validateOnlineDowngrade(group, member.memberId())) {
-return convertToClassicGroup(group, member.memberId(), records);
+CompletableFuture appendFuture = convertToClassicGroup(group, member.memberId(), records);

Review Comment:
Moved `CoordinatorResult` to `convertToClassicGroup`.
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dongnuo123 commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583571521

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java: ##
@@ -44,8 +44,8 @@ public class CoordinatorResult {
 private final CompletableFuture appendFuture;
 /**
- * The boolean indicating whether to replay the records. Without specifying.
- * The default value is {@code appendFuture == null}.
+ * The boolean indicating whether to replay the records.
+ * The default value is {@code true} unless specified otherwise.
 */
 private final Boolean replayRecords;

Review Comment:
Then let's go with `replayRecords ? 1 : 0`. I guess it doesn't matter.
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dajac commented on code in PR #15818:
URL: https://github.com/apache/kafka/pull/15818#discussion_r1583565827

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java: ##
@@ -44,8 +44,8 @@ public class CoordinatorResult {
 private final CompletableFuture appendFuture;
 /**
- * The boolean indicating whether to replay the records. Without specifying.
- * The default value is {@code appendFuture == null}.
+ * The boolean indicating whether to replay the records.
+ * The default value is {@code true} unless specified otherwise.
 */
 private final Boolean replayRecords;

Review Comment:
I am not sure. The one that I gave was auto-generated by IntelliJ.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2083394774

@cadonna: In most places I removed the use of a `Timer` to calculate the deadline. Event classes no longer require a `Timer`; instead, the caller must call `CompletableEvent.calculateDeadlineMs()` when creating the event.
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2083373419

Test failures appear unrelated: there's a targeted RemoteLogMetadataSerdeTest for this logic, and the storage tests appear to pass for me locally.
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1583520696 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -998,7 +958,7 @@ public List partitionsFor(String topic, Duration timeout) { wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { Map> topicMetadata = -applicationEventHandler.addAndGet(topicMetadataEvent, timer); +applicationEventHandler.addAndGet(topicMetadataEvent); Review Comment: Removed use of `Timer` in favor of calculating the deadline from the `time` and `timeout` directly. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1107,7 +1067,7 @@ public Map offsetsForTimes(Map
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2083370524

> Here I have a comment, I could not put at the right location in the code:
>
> On line 1362, in `commitSync()` the consumer waits on the `commitFuture` with a timer. I think, it should not wait on a timer there since we already wait on a timer in the background thread.

I agree. What about the timed wait in `awaitPendingAsyncCommitsAndExecuteCommitCallbacks()`?
[jira] [Commented] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842121#comment-17842121 ] Greg Harris commented on KAFKA-15905: - Rather than re-reading the checkpoints topic, we could add source offsets to the emitted records, effectively writing each checkpoint twice. The framework could manage the offset consumer on our behalf, simplifying the mirror checkpoint task and preventing the need for credentials to consume from the checkpoints topic. > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Priority: Major > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. 
This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history.
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842119#comment-17842119 ]

Greg Harris commented on KAFKA-16622:
-

To clarify your specific points:

> but the problem here is that if the consumer never fully catches up once, we
> will never have a checkpoint.

If the consumer never catches up later than the most recent MM2 checkpoint task restart, it will not have a checkpoint. In the above example, it needed to get past .

> If as initial state the OffsetSyncStore.offsetSyncs contained a distribution
> of OffsetSync rather than just multiple copies of the last OffsetSync,
> Checkpoints would be computed earlier I think

This is the effect a solution to KAFKA-15905 would have. If we can restore the state of the `checkpointsPerConsumerGroup` variable, then it will be safe to keep those offset syncs in the sync store.

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.7.0, 3.6.2, 3.8.0
> Reporter: Edoardo Comar
> Priority: Major
> Attachments: connect.log.2024-04-26-10.zip,
> edo-connect-mirror-maker-sourcetarget.properties
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic.
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two
> single-node kafka clusters(mirromaker config attached) with quick refresh
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g.
fetching 50records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (ie its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16577: New consumer fails with stop within allotted timeout in consumer_test.py system test [kafka]
kirktrue commented on PR #15784:
URL: https://github.com/apache/kafka/pull/15784#issuecomment-2083344885

@cadonna, can you review this PR? Thanks!
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842117#comment-17842117 ] Greg Harris commented on KAFKA-16622: - Yeah [~ecomar] from the final state of the OffsetSyncStore, this appears to be working as intended: {noformat} [2024-04-26 10:58:44,557] TRACE [MirrorCheckpointConnector|task-0] New sync OffsetSync{topicPartition=mytopic-0, upstreamOffset=19998, downstreamOffset=19998} applied, new state is [19998:19998,19987:19987,19965:19965,19921:19921,19822:19822,19635:19635,19415:19415,18964:18964,18095:18095,16500:16500,:] (org.apache.kafka.connect.mirror.OffsetSyncStore:176){noformat} The gaps are 11, 22, 44, 99, 187, 220, 451, 869, 1595 which follow the approximate exponential that I would expect. Instead of the ~5 syncs I expected there's 9, which is better than I estimated because you have the offset.lag.max low. I would say the title of this issue isn't quite accurate now that we've investigated it, as the translation can happen at these intermediate points in addition to the end of the topic. If you had a consumer group with offset 19635 or 19636, that would be translated exactly, but a consumer group with offset 19700 would translate to 19636 and have some lag/reprocessing. This is intentional, as we made a trade-off about memory usage and precision to prioritize accuracy in the offset translation algorithm. You can see the discussion about this here: [https://lists.apache.org/thread/7qzxm1727y8rtrw6ds7t6hltkm55j5po] and here: [https://github.com/apache/kafka/pull/13178] to see more motivation for the current algorithm. I understand your concern though, and you're correct that KAFKA-15905 will help for the offsets between 0 and , and KAFKA-16364 will help with offsets close to the end of the topic. I also just opened this ticket: https://issues.apache.org/jira/browse/KAFKA-16641 for another improvement I thought of. 
It has a risk of mis-translating offsets for topics with gaps, but should be better than the old pre KAFKA-12468 algorithm, so we can discuss if it requires a configuration, and maybe it can be included in a KIP with KAFKA-16364. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup : > Tested with a standalone single process MirrorMaker2 mirroring between two > single-node kafka clusters(mirromaker config attached) with quick refresh > intervals (eg 5 sec) and a small offset.lag.max (eg 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (ie its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
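The translation behaviour described in the comment above (exact at a stored sync and one past it, conservative everywhere else) can be sketched as follows. This is an illustration under stated assumptions, not the actual `OffsetSyncStore` code: the class and method names are hypothetical, the sync table is copied from the offsets visible in the TRACE line, and the last entry, which is elided in the log, is simply left out.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

final class SparseTranslationSketch {

    /** Upstream -> downstream syncs taken from the TRACE line (the elided last entry is omitted). */
    static TreeMap<Long, Long> syncsFromLog() {
        long[] offsets = {19998, 19987, 19965, 19921, 19822, 19635, 19415, 18964, 18095, 16500};
        TreeMap<Long, Long> syncs = new TreeMap<>();
        for (long offset : offsets) {
            syncs.put(offset, offset); // upstream and downstream happen to be equal in this log
        }
        return syncs;
    }

    /**
     * Translates an upstream consumer offset using the closest sync at or below it.
     * An offset exactly at the sync maps to the sync's downstream offset; anything
     * later maps to downstream + 1, which is the furthest point known to be safe.
     */
    static OptionalLong translate(TreeMap<Long, Long> syncs, long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return OptionalLong.empty(); // older than every retained sync in this sketch
        }
        long step = upstreamOffset > sync.getKey() ? 1L : 0L;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> syncs = syncsFromLog();
        for (long offset : new long[] {19635L, 19636L, 19700L}) {
            System.out.println(offset + " -> " + translate(syncs, offset).getAsLong());
        }
    }
}
```

With this table, 19635 and 19636 translate exactly while 19700 falls back to 19636, which matches the lag/reprocessing trade-off described in the comment.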
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
hachikuji commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1583479619 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java: ## @@ -807,4 +809,62 @@ private static void writeSnapshotFooterRecord( builder.appendSnapshotFooterMessage(timestamp, snapshotFooterRecord); } } + +public static MemoryRecords withKRaftVersionRecord( +long initialOffset, +long timestamp, +int leaderEpoch, +ByteBuffer buffer, +KRaftVersionRecord kraftVersionRecord +) { +writeKRaftVersionRecord(buffer, initialOffset, timestamp, leaderEpoch, kraftVersionRecord); +buffer.flip(); +return MemoryRecords.readableRecords(buffer); +} + +private static void writeKRaftVersionRecord( +ByteBuffer buffer, +long initialOffset, +long timestamp, +int leaderEpoch, +KRaftVersionRecord kraftVersionRecord +) { +try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder( +buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, +TimestampType.CREATE_TIME, initialOffset, timestamp, +RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, +false, true, leaderEpoch, buffer.capacity()) +) { +builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord); +} +} + +public static MemoryRecords withVotersRecord( +long initialOffset, +long timestamp, +int leaderEpoch, +ByteBuffer buffer, +VotersRecord votersRecord +) { +writeVotersRecord(buffer, initialOffset, timestamp, leaderEpoch, votersRecord); Review Comment: We are discarding the `MemoryRecords` created by the builder and creating a new one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
chia7712 merged PR #15645:
URL: https://github.com/apache/kafka/pull/15645
[jira] [Created] (KAFKA-16641) MM2 offset translation should interpolate between sparse OffsetSyncs
Greg Harris created KAFKA-16641:
---

Summary: MM2 offset translation should interpolate between sparse OffsetSyncs
Key: KAFKA-16641
URL: https://issues.apache.org/jira/browse/KAFKA-16641
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Reporter: Greg Harris

Right now, the OffsetSyncStore keeps a sparse offset store, with exponential spacing between syncs. This can leave large gaps in translation, where offsets are translated much more conservatively than necessary.

The dominant way to use MirrorMaker2 is in a "single writer" fashion, where the target topic is only written to by a single MirrorMaker 2 instance. When a topic without gaps is replicated, contiguous blocks of offsets are preserved. For example:

Say that MM2 mirrors 100 records, and emits two syncs: 0:100 and 100:200. We can detect when the gap between the upstream and downstream offsets is the same using subtraction, and then assume that 50:150 is also a valid translation. If the source topic has gaps, or goes through a restart, we should expect a discontinuity in the offset syncs, like 0:100 and 100:250 or 0:100 and 100:150.

This may allow us to restore much of the offset translation precision that was lost for simple contiguous topics, without additional memory usage, but at the risk of mis-translating some pathological situations when the source topic has gaps. This could either be enabled unconditionally or gated behind a configuration.
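The subtraction check proposed in the ticket above could look roughly like this. It is only a sketch of the idea, assuming a pair of adjacent syncs is already available; the class name, method name, and parameter names are hypothetical and nothing here reflects an actual MirrorMaker 2 implementation.

```java
import java.util.OptionalLong;

final class SyncInterpolationSketch {

    /**
     * Interpolates a downstream offset between two adjacent syncs
     * (lowerUp:lowerDown and upperUp:upperDown). Interpolation is only attempted
     * when both syncs have the same upstream-to-downstream delta, i.e. the block
     * between them looks contiguous; otherwise the result is empty and the caller
     * would fall back to the conservative translation.
     */
    static OptionalLong interpolate(long lowerUp, long lowerDown,
                                    long upperUp, long upperDown,
                                    long upstreamOffset) {
        if (upstreamOffset < lowerUp || upstreamOffset > upperUp) {
            return OptionalLong.empty(); // offset is not between the two syncs
        }
        long lowerDelta = lowerDown - lowerUp;
        long upperDelta = upperDown - upperUp;
        if (lowerDelta != upperDelta) {
            return OptionalLong.empty(); // discontinuity: gaps or a restart in between
        }
        return OptionalLong.of(upstreamOffset + lowerDelta);
    }

    public static void main(String[] args) {
        // Syncs 0:100 and 100:200 share a delta of 100, so 50 maps to 150.
        System.out.println(interpolate(0, 100, 100, 200, 50));
        // Syncs 0:100 and 100:250 disagree (100 vs 150), so no interpolation.
        System.out.println(interpolate(0, 100, 100, 250, 50));
        // Syncs 0:100 and 100:150 disagree (100 vs 50), so no interpolation.
        System.out.println(interpolate(0, 100, 100, 150, 50));
    }
}
```

As the ticket notes, equal deltas do not prove contiguity: a source topic with gaps could in principle produce matching deltas by coincidence, which is why the sketch should be read as the optimistic case only.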
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1583466928 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Utils; + +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.GroupType.CONSUMER; + +class ConsumerGroupExecutor { + +private ConsumerGroupExecutor() { +} + +static AutoCloseable buildConsumerGroup(String brokerAddress, +int numberOfConsumers, +String groupId, +String topic, +String groupProtocol, +Optional remoteAssignor, +Map customConfigs, +boolean syncCommit) { +return buildConsumers( +brokerAddress, 
+numberOfConsumers, +groupId, +groupProtocol, +topic, +RangeAssignor.class.getName(), +remoteAssignor, +customConfigs, +syncCommit +); +} + +static AutoCloseable buildClassicGroup(String brokerAddress, + int numberOfConsumers, + String groupId, + String topic, + String assignmentStrategy, + Map customConfigs, + boolean syncCommit) { +return buildConsumers( +brokerAddress, +numberOfConsumers, +groupId, +GroupProtocol.CLASSIC.name, +topic, +assignmentStrategy, +Optional.empty(), +customConfigs, +syncCommit +); +} + +private static AutoCloseable buildConsumers( +String brokerAddress, +int numberOfConsumers, +String groupId, +String groupProtocol, +String topic, +String assignmentStrategy, +Optional remoteAssignor, +Map customConfigs, +boolean syncCommit +) { +List> allConfigs = IntStream.range(0, numberOfConsumers) +.mapToObj(ignored -> +composeConfigs( +
[jira] [Updated] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test
[ https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16576: -- Priority: Minor (was: Blocker) > New consumer fails with assert in consumer_test.py’s test_consumer_failure > system test > -- > > Key: KAFKA-16576 > URL: https://issues.apache.org/jira/browse/KAFKA-16576 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: flaky-test, kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 42.582 seconds > AssertionError() > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 399, in test_consumer_failure > assert partition_owner is not None > AssertionError > Notify > {code} > Affected tests: > * {{test_consumer_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1583452334 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1169,8 +1129,7 @@ private Map beginningOrEndOffset(Collection offsetAndTimestampMap; offsetAndTimestampMap = applicationEventHandler.addAndGet( -listOffsetsEvent, -timer); +listOffsetsEvent); Review Comment: Yep, I made the change to use the deadline directly (vs. a `Timer`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1583451774 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java: ## @@ -27,19 +30,34 @@ public abstract class CompletableBackgroundEvent extends BackgroundEvent implements CompletableEvent { private final CompletableFuture future; +private final long deadlineMs; -protected CompletableBackgroundEvent(final Type type) { +protected CompletableBackgroundEvent(final Type type, final Timer timer) { super(type); this.future = new CompletableFuture<>(); +Objects.requireNonNull(timer); + +long currentTimeMs = timer.currentTimeMs(); +long remainingMs = timer.remainingMs(); + +if (currentTimeMs > Long.MAX_VALUE - remainingMs) +this.deadlineMs = Long.MAX_VALUE; +else +this.deadlineMs = currentTimeMs + remainingMs; Review Comment: Per the above, I added `CompletableEvent.calculateDeadlineMs()` to keep the code in a shared location. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
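For readers following the deadline discussion above: a minimal sketch of the overflow-guarded deadline calculation shown in the diff. The class name `DeadlineSketch` and the two-argument signature are assumptions made for illustration; the source only says the logic moved into `CompletableEvent.calculateDeadlineMs()` and does not show that method. Both inputs are assumed to be non-negative.

```java
final class DeadlineSketch {

    // Hypothetical helper mirroring the guard in the diff: if adding the
    // remaining time to the current time would overflow a long, saturate at
    // Long.MAX_VALUE instead of wrapping to a negative deadline.
    // Assumes currentTimeMs >= 0 and remainingMs >= 0.
    static long calculateDeadlineMs(long currentTimeMs, long remainingMs) {
        if (currentTimeMs > Long.MAX_VALUE - remainingMs)
            return Long.MAX_VALUE;
        return currentTimeMs + remainingMs;
    }

    public static void main(String[] args) {
        // Ordinary case: deadline is simply now + remaining.
        System.out.println(calculateDeadlineMs(1_000L, 500L));
        // "Wait forever" style timeout: the sum would overflow, so it saturates.
        System.out.println(calculateDeadlineMs(1_000L, Long.MAX_VALUE) == Long.MAX_VALUE);
    }
}
```

Storing an absolute deadline rather than a `Timer` means the event can be checked for expiry later with a plain comparison against the current time.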
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1583452740

## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ##
@@ -162,32 +162,39 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 controllers = annot.controllers();
 }
-if (brokers <= 0 || controllers <= 0) {
-throw new IllegalArgumentException("Number of brokers/controllers must be greater than zero.");
+final int disksPerBroker;
+if (annot.disksPerBroker() == 0) {
+disksPerBroker = defaults.disksPerBroker();

Review Comment: We can use the ternary conditional operator to make this simpler. For example:

```java
Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();

Map<String, String> serverProperties = new HashMap<>();
for (ClusterConfigProperty property : defaults.serverProperties()) {
    serverProperties.put(property.key(), property.value());
}
for (ClusterConfigProperty property : annot.serverProperties()) {
    serverProperties.put(property.key(), property.value());
}
ClusterConfig config = ClusterConfig.builder()
    .setType(type)
    .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
    .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
    .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
    .setAutoStart(annot.autoStart() == AutoStart.DEFAULT ? defaults.autoStart() : annot.autoStart() == AutoStart.YES)
    .setName(annot.name().isEmpty() ? null : annot.name())
    .setName(annot.listener().isEmpty() ? null : annot.listener())
    .setServerProperties(serverProperties)
    .setSecurityProtocol(annot.securityProtocol())
    .setMetadataVersion(annot.metadataVersion())
    .build();
type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
```
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
kirktrue commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1583452722 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -946,8 +907,7 @@ public Map committed(final Set> listTopics(Duration timeout) { final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(timer); Review Comment: Changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16576) New consumer fails with assert in consumer_test.py’s test_consumer_failure system test
[ https://issues.apache.org/jira/browse/KAFKA-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16576: - Assignee: (was: Kirk True) > New consumer fails with assert in consumer_test.py’s test_consumer_failure > system test > -- > > Key: KAFKA-16576 > URL: https://issues.apache.org/jira/browse/KAFKA-16576 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Minor > Labels: flaky-test, kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{consumer_test.py}} system test intermittently fails with the following > error: > {code} > test_id: > kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_failure.clean_shutdown=True.enable_autocommit=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 42.582 seconds > AssertionError() > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_test.py", > line 399, in test_consumer_failure > assert partition_owner is not None > AssertionError > Notify > {code} > Affected tests: > * {{test_consumer_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1583450532 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -966,6 +982,61 @@ private static void maybeUpdateSubscribedTopicNames( } } +/** + * Updates the subscription count. + * + * @param oldMember The old member. + * @param newMember The new member. + * + * @return Copy of the map of topics to the count of number of subscribers. + */ +public Map updateSubscribedTopicNames( +ConsumerGroupMember oldMember, +ConsumerGroupMember newMember +) { +Map subscribedTopicCount = new HashMap<>(this.subscribedTopicNames); +if (oldMember != null) { +oldMember.subscribedTopicNames().forEach(topicName -> +subscribedTopicCount.compute(topicName, ConsumerGroup::decValue) +); +} + +if (newMember != null) { +newMember.subscribedTopicNames().forEach(topicName -> +subscribedTopicCount.compute(topicName, ConsumerGroup::incValue) +); +} + +return subscribedTopicCount; +} + +/** + * Compute the subscription type of the consumer group. + * + * If all members are subscribed to the same set of topics, the type is homogeneous. + * Otherwise, it is heterogeneous. + * + * @param subscribedTopicNames A map of topic names to the count of members subscribed to each topic. + * @param numOfMembers The total number of members in the group. + * @param subscriptionType The current subscription type of the group. + * @return {@link SubscriptionType#HOMOGENEOUS} if all members are subscribed to exactly the same topics; + * otherwise, {@link SubscriptionType#HETEROGENEOUS}. + */ +public static SubscriptionType subscriptionType( +Map subscribedTopicNames, +int numOfMembers, +SubscriptionType subscriptionType +) { +if (subscribedTopicNames.isEmpty()) return subscriptionType; Review Comment: discussed offline -- This is an automated message from the Apache Git Service. 
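The Javadoc quoted in the review above describes the homogeneous/heterogeneous rule but the diff is cut off before the method body. The following is a minimal sketch of one way that rule could be implemented, assuming every member has a non-empty subscription: the group is homogeneous exactly when each topic's subscriber count equals the number of members. The class name `SubscriptionTypeSketch` and the loop are illustrative assumptions, not the PR's code.

```java
import java.util.Map;

final class SubscriptionTypeSketch {

    enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    // subscriberCounts maps topic name -> number of members subscribed to it.
    // If any topic is subscribed to by fewer members than the group has,
    // at least two members must have different topic sets.
    static SubscriptionType subscriptionType(
        Map<String, Integer> subscriberCounts,
        int numOfMembers,
        SubscriptionType current
    ) {
        // Mirrors the early return visible in the diff: with no topics,
        // keep whatever type the group already had.
        if (subscriberCounts.isEmpty()) return current;

        for (int count : subscriberCounts.values()) {
            if (count != numOfMembers) return SubscriptionType.HETEROGENEOUS;
        }
        return SubscriptionType.HOMOGENEOUS;
    }

    public static void main(String[] args) {
        // Two members, both subscribed to "foo" and "bar".
        System.out.println(subscriptionType(
            Map.of("foo", 2, "bar", 2), 2, SubscriptionType.HOMOGENEOUS));
        // Two members, only one of them subscribed to "bar".
        System.out.println(subscriptionType(
            Map.of("foo", 2, "bar", 1), 2, SubscriptionType.HOMOGENEOUS));
    }
}
```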
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1583450102 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) { * is a request in-flight. */ public boolean requestInFlight() { -return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs; +return requestInFlight; Review Comment: @philipnee—let us know if you're OK to leave the method name as is. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
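To illustrate the change in the diff above (timestamp comparison replaced by an explicit flag), here is a minimal sketch of one sequence where the two approaches disagree. The class and method names (`InFlightSketch`, `onSend`, `onReceive`) are hypothetical and are not taken from `RequestState`; only the two `requestInFlight()` expressions come from the diff. Whether this exact sequence is what the ticket was filed for is an assumption.

```java
final class InFlightSketch {

    // The old logic from the diff: infer "in flight" from timestamps.
    static final class TimestampBased {
        long lastSentMs = -1;
        long lastReceivedMs = -1;

        void onSend(long nowMs) { lastSentMs = nowMs; }
        void onReceive(long nowMs) { lastReceivedMs = nowMs; }

        boolean requestInFlight() {
            return lastSentMs > -1 && lastReceivedMs < lastSentMs;
        }
    }

    // The new logic from the diff: track the state explicitly.
    static final class FlagBased {
        boolean requestInFlight = false;

        void onSend(long nowMs) { requestInFlight = true; }
        void onReceive(long nowMs) { requestInFlight = false; }

        boolean requestInFlight() { return requestInFlight; }
    }

    public static void main(String[] args) {
        TimestampBased ts = new TimestampBased();
        FlagBased flag = new FlagBased();

        // Send, receive, and send again, all within the same millisecond
        // (easy to hit with a mocked clock in tests).
        long now = 100L;
        ts.onSend(now);   flag.onSend(now);
        ts.onReceive(now); flag.onReceive(now);
        ts.onSend(now);   flag.onSend(now);

        // A request is outstanding, but the timestamps are equal, so the
        // timestamp-based check cannot tell.
        System.out.println("timestamp-based: " + ts.requestInFlight());
        System.out.println("flag-based:      " + flag.requestInFlight());
    }
}
```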
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583446833 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> configDataCopy = new HashMap<>(configData); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); +for (ConfigRecord record : minIsrRecords) { +replayInternal(record, configDataCopy, localSnapshotRegistry); +} Review Comment: This is the implementation challenge part of this PR. 
To find the effective min ISR value, the lookup has to walk topic config -> dynamic broker config -> default broker config -> ... Say the user updates the default broker config: 1. All the topics could be affected. 2. The effective min ISR values have to be recalculated. 3. We need to generate the partition change records along with the config change records, which means the ReplicationControlManager can't use its regular methods to get the effective min ISR value; the value has to be determined from the pending config records together with the current configs. I found it easier to make a copy of the configs and apply the min ISR updates to the copy, then let the ReplicationControlManager check all the partitions against the config copy.
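The layered lookup and the copy-then-apply approach described above can be sketched as follows. This is a simplified illustration under stated assumptions: the three plain maps, the `fallback` parameter, and the class name `EffectiveMinIsrSketch` are hypothetical stand-ins, and the real controller code (`configData`, `replayInternal`, `SnapshotRegistry`) is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;

final class EffectiveMinIsrSketch {

    static final String MIN_ISR = "min.insync.replicas";

    // Walks the layers in order and returns the first value that is set:
    // topic config -> dynamic broker config -> default broker config -> fallback.
    static int effectiveMinIsr(
        String topic,
        Map<String, Map<String, String>> topicConfigs,
        Map<String, String> dynamicBrokerConfig,
        Map<String, String> defaultBrokerConfig,
        int fallback
    ) {
        String value = topicConfigs.getOrDefault(topic, Map.of()).get(MIN_ISR);
        if (value == null) value = dynamicBrokerConfig.get(MIN_ISR);
        if (value == null) value = defaultBrokerConfig.get(MIN_ISR);
        return value == null ? fallback : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> topics = new HashMap<>();
        topics.put("a", Map.of(MIN_ISR, "3")); // set at the topic level
        topics.put("b", Map.of());             // inherits from the broker layers

        Map<String, String> dynamic = new HashMap<>();
        Map<String, String> defaults = new HashMap<>(Map.of(MIN_ISR, "1"));

        // Apply the pending update to a copy, leaving the live config untouched.
        Map<String, String> defaultsCopy = new HashMap<>(defaults);
        defaultsCopy.put(MIN_ISR, "2");

        // Only topics without a topic-level override see a different value.
        for (String topic : topics.keySet()) {
            int before = effectiveMinIsr(topic, topics, dynamic, defaults, 1);
            int after = effectiveMinIsr(topic, topics, dynamic, defaultsCopy, 1);
            System.out.println(topic + ": " + before + " -> " + after);
        }
    }
}
```

Comparing the value computed from the live config with the value computed from the copy is one way to decide which topics need partition change records.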
Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]
chia7712 merged PR #15822: URL: https://github.com/apache/kafka/pull/15822 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dajac commented on code in PR #15818: URL: https://github.com/apache/kafka/pull/15818#discussion_r1583442656 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1560,13 +1561,15 @@ private List consumerGroupStaticMemberGroupLeave( * @param records The list of records to be applied to the state. * @return The append future to be applied. */ -private CompletableFuture consumerGroupFenceMember( +private CoordinatorResult consumerGroupFenceMember( ConsumerGroup group, ConsumerGroupMember member, -List records +List records, +T response ) { if (validateOnlineDowngrade(group, member.memberId())) { -return convertToClassicGroup(group, member.memberId(), records); +CompletableFuture appendFuture = convertToClassicGroup(group, member.memberId(), records); Review Comment: `convertToClassicGroup` is only used by this method, would it make sense to move `CoordinatorResult` to it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dajac commented on code in PR #15818: URL: https://github.com/apache/kafka/pull/15818#discussion_r1583441095 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1499,33 +1499,28 @@ private CoordinatorResult consumerGr ) throws ApiException { ConsumerGroup group = consumerGroup(groupId); List records = new ArrayList<>(); Review Comment: Should we push this one down too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
[ https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842110#comment-17842110 ] Greg Harris commented on KAFKA-16640: - Okay, I understand now. SGTM! > Replace TestUtils#resource by scala.util.Using > -- > > Key: KAFKA-16640 > URL: https://issues.apache.org/jira/browse/KAFKA-16640 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we > don't need to have custom try-resource function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
[ https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842107#comment-17842107 ] Chia-Ping Tsai commented on KAFKA-16640: {quote} does this depend on https://issues.apache.org/jira/browse/KAFKA-12895 ? {quote} no, we can complete this in 3.8 since we can use `Using` through `scala-collection-compat` in scala 2.12 > Replace TestUtils#resource by scala.util.Using > -- > > Key: KAFKA-16640 > URL: https://issues.apache.org/jira/browse/KAFKA-16640 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we > don't need to have custom try-resource function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
chia7712 commented on code in PR #15773: URL: https://github.com/apache/kafka/pull/15773#discussion_r1583431374 ## server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java: ## @@ -86,7 +86,7 @@ public class ServerLogConfigs { public static final String LOG_DELETE_DELAY_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG); public static final long LOG_DELETE_DELAY_MS_DEFAULT = 6L; -public static final String LOG_DELETE_DELAY_MS_DOC = "The amount of time to wait before deleting a file from the filesystem"; +public static final String LOG_DELETE_DELAY_MS_DOC = "The amount of time to wait before deleting a file from the filesystem. If the value is 0 and there is no file to delete, the system will wait 1 second, or the job may keep running."; Review Comment: "1 second"? or "1 millisecond"? Also, we need to highlight "busy waiting" for users -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
[ https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842104#comment-17842104 ] Greg Harris commented on KAFKA-16640: - Hi [~chia7712] does this depend on https://issues.apache.org/jira/browse/KAFKA-12895 ? If so we should wait to implement this after the 3.8 release. > Replace TestUtils#resource by scala.util.Using > -- > > Key: KAFKA-16640 > URL: https://issues.apache.org/jira/browse/KAFKA-16640 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we > don't need to have custom try-resource function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583424779 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); Review Comment: If `min.insync.replicas` is not set on the topic config level, the effective `min.insync.replicas` of a topic will change if default broker config is updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583423622 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource( if (error.isFailure()) { return error; } +maybeTriggerPartitionUpdateOnMinIsrChange(newRecords); Review Comment: `legacyAlterConfigResource` is supported now. Adding a UT to cover it.
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583422586 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId } return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false); } + +BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() { +Map topicMap = new HashMap<>(); +for (Map map : elrMembers.values()) { +if (map != null) { Review Comment: Done, it can't be null -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842099#comment-17842099 ] Chia-Ping Tsai commented on KAFKA-16639: I checked the code again. It seems there are two possible bugs if we change the state to `LEAVING` before receiving the heartbeat response: 1. https://github.com/apache/kafka/commit/8c0488b887be4a9178563f1d72514010f83b8614 ignores the response and leaves the member id empty. That prevents the consumer from leaving the group, since the server will reject the request (epoch=-1 and member id is empty). 2. https://github.com/apache/kafka/blob/636e65aa6b3558a7ae239ce69579b62ab3377fcb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212 could skip the request because of the in-flight request. > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Chia-Ping Tsai > Assignee: Philip Nee > Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by closing a consumer immediately after it is > created. > The root cause is that we skip the new heartbeat used to leave the group when > there is an in-flight heartbeat request > (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212). > It seems to me the simple solution is to create a heartbeat request when > we hit that situation and then send it via pollOnClose > (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).
[jira] [Assigned] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-16639: -- Assignee: Philip Nee (was: Chia-Ping Tsai) > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Chia-Ping Tsai > Assignee: Philip Nee > Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by closing a consumer immediately after it is > created. > The root cause is that we skip the new heartbeat used to leave the group when > there is an in-flight heartbeat request > (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212). > It seems to me the simple solution is to create a heartbeat request when > we hit that situation and then send it via pollOnClose > (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1583356709 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class ConnectorOffsetBackingStoreTest { + +// Serialized +private static final byte[] OFFSET_KEY_SERIALIZED = 
"key-serialized".getBytes(); +private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); + +private static final KafkaException PRODUCE_EXCEPTION = new KafkaException(); + +private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); + +@Test +public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() { +MockProducer connectorStoreProducer = createMockProducer(); +MockProducer workerStoreProducer = createMockProducer(); +KafkaOffsetBackingStore connectorStore = createStore("topic1", connectorStoreProducer); +KafkaOffsetBackingStore workerStore = createStore("topic2", workerStoreProducer); + +ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( +() -> LoggingContext.forConnector("source-connector"), +workerStore, +connectorStore, +"offsets-topic", +mock(TopicAdmin.class)); + +AtomicBoolean callbackInvoked = new AtomicBoolean(); +AtomicReference callbackResult = new AtomicReference<>(); +AtomicReference callbackError = new AtomicReference<>(); + +Future setFuture = offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY_SERIALIZED, null), (error, result) -> { +callbackInvoked.set(true); +callbackResult.set(result); +callbackError.set(error); +}); Review Comment: Nit: could it also be helpful to make sure we don't prematurely complete our callbacks? ```suggestion }); assertFalse("Store callback should not be invoked before underlying producer callback", callbackInvoked.get()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +Map subscribedTopicsMemberCount = new HashMap<>(); Review Comment: I initialized it in the method to update, and we return the map. Currently there is no getter for the map, I can create another getter to return the map and not just the list of subscribed topic names -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +Map subscribedTopicsMemberCount = new HashMap<>(); Review Comment: I initialized it in the method to update, and we return the map. Currently there is no getter for the map; I can create another getter that returns the map and not just the subscribed topic names.
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1583294148 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1307,13 +1307,14 @@ private CoordinatorResult consumerGr } } +Map subscribedTopicsMemberCount = new HashMap<>(); Review Comment: I initialized it in the method to update, and we return the map. The map itself is private, so I can't initialize it directly unless I create another getter that returns the map and not just the subscribed topic names.
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r158025 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1350,6 +1351,11 @@ private CoordinatorResult consumerGr .withMembers(group.members()) .withStaticMembers(group.staticMembers()) .withSubscriptionMetadata(subscriptionMetadata) +.withSubscriptionType(ConsumerGroup.subscriptionType( +subscribedTopicsMemberCount, +group.numMembers(), Review Comment: An alternative may be to compare the counts within the Map without considering the group size. This could work because we do not accept empty subscriptions from a member. -> this works as well
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1583332135 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1350,6 +1351,11 @@ private CoordinatorResult consumerGr .withMembers(group.members()) .withStaticMembers(group.staticMembers()) .withSubscriptionMetadata(subscriptionMetadata) +.withSubscriptionType(ConsumerGroup.subscriptionType( +subscribedTopicsMemberCount, +group.numMembers(), Review Comment: Yeah, I was actually thinking the same. I saw that the member map is updated in the getOrCreateMember step, so I figured the memberCount is already up to date, unless a member is removed. I'm not sure whether that is updated elsewhere, or I might be overlooking something.
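For readers following this thread, the check under discussion can be sketched roughly as below. This is a minimal illustration and not the code from PR #15785: it assumes `subscribedTopicsMemberCount` maps each topic name to the number of members subscribed to it, and it calls a group homogeneous when every topic is subscribed to by all members. The class name `SubscriptionTypeSketch`, the enum values, and the choice to treat an empty map as homogeneous are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class SubscriptionTypeSketch {

    /** Hypothetical enum for this sketch; the real type in the PR may differ. */
    public enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    /**
     * Classifies a group from per-topic subscriber counts.
     *
     * @param subscribedTopicsMemberCount topic name -> number of subscribed members (assumed meaning)
     * @param numMembers                  number of members in the group
     */
    public static SubscriptionType subscriptionType(
            Map<String, Integer> subscribedTopicsMemberCount,
            int numMembers) {
        // Assumption: a group with no subscriptions is treated as homogeneous.
        if (subscribedTopicsMemberCount.isEmpty()) {
            return SubscriptionType.HOMOGENEOUS;
        }
        // If any topic is missing at least one member, subscriptions differ.
        for (int count : subscribedTopicsMemberCount.values()) {
            if (count != numMembers) {
                return SubscriptionType.HETEROGENEOUS;
            }
        }
        return SubscriptionType.HOMOGENEOUS;
    }

    public static void main(String[] args) {
        // Three members, all subscribed to both topics.
        Map<String, Integer> allSame = new HashMap<>();
        allSame.put("orders", 3);
        allSame.put("payments", 3);
        System.out.println(subscriptionType(allSame, 3));

        // Three members, but only one of them subscribes to "payments".
        Map<String, Integer> mixed = new HashMap<>();
        mixed.put("orders", 3);
        mixed.put("payments", 1);
        System.out.println(subscriptionType(mixed, 3));
    }
}
```

The sketch compares each count against the member count. Under these assumptions, comparing the counts only with one another would not be enough on its own: two members with disjoint single-topic subscriptions produce a count of 1 for each topic, which looks uniform even though the subscriptions differ.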
Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2083099739 Rely on #15766