Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]
gharris1727 commented on PR #13294: URL: https://github.com/apache/kafka/pull/13294#issuecomment-1895192159 > Wouldn't any unexpected response here simply be indicative of a bug in the Connect runtime since the REST client is only being used internally to forward requests within a Connect cluster? While a bug in the remote Connect worker could produce unexpected content, I think that when one of these calls gets unexpected content, it is more likely that what is on the other end of the connection isn't actually a Connect worker. For example, there could be a stale DNS resolution, so that the request went to an unrelated service. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
dajac commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1454773095
## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ##
@@ -935,8 +935,9 @@ private[group] class GroupCoordinator(
         producerId,
         producerEpoch,
         RecordBatch.NO_SEQUENCE,
-        requestLocal,
-        postVerificationCallback
+        // Wrap the callback to be handled on an arbitrary request handler thread
+        // when transaction verification is complete.
+        KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal)
Review Comment: We still need this in the old coordinator.
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
dajac commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1454771868
## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig,
    * @param producerId the producer id for the producer writing to the transaction
    * @param producerEpoch the epoch of the producer writing to the transaction
    * @param baseSequence the base sequence of the first record in the batch we are trying to append
-   * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
-   *                     thread calling this method
    * @param callback the method to execute once the verification is either completed or returns an error
    *
    * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
    * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
-   * This guard can not be used for verification and any appends that attenpt to use it will fail.
+   * This guard can not be used for verification and any appends that attempt to use it will fail.
    */
   def maybeStartTransactionVerificationForPartition(
     topicPartition: TopicPartition,
     transactionalId: String,
     producerId: Long,
     producerEpoch: Short,
     baseSequence: Int,
-    requestLocal: RequestLocal,
-    callback: (Errors, RequestLocal, VerificationGuard) => Unit
+    callback: Either[Errors, VerificationGuard] => Unit
   ): Unit = {
-    def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-                            newRequestLocal: RequestLocal,
-                            verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
-      callback(
-        preAppendErrors.getOrElse(topicPartition, Errors.NONE),
-        newRequestLocal,
-        verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+    def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {
Review Comment: Ack. I will take a look at https://github.com/apache/kafka/pull/15087.
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1454760385
## clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java: ##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+public class SubscriptionPattern {
Review Comment: I think we should leave it on the consumer side, as I think its purpose is mainly to differentiate between using java.util.regex.Pattern and using a regex that is compatible with RE2J. If we were to use a string instead of SubscriptionPattern, the user may get confused about why they need to use Pattern for the other subscribe methods.
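The point made in this review comment can be illustrated with a small sketch: a dedicated wrapper type lets overloads distinguish the two regex dialects, which a bare String could not do. This is only an illustration and not the class from the PR; the `pattern` field, the `pattern()` accessor, and the `describe` overloads are hypothetical names, and no RE2J validation is performed.

```java
import java.util.Objects;
import java.util.regex.Pattern;

public class SubscriptionPatternSketch {

    /** Hypothetical stand-in for the class under review; it only carries the raw regex string. */
    public static final class SubscriptionPattern {
        private final String pattern;

        public SubscriptionPattern(String pattern) {
            this.pattern = Objects.requireNonNull(pattern, "pattern");
        }

        public String pattern() {
            return pattern;
        }
    }

    // Hypothetical overloads: the parameter type, not a bare String, signals the regex dialect.
    public static String describe(Pattern pattern) {
        return "java.util.regex.Pattern: " + pattern.pattern();
    }

    public static String describe(SubscriptionPattern pattern) {
        return "RE2J-compatible regex: " + pattern.pattern();
    }

    public static void main(String[] args) {
        System.out.println(describe(Pattern.compile("orders-.*")));
        System.out.println(describe(new SubscriptionPattern("orders-.*")));
    }
}
```

With a plain String parameter, the second overload would give the caller no hint that a different regex syntax is expected.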
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1454761116
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ##
@@ -86,7 +86,6 @@ import static org.mockito.Mockito.when;
 public class OffsetsRequestManagerTest {
-
Review Comment: My bad
[jira] [Commented] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”
[ https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807585#comment-17807585 ] Phuc Hong Tran commented on KAFKA-16022: [~pnee], were you seeing this exception in the FetchRequestManagerTest, or was it somewhere else? > AsyncKafkaConsumer sometimes complains “No current assignment for partition > {}” > --- > > Key: KAFKA-16022 > URL: https://issues.apache.org/jira/browse/KAFKA-16022 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This seems to be a timing issue that before the member receives any > assignment from the coordinator, the fetcher will try to find the current > position causing "No current assignment for partition {}". This creates a > small amount of noise to the log. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
cmccabe commented on PR #15197: URL: https://github.com/apache/kafka/pull/15197#issuecomment-1895021752 Thanks for the quick response on this one, @pprovenzano. LGTM once you address the one comment I made.
Re: [PR] KAFKA-16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
cmccabe commented on code in PR #15197: URL: https://github.com/apache/kafka/pull/15197#discussion_r1454694471
## metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java: ##
@@ -46,6 +46,18 @@ public final class FeaturesImage {
         ZkMigrationState.NONE
     );
+    public static final FeaturesImage LATEST = new FeaturesImage(
Review Comment: Hmm... I'd rather not add `FeaturesImage.LATEST` here since it feels like something that is only for testing. Can we just add this to `ReplicaManagerTest.scala`, for now? LGTM once that's addressed
Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]
satishd closed pull request #15209: MINOR Removed unused CommittedOffsetsFile class. URL: https://github.com/apache/kafka/pull/15209
[jira] [Commented] (KAFKA-15851) broker under replicated due to error java.nio.BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-15851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807561#comment-17807561 ] Yu Wang commented on KAFKA-15851: - Got a similar stack trace in 2.5.1; it recovered after a restart.
{code:java}
java.nio.BufferOverflowException
 at java.nio.Buffer.nextPutIndex(Buffer.java:533)
 at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:796)
 at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
 at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
 at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:507)
 at kafka.log.Log.$anonfun$roll$8(Log.scala:1900)
 at kafka.log.Log.$anonfun$roll$2(Log.scala:1900)
 at kafka.log.Log.roll(Log.scala:2322)
 at kafka.log.Log.maybeRoll(Log.scala:1849)
 at kafka.log.Log.$anonfun$append$2(Log.scala:1148)
 at kafka.log.Log.append(Log.scala:2322)
 at kafka.log.Log.appendAsFollower(Log.scala:1017)
 at kafka.cluster.Partition.$anonfun$doAppendRecordsToFollowerOrFutureReplica$1(Partition.scala:924)
 at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:917)
 at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:931)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:167)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:332)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:320)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:319)
 at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
 at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:921)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:319)
 at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:135)
 at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:134)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
> broker under replicated due to error java.nio.BufferOverflowException > - > > Key: KAFKA-15851 > URL: https://issues.apache.org/jira/browse/KAFKA-15851 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.2 > Environment: Kafka Version: 3.3.2 > Deployment mode: zookeeper >Reporter: wangliucheng >Priority: Major > Attachments: p1.png > > > In my kafka cluster, kafka update 2.0 to 3.3.2 version > {*}first start failed{*}, because the same directory was configured > The error is as follows: > > {code:java} > [2023-11-16 10:04:09,952] ERROR (main kafka.Kafka$ 159) Exiting Kafka due to > fatal exception during startup. > java.lang.IllegalStateException: Duplicate log directories for > skydas_sc_tdevirsec-12 are found in both > /data01/kafka/log/skydas_sc_tdevirsec-12 and > /data07/kafka/log/skydas_sc_tdevirsec-12. It is likely because log directory > failure happened while broker was replacing current replica with future > replica. Recover broker from this failure by manually deleting one of the two > directories for this partition. It is recommended to delete the partition in > the log directory that is known to have failed recently. 
> at kafka.log.LogManager.loadLog(LogManager.scala:305) > at kafka.log.LogManager.$anonfun$loadLogs$14(LogManager.scala:403) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2023-11-16 10:04:09,953] INFO (kafka-shutdown-hook kafka.server.KafkaServer > 66) [KafkaServer id=1434] shutting down {code} > > > *second,* remove /data07/kafka/log in log.dirs and start kafka also reported > an error : > > {code:java} > [2023-11-16 10:13:10,713] INFO (ReplicaFetcherThread-3-1008 > kafka.log.UnifiedLog 66) [UnifiedLog partition=ty_udp_full-60, > dir=/data04/kafka/log] Rolling new log segment (log_size = > 755780551/1073741824}, offset_index_size = 2621440/2621440, time_index_size = > 1747626/1747626, inactive_time_ms = 2970196/60480). > [2023-11-16 10:13:10,714] ERROR (Replica
Re: [PR] KAFKA-16146: Checkpoint log-start-offset for remote log enabled topics [kafka]
kamalcph commented on PR #15201: URL: https://github.com/apache/kafka/pull/15201#issuecomment-1894989811 Test failures are unrelated
Re: [PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]
kamalcph commented on PR #15209: URL: https://github.com/apache/kafka/pull/15209#issuecomment-1894987437 Should we also avoid creating the [remote_log_snapshot](https://sourcegraph.com/github.com/apache/kafka@e563aad4eec2e08c8db54e1afebe28c746130ba4/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java?L75) file under each partition directory?
Re: [PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]
philipnee commented on code in PR #15210: URL: https://github.com/apache/kafka/pull/15210#discussion_r1454575608
## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ##
@@ -2169,10 +2169,8 @@ public void testCommittedAuthenticationFailure(GroupProtocol groupProtocol) {
         assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
     }
-    // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
Review Comment: I'm not sure where the bug is. The test seems to pass locally.
Re: [PR] KAFKA-5863: Avoid NPE when RestClient calls expecting no-content receive content. [kafka]
yashmayya commented on PR #13294: URL: https://github.com/apache/kafka/pull/13294#issuecomment-1894930823 > because it's inherently untrusted content that shouldn't be put into the log by default I'm not sure I follow why this would be inherently untrusted content? Wouldn't any unexpected response here simply be indicative of a bug in the Connect runtime since the REST client is only being used internally to forward requests within a Connect cluster?
[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807543#comment-17807543 ] Matthias J. Sax commented on KAFKA-16141: - Thanks. I am wondering if the right fix would be to change `KeyValueToTimestampedKeyValueByteStoreAdapter` to implement `WrappedStateStore` – in the end, it is a wrapper, but the restore code path does not recognize it as such, and thus cannot "unwrap" its inner store to pick the right converter? – Comparing to `InMemoryTimestampedKeyValueStoreMarker`, it also implements both `WrappedStore` and `TimestampedBytesStore`? We did consider it a bug-fix to add `TimestampedBytesStore` to `KeyValueToTimestampedKeyValueByteStoreAdapter` because in the end, it does expect `` value byte format on `put()` and also returns this format on `get()`. It's just that the restore code path is not interested in some upper layer, but only in the innermost wrapped store type? > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Almog Gavra >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16113: Add committed and commit sensor to record metrics [kafka]
philipnee opened a new pull request, #15210: URL: https://github.com/apache/kafka/pull/15210 In this PR, I'm adding sensors to the `CommitRequestManager` to record the necessary metrics, i.e.:
```
commit-latency-avg
commit-latency-max
commit-rate
commit-total
committed-time-ns-total
```
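To make the listed metric names concrete, here is a plain-Java sketch of what each one could measure. It deliberately does not use Kafka's metrics classes and is not the code from the PR: `CommitStatsSketch`, `recordCommit`, and `recordCommitted` are hypothetical names, the rate is simply a count divided by a caller-supplied window, and the meaning of `committed-time-ns-total` (taken here as the summed time spent fetching committed offsets) is an assumption based on the name only.

```java
public class CommitStatsSketch {
    private long commitCount;      // backs commit-total and commit-rate
    private long latencySumMs;     // numerator of commit-latency-avg
    private long latencyMaxMs;     // commit-latency-max
    private long committedTimeNs;  // committed-time-ns-total (assumed meaning)

    /** Record one finished commit and how long it took. */
    public void recordCommit(long latencyMs) {
        commitCount++;
        latencySumMs += latencyMs;
        latencyMaxMs = Math.max(latencyMaxMs, latencyMs);
    }

    /** Record time spent in one lookup of committed offsets. */
    public void recordCommitted(long elapsedNs) {
        committedTimeNs += elapsedNs;
    }

    public double latencyAvgMs() {
        return commitCount == 0 ? 0.0 : (double) latencySumMs / commitCount;
    }

    public long latencyMaxMs() {
        return latencyMaxMs;
    }

    public long commitTotal() {
        return commitCount;
    }

    /** Commits per second over a caller-supplied window; a simplification of a sampled rate. */
    public double commitRatePerSec(long windowMs) {
        return windowMs <= 0 ? 0.0 : commitCount * 1000.0 / windowMs;
    }

    public long committedTimeNsTotal() {
        return committedTimeNs;
    }

    public static void main(String[] args) {
        CommitStatsSketch stats = new CommitStatsSketch();
        stats.recordCommit(10);
        stats.recordCommit(30);
        stats.recordCommitted(500);
        System.out.println(stats.latencyAvgMs() + " " + stats.latencyMaxMs() + " " + stats.commitTotal());
    }
}
```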
[PR] MINOR Removed unused CommittedOffsetsFile class. [kafka]
satishd opened a new pull request, #15209: URL: https://github.com/apache/kafka/pull/15209 We will reintroduce this class when it is required for enhancing TBRLMM to consume from a specific offset when snapshots are implemented.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807520#comment-17807520 ] Almog Gavra commented on KAFKA-16141: - OK, after doing some more digging I don't think it's related to my change. Instead I believe it was caused by [https://github.com/apache/kafka/pull/14570] - that PR changed {code:java} KeyValueToTimestampedKeyValueByteStoreAdapter implements ...{code} to {code:java} KeyValueToTimestampedKeyValueByteStoreAdapter implements ..., TimestampedBytesStore {code} This caused WrappedStateStore#isTimestamped to return true when previously it returned false. This, in turn, caused us to initialize the store with the RecordConverts.RAW_TO_TIMESTAMED_INSTANCE as the converter (see StateManagerUtil#converterForStore). After a restore, the converter will prepend the record timestamp when it shouldn't, because an additional timestamp is then prepended when it goes through the adapter. Related: https://issues.apache.org/jira/browse/KAFKA-15629 cc [~mjsax] [~hanyuzheng]
[jira] [Updated] (KAFKA-16131) Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 KRaft cluster with metadata version 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano updated KAFKA-16131: -- Affects Version/s: 3.7.0 > Repeated UnsupportedVersionException logged when running Kafka 3.7.0-RC2 > KRaft cluster with metadata version 3.6 > > > Key: KAFKA-16131 > URL: https://issues.apache.org/jira/browse/KAFKA-16131 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Assignee: Proven Provenzano >Priority: Blocker > Fix For: 3.7.0 > > > When running Kafka 3.7.0-RC2 as a KRaft cluster with metadata version set to > 3.6-IV2 metadata version, it throws repeated errors like this in the > controller logs: > {quote}2024-01-13 16:58:01,197 INFO [QuorumController id=0] > assignReplicasToDirs: event failed with UnsupportedVersionException in 15 > microseconds. (org.apache.kafka.controller.QuorumController) > [quorum-controller-0-event-handler] > 2024-01-13 16:58:01,197 ERROR [ControllerApis nodeId=0] Unexpected error > handling request RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, apiVersion=0, > clientId=1000, correlationId=14, headerVersion=2) – > AssignReplicasToDirsRequestData(brokerId=1000, brokerEpoch=5, > directories=[DirectoryData(id=w_uxN7pwQ6eXSMrOKceYIQ, > topics=[TopicData(topicId=bvAKLSwmR7iJoKv2yZgygQ, > partitions=[PartitionData(partitionIndex=2), > PartitionData(partitionIndex=1)]), TopicData(topicId=uNe7f5VrQgO0zST6yH1jDQ, > partitions=[PartitionData(partitionIndex=0)])])]) with context > RequestContext(header=RequestHeader(apiKey=ASSIGN_REPLICAS_TO_DIRS, > apiVersion=0, clientId=1000, correlationId=14, headerVersion=2), > connectionId='172.16.14.219:9090-172.16.14.217:53590-7', > clientAddress=/[172.16.14.217|http://172.16.14.217/], > principal=User:CN=my-cluster-kafka,O=io.strimzi, > listenerName=ListenerName(CONTROLPLANE-9090), securityProtocol=SSL, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.7.0), 
fromPrivilegedListener=false, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@71004ad2]) > (kafka.server.ControllerApis) [quorum-controller-0-event-handler] > java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.UnsupportedVersionException: Directory > assignment is not supported yet. > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:880) > at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:148) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:137) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:840) > Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: > Directory assignment is not supported yet. > {quote} > > With the metadata version set to 3.6-IV2, it makes sense that the request is > not supported. But the request should in such case not be sent at all. -- This message was sent by Atlassian Jira (v8.20.10#820010)
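The behaviour asked for at the end of this report (do not send the request at all when the metadata version cannot support it) amounts to a guard on the sending side. The following is a minimal sketch of such a guard, not the fix from the linked PR: the `Version` enum, its two constants, and `maybeSendAssignment` are hypothetical and heavily simplified compared with Kafka's real metadata version handling.

```java
import java.util.ArrayList;
import java.util.List;

public class DirectoryAssignmentGateSketch {

    /** Simplified, hypothetical stand-in for the cluster metadata version. */
    public enum Version {
        V3_6(false),
        V3_7(true);

        private final boolean directoryAssignment;

        Version(boolean directoryAssignment) {
            this.directoryAssignment = directoryAssignment;
        }

        public boolean supportsDirectoryAssignment() {
            return directoryAssignment;
        }
    }

    // Stands in for requests actually handed to the network layer.
    private final List<String> sent = new ArrayList<>();

    /** Sends the assignment only if the current version supports it; returns whether it was sent. */
    public boolean maybeSendAssignment(Version current, String partition) {
        if (!current.supportsDirectoryAssignment()) {
            return false; // skip silently instead of provoking UnsupportedVersionException
        }
        sent.add(partition);
        return true;
    }

    public List<String> sent() {
        return sent;
    }

    public static void main(String[] args) {
        DirectoryAssignmentGateSketch gate = new DirectoryAssignmentGateSketch();
        System.out.println(gate.maybeSendAssignment(Version.V3_6, "topic-0"));
        System.out.println(gate.maybeSendAssignment(Version.V3_7, "topic-0"));
    }
}
```

Checking on the sender avoids both the repeated controller-side error logs and the wasted round trip.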
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
kirktrue commented on PR #15186: URL: https://github.com/apache/kafka/pull/15186#issuecomment-1894795635 @ijuma / @stanislavkozlovski are either of you able to review? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on PR #15207: URL: https://github.com/apache/kafka/pull/15207#issuecomment-1894793526 This PR must be cherry-picked to the `3.7` branch.
Re: [PR] MINOR: code cleanup [kafka]
mjsax commented on PR #15208: URL: https://github.com/apache/kafka/pull/15208#issuecomment-1894793322 This PR must be cherry-picked to the `3.7` and `3.6` branches.
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on code in PR #15199: URL: https://github.com/apache/kafka/pull/15199#discussion_r1454318352 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -71,6 +71,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import static org.apache.kafka.common.config.ConfigDef.parseType; +import static org.apache.kafka.streams.internals.UpgradeFromValues.UPGRADE_FROM_35; Review Comment: After merging and doing the follow up PR to add `3.6`, I realized that using this import is not what we want to do. Call for review (follow up cleanup PR): https://github.com/apache/kafka/pull/15208
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on PR #15199: URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894790719 Follow-up PR to add `3.6`: https://github.com/apache/kafka/pull/15207
[PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax opened a new pull request, #15207: URL: https://github.com/apache/kafka/pull/15207 Adds version 3.6 to the possible values for config upgrade_from.
[PR] KAFKA-16104: Enable additional PlaintextConsumerTest tests for new consumer [kafka]
kirktrue opened a new pull request, #15206: URL: https://github.com/apache/kafka/pull/15206

We reevaluated the integration tests that were disabled for the new consumer group protocol which _should_ be supported. The evaluation was to run the `PlaintextConsumerTest` suite ten times and see which tests passed and which failed.

Based on that evaluation, the following tests can now be enabled:

- `testAutoCommitOnClose`
- `testAutoCommitOnCloseAfterWakeup`
- `testAutoCommitOnRebalance`
- `testExpandingTopicSubscriptions`
- `testMultiConsumerSessionTimeoutOnClose`
- `testMultiConsumerSessionTimeoutOnStopPolling`
- `testShrinkingTopicSubscriptions`

There are three tests which consistently failed. For each, a dedicated Jira was created to track and fix. Those that failed:

- `testPerPartitionLagMetricsCleanUpWithSubscribe` (failure rate: 100%, KAFKA-16150)
- `testPerPartitionLeadMetricsCleanUpWithSubscribe` (failure rate: 70%, KAFKA-16151)
- `testStaticConsumerDetectsNewPartitionCreatedAfterRestart` (failure rate: 100%, KAFKA-16152)

See [KAFKA-16104](https://issues.apache.org/jira/browse/KAFKA-16104) for more details.

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on PR #15199: URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894786932 Merged to `trunk` and cherry-picked to `3.7` and `3.6` branches.
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax merged PR #15199: URL: https://github.com/apache/kafka/pull/15199
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on PR #15199: URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894782497 I just checked the test failures on https://github.com/apache/kafka/pull/15151 (cf. the last comment) and it failed because `3.6` is missing. Can you also do a follow-up PR for the `3.6` branch which only adds `3.5` to `upgrade_from`? (Or use this PR and do a follow-up for trunk/3.7 to add `3.6` instead?)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16104: -- Description: It should be possible to enable: * testAutoCommitOnClose * testAutoCommitOnCloseAfterWakeup * testExpandingTopicSubscriptions * testShrinkingTopicSubscriptions * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) * testMultiConsumerSessionTimeoutOnStopPolling * testAutoCommitOnRebalance * testPerPartitionLeadMetricsCleanUpWithSubscribe * testPerPartitionLagMetricsCleanUpWithSubscribe * testStaticConsumerDetectsNewPartitionCreatedAfterRestart was: It should be possible to enable: * testAutoCommitOnClose * testAutoCommitOnCloseAfterWakeup * testExpandingTopicSubscriptions * testShrinkingTopicSubscriptions * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) * testAutoCommitOnRebalance * testPerPartitionLeadMetricsCleanUpWithSubscribe * testPerPartitionLagMetricsCleanUpWithSubscribe * testStaticConsumerDetectsNewPartitionCreatedAfterRestart > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testMultiConsumerSessionTimeoutOnStopPolling > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1454293498 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -712,13 +712,14 @@ public void run() { try { // Apply the records to the state machine. if (result.replayRecords()) { -result.records().forEach(record -> +for (int i = 0; i < result.records().size(); i++) { context.coordinator.replay( +prevLastWrittenOffset + i, Review Comment: is this the only way to get the offsets? 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
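The diff above replaces `forEach` with an indexed loop so that each replayed record can be paired with an offset. A minimal sketch of that idea follows, assuming the base offset is the offset of the first record in the batch; `IndexedReplaySketch` and its `replay` method are hypothetical names for illustration and are not part of `CoordinatorRuntime`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the CoordinatorRuntime implementation.
final class IndexedReplaySketch {
    // Pairs each record with an offset derived from a base offset plus the
    // record's position in the batch (assumption: records are contiguous).
    static List<String> replay(long prevLastWrittenOffset, List<String> records) {
        List<String> applied = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            long offset = prevLastWrittenOffset + i;
            applied.add(offset + ":" + records.get(i));
        }
        return applied;
    }
}
```

A plain `forEach` over the records carries no position information, which is why the loop index is needed if the offset is not stored with the record itself.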
Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1454285529 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -979,15 +981,30 @@ public void replayEndTransactionMarker( pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { topicOffsets.forEach((topicName, partitionOffsets) -> { partitionOffsets.forEach((partitionId, offsetAndMetadata) -> { -log.debug("Committed transaction offset commit for producer id {} in group {} " + -"with topic {}, partition {}, and offset {}.", -producerId, groupId, topicName, partitionId, offsetAndMetadata); -offsets.put( +OffsetAndMetadata existingOffsetAndMetadata = offsets.get( groupId, topicName, -partitionId, -offsetAndMetadata +partitionId ); + +// We always keep the most recent committed offset when we have a mix of transactional and regular +// offset commits. Without preserving information of the commit record offset, compaction of the +// __consumer_offsets topic itself may result in the wrong offset commit being materialized. +if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) { +log.debug("Committed transactional offset commit {} for producer id {} in group {} " + +"with topic {} and partition {}.", +offsetAndMetadata, producerId, groupId, topicName, partitionId); +offsets.put( Review Comment: if the offset is not the latest offset were we incorrectly saying the last transactional commit offset was the latest one? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -979,15 +981,30 @@ public void replayEndTransactionMarker( pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { topicOffsets.forEach((topicName, partitionOffsets) -> { partitionOffsets.forEach((partitionId, offsetAndMetadata) -> { -log.debug("Committed transaction offset commit for producer id {} in group {} " + -"with topic {}, partition {}, and offset {}.", -producerId, groupId, topicName, partitionId, offsetAndMetadata); -offsets.put( +OffsetAndMetadata existingOffsetAndMetadata = offsets.get( groupId, topicName, -partitionId, -offsetAndMetadata +partitionId ); + +// We always keep the most recent committed offset when we have a mix of transactional and regular +// offset commits. Without preserving information of the commit record offset, compaction of the +// __consumer_offsets topic itself may result in the wrong offset commit being materialized. +if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) { +log.debug("Committed transactional offset commit {} for producer id {} in group {} " + +"with topic {} and partition {}.", +offsetAndMetadata, producerId, groupId, topicName, partitionId); +offsets.put( Review Comment: if the offset is not the latest offset were we incorrectly saying the transactional commit offset was the latest one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
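The rule described in the code comment above (keep whichever commit has the higher record offset, regardless of whether it was transactional) can be sketched as follows. This is a simplified illustration under stated assumptions: `LatestOffsetSketch`, `Commit`, and the string key are hypothetical stand-ins, not the `OffsetMetadataManager` or `OffsetAndMetadata` classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the OffsetMetadataManager implementation.
final class LatestOffsetSketch {
    // Simplified stand-in for OffsetAndMetadata: the offset the consumer
    // committed, plus the log offset of the record that carried the commit.
    static final class Commit {
        final long committedOffset;
        final long recordOffset;

        Commit(long committedOffset, long recordOffset) {
            this.committedOffset = committedOffset;
            this.recordOffset = recordOffset;
        }
    }

    private final Map<String, Commit> offsets = new HashMap<>();

    // Materializes the candidate only if nothing is stored yet or if the
    // candidate was written later in the log. Returns true when stored.
    boolean maybePut(String key, Commit candidate) {
        Commit existing = offsets.get(key);
        if (existing == null || candidate.recordOffset > existing.recordOffset) {
            offsets.put(key, candidate);
            return true;
        }
        return false;
    }

    Commit get(String key) {
        return offsets.get(key);
    }
}
```

In this sketch a transactional commit whose record was written at log offset 11 does not overwrite a regular commit written at log offset 12, even if the transaction marker is replayed afterwards.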
[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16008: -- Issue Type: Bug (was: Test) > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848 > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16009: -- Issue Type: Bug (was: Test) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848 > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1454285529 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -979,15 +981,30 @@ public void replayEndTransactionMarker( pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { topicOffsets.forEach((topicName, partitionOffsets) -> { partitionOffsets.forEach((partitionId, offsetAndMetadata) -> { -log.debug("Committed transaction offset commit for producer id {} in group {} " + -"with topic {}, partition {}, and offset {}.", -producerId, groupId, topicName, partitionId, offsetAndMetadata); -offsets.put( +OffsetAndMetadata existingOffsetAndMetadata = offsets.get( groupId, topicName, -partitionId, -offsetAndMetadata +partitionId ); + +// We always keep the most recent committed offset when we have a mix of transactional and regular +// offset commits. Without preserving information of the commit record offset, compaction of the +// __consumer_offsets topic itself may result in the wrong offset commit being materialized. +if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) { +log.debug("Committed transactional offset commit {} for producer id {} in group {} " + +"with topic {} and partition {}.", +offsetAndMetadata, producerId, groupId, topicName, partitionId); +offsets.put( Review Comment: if the offset is not the latest offset were we incorrectly saying the last committed offset was the latest one? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -979,15 +981,30 @@ public void replayEndTransactionMarker( pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { topicOffsets.forEach((topicName, partitionOffsets) -> { partitionOffsets.forEach((partitionId, offsetAndMetadata) -> { -log.debug("Committed transaction offset commit for producer id {} in group {} " + -"with topic {}, partition {}, and offset {}.", -producerId, groupId, topicName, partitionId, offsetAndMetadata); -offsets.put( +OffsetAndMetadata existingOffsetAndMetadata = offsets.get( groupId, topicName, -partitionId, -offsetAndMetadata +partitionId ); + +// We always keep the most recent committed offset when we have a mix of transactional and regular +// offset commits. Without preserving information of the commit record offset, compaction of the +// __consumer_offsets topic itself may result in the wrong offset commit being materialized. +if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) { +log.debug("Committed transactional offset commit {} for producer id {} in group {} " + +"with topic {} and partition {}.", +offsetAndMetadata, producerId, groupId, topicName, partitionId); +offsets.put( Review Comment: if the offset is not the latest offset were we incorrectly saying the last transactionally committed offset was the latest one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1454282345 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -918,7 +920,7 @@ public void replay( groupId, topic, partition, -OffsetAndMetadata.fromRecord(value) +OffsetAndMetadata.fromRecord(offset, value) Review Comment: it is interesting that the offset passed in here is new (probably not the offset that OffsetAndMetadata is referring to) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [7/7] Always materialize the most recent committed offset [kafka]
jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1454280167 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ## @@ -92,30 +117,34 @@ public boolean equals(Object o) { OffsetAndMetadata that = (OffsetAndMetadata) o; -if (offset != that.offset) return false; +if (committedOffset != that.committedOffset) return false; if (commitTimestampMs != that.commitTimestampMs) return false; -if (!leaderEpoch.equals(that.leaderEpoch)) return false; -if (!metadata.equals(that.metadata)) return false; -return expireTimestampMs.equals(that.expireTimestampMs); +if (recordOffset != that.recordOffset) return false; +if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false; Review Comment: any reason why we changed from the .equals to Objects.equals? I guess it is safer if one of them can be null -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
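The null-safety point raised in the review comment can be illustrated with a small sketch. It assumes, purely for illustration, that a field such as `metadata` may be null; `NullSafeEqualsSketch` is a hypothetical class and does not reflect the actual nullability of the fields in `OffsetAndMetadata`.

```java
import java.util.Objects;

// Hypothetical illustration only; field nullability here is an assumption.
final class NullSafeEqualsSketch {
    final long committedOffset;
    final String metadata; // may be null in this sketch

    NullSafeEqualsSketch(long committedOffset, String metadata) {
        this.committedOffset = committedOffset;
        this.metadata = metadata;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        NullSafeEqualsSketch that = (NullSafeEqualsSketch) o;
        if (committedOffset != that.committedOffset) return false;
        // metadata.equals(that.metadata) would throw a NullPointerException
        // when metadata is null; Objects.equals handles null on either side.
        return Objects.equals(metadata, that.metadata);
    }

    @Override
    public int hashCode() {
        return Objects.hash(committedOffset, metadata);
    }
}
```

`Objects.equals(a, b)` returns true when both arguments are null and false when exactly one is null, and otherwise delegates to `a.equals(b)`.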
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1454273994 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -935,8 +935,9 @@ private[group] class GroupCoordinator( producerId, producerEpoch, RecordBatch.NO_SEQUENCE, - requestLocal, - postVerificationCallback + // Wrap the callback to be handled on an arbitrary request handler thread + // when transaction verification is complete. + KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal) Review Comment: Why are we calling this here? I thought we wanted to avoid this wrap here and only do it for produce requests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1454268198 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig, * @param producerId the producer id for the producer writing to the transaction * @param producerEpoch the epoch of the producer writing to the transaction * @param baseSequencethe base sequence of the first record in the batch we are trying to append - * @param requestLocalcontainer for the stateful instances scoped to this request -- this must correspond to the - *thread calling this method * @param callbackthe method to execute once the verification is either completed or returns an error * * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE. * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned. - * This guard can not be used for verification and any appends that attenpt to use it will fail. + * This guard can not be used for verification and any appends that attempt to use it will fail. 
*/ def maybeStartTransactionVerificationForPartition( topicPartition: TopicPartition, transactionalId: String, producerId: Long, producerEpoch: Short, baseSequence: Int, -requestLocal: RequestLocal, -callback: (Errors, RequestLocal, VerificationGuard) => Unit +callback: Either[Errors, VerificationGuard] => Unit ): Unit = { -def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors], -newRequestLocal: RequestLocal, -verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { - callback( -preAppendErrors.getOrElse(topicPartition, Errors.NONE), -newRequestLocal, -verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL)) +def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = { Review Comment: https://github.com/apache/kafka/pull/15087 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1454267538 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1090,38 +1090,29 @@ class ReplicaManager(val config: KafkaConfig, * @param producerId the producer id for the producer writing to the transaction * @param producerEpoch the epoch of the producer writing to the transaction * @param baseSequencethe base sequence of the first record in the batch we are trying to append - * @param requestLocalcontainer for the stateful instances scoped to this request -- this must correspond to the - *thread calling this method * @param callbackthe method to execute once the verification is either completed or returns an error * * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE. * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned. - * This guard can not be used for verification and any appends that attenpt to use it will fail. + * This guard can not be used for verification and any appends that attempt to use it will fail. */ def maybeStartTransactionVerificationForPartition( topicPartition: TopicPartition, transactionalId: String, producerId: Long, producerEpoch: Short, baseSequence: Int, -requestLocal: RequestLocal, -callback: (Errors, RequestLocal, VerificationGuard) => Unit +callback: Either[Errors, VerificationGuard] => Unit ): Unit = { -def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors], -newRequestLocal: RequestLocal, -verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { - callback( -preAppendErrors.getOrElse(topicPartition, Errors.NONE), -newRequestLocal, -verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL)) +def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = { Review Comment: Please take a look at my refactor PR. 
I have done this to some extent. I'd prefer not to overhaul it again (as I did after the previous group coordinator change). Hopefully it makes this work easier too.
Re: [PR] KAFKA-16139: Fix StreamsUpgradeTest [kafka]
mjsax commented on PR #15199: URL: https://github.com/apache/kafka/pull/15199#issuecomment-1894743876 Is this fix sufficient? Don't we also need to add `3.6` as an "upgrade_from" version?
Re: [PR] KAFKA-16042: Add byte-rate metrics for topic and partition [kafka]
ex172000 commented on PR #15085: URL: https://github.com/apache/kafka/pull/15085#issuecomment-1894726845 FYI: We are making a similar effort here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-977%3A+Partition-Level+Throughput+Metrics
Re: [PR] KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup [kafka]
cmccabe commented on PR #15192: URL: https://github.com/apache/kafka/pull/15192#issuecomment-1894711882 committed, thanks!
Re: [PR] KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup [kafka]
cmccabe closed pull request #15192: KAFKA-16126: Kcontroller dynamic configurations may fail to apply at startup URL: https://github.com/apache/kafka/pull/15192
Re: [PR] Kafka 16131: Only update directoryIds if the metadata version supports DirectoryAssignment [kafka]
pprovenzano commented on PR #15197: URL: https://github.com/apache/kafka/pull/15197#issuecomment-1894699271 Test failures are unrelated to the PR.
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
dongnuo123 commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1454202465 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -524,37 +495,39 @@ public CompletableFuture listGroups( ); } -final CompletableFuture future = new CompletableFuture<>(); -final List results = new ArrayList<>(); final Set existingPartitionSet = runtime.partitions(); -final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size()); if (existingPartitionSet.isEmpty()) { return CompletableFuture.completedFuture(new ListGroupsResponseData()); } +final List>> futures = +new ArrayList<>(); + for (TopicPartition tp : existingPartitionSet) { -runtime.scheduleReadOperation( +futures.add(runtime.scheduleReadOperation( "list-groups", tp, (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset) -).handle((groups, exception) -> { -if (exception == null) { -synchronized (results) { -results.addAll(groups); -} +).exceptionally(exception -> { +exception = Errors.maybeUnwrapException(exception); +if (exception instanceof NotCoordinatorException) { +return Collections.emptyList(); } else { -if (!(exception instanceof NotCoordinatorException)) { -future.complete(new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code())); -} -} -if (cnt.decrementAndGet() == 0) { -future.complete(new ListGroupsResponseData().setGroups(results)); +throw new CompletionException(exception); } -return null; -}); +})); } -return future; + +return FutureUtils +.combineFutures(futures, ArrayList::new, List::addAll) +.thenApply(groups -> new ListGroupsResponseData().setGroups(groups)) +.exceptionally(exception -> handleOperationException( +"ListGroups", +request, +exception, +(error, __) -> new ListGroupsResponseData().setErrorCode(error.code()) +)); Review Comment: Yeah, it makes sense -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
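For readers following the diff above, here is a minimal sketch of the same idea using only the JDK: each per-partition future maps a "not coordinator" failure to an empty list, and the results are then merged. `ListGroupsSketch`, `NotCoordinator`, `tolerate` and `combine` are hypothetical names for illustration. This is not the PR's `FutureUtils.combineFutures` or `handleOperationException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ListGroupsSketch {
    /** Hypothetical stand-in for Kafka's NotCoordinatorException. */
    static final class NotCoordinator extends RuntimeException {
        NotCoordinator(String message) {
            super(message);
        }
    }

    /** Turns a "not coordinator" failure into an empty result; rethrows anything else. */
    static CompletableFuture<List<String>> tolerate(CompletableFuture<List<String>> future) {
        return future.exceptionally(t -> {
            // Unwrap the CompletionException that dependent stages may add.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            if (cause instanceof NotCoordinator) {
                return Collections.emptyList();
            }
            throw new CompletionException(cause);
        });
    }

    /** Completes with the concatenation of all results, or fails if any input failed. */
    static CompletableFuture<List<String>> combine(List<CompletableFuture<List<String>>> futures) {
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return all.thenApply(ignored -> {
            List<String> merged = new ArrayList<>();
            for (CompletableFuture<List<String>> f : futures) {
                merged.addAll(f.join()); // every input is already complete here
            }
            return merged;
        });
    }
}
```

Compared with the removed counter-and-lock version, no shared mutable list is touched from several callbacks, which is presumably part of the motivation for the change.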
[PR] MINOR: populate TopicName in ConsumerGroupDescribe [kafka]
dongnuo123 opened a new pull request, #15205: URL: https://github.com/apache/kafka/pull/15205 The patch populates the topic name of `ConsumerGroupDescribeResponseData.TopicPartitions` with the corresponding topic id in `ConsumerGroupDescribe`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16152) Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart
Kirk True created KAFKA-16152: - Summary: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart Key: KAFKA-16152 URL: https://issues.apache.org/jira/browse/KAFKA-16152 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-16151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16151: -- Summary: Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe (was: Fix PlaintextConsumerTest.testPerPartitionLedMetricsCleanUpWithSubscribe) > Fix PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe > - > > Key: KAFKA-16151 > URL: https://issues.apache.org/jira/browse/KAFKA-16151 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16150) Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe
Kirk True created KAFKA-16150: - Summary: Fix PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe Key: KAFKA-16150 URL: https://issues.apache.org/jira/browse/KAFKA-16150 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16151) Fix PlaintextConsumerTest.testPerPartitionLedMetricsCleanUpWithSubscribe
Kirk True created KAFKA-16151: - Summary: Fix PlaintextConsumerTest.testPerPartitionLedMetricsCleanUpWithSubscribe Key: KAFKA-16151 URL: https://issues.apache.org/jira/browse/KAFKA-16151 Project: Kafka Issue Type: Bug Components: clients, consumer, unit tests Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16149) Aggressively expire unused client connections
Kirk True created KAFKA-16149: - Summary: Aggressively expire unused client connections Key: KAFKA-16149 URL: https://issues.apache.org/jira/browse/KAFKA-16149 Project: Kafka Issue Type: Improvement Components: clients, consumer, producer Reporter: Kirk True Assignee: Kirk True The goal is to minimize the number of connections from the client to the brokers. On the Java client, there are potentially two types of network connections to brokers: # Connections for metadata requests # Connections for fetch, produce, etc. requests The idea is to apply a much shorter idle time to client connections that have _only_ served metadata (type 1 above) so that they become candidates for expiration more quickly. Alternatively (or additionally), a change to the way metadata requests are routed could be made to reduce the number of connections. -- This message was sent by Atlassian Jira (v8.20.10#820010)
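The idea in KAFKA-16149 can be sketched roughly as follows, assuming the client tracks, per connection, the last time it was used and whether it has served anything other than metadata requests. `IdleConnectionSketch` and its methods are hypothetical and do not correspond to existing client classes; the two idle limits are illustrative parameters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdleConnectionSketch {
    private static final class State {
        long lastUsedMs;
        boolean metadataOnly = true; // flips to false on the first non-metadata request
    }

    private final long defaultIdleMs;
    private final long metadataOnlyIdleMs;
    private final Map<String, State> connections = new LinkedHashMap<>();

    IdleConnectionSketch(long defaultIdleMs, long metadataOnlyIdleMs) {
        this.defaultIdleMs = defaultIdleMs;
        this.metadataOnlyIdleMs = metadataOnlyIdleMs;
    }

    /** Records that a request was sent on the given connection. */
    void recordUse(String connectionId, boolean isMetadataRequest, long nowMs) {
        State state = connections.computeIfAbsent(connectionId, id -> new State());
        state.lastUsedMs = nowMs;
        if (!isMetadataRequest) {
            state.metadataOnly = false;
        }
    }

    /** Returns the connections whose idle time exceeds the limit for their type. */
    List<String> expired(long nowMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> entry : connections.entrySet()) {
            State state = entry.getValue();
            long limit = state.metadataOnly ? metadataOnlyIdleMs : defaultIdleMs;
            if (nowMs - state.lastUsedMs > limit) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

With a short `metadataOnlyIdleMs`, a connection opened only for a metadata refresh becomes a candidate for closing long before a fetch or produce connection would.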
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1454185664 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -141,17 +144,33 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + /** + * Return topic partition metadata for the given topic, listener and index range. Also, return a boolean value to + * indicate whether there are more partitions with index equal or larger than the upper index. + * + * @param image The metadata image + * @param topicName The name of the topic. + * @param listenerNameThe listener name. + * @param startIndex The smallest index of the partitions to be included in the result. + * @param upperIndex The upper limit of the index of the partitions to be included in the result. + *Note that, the upper index can be larger than the largest partition index in + *this topic. + * @returnA collection of topic partition metadata and whether there are more partitions. + */ private def getPartitionMetadataForDescribeTopicResponse( image: MetadataImage, topicName: String, -listenerName: ListenerName - ): Option[List[DescribeTopicPartitionsResponsePartition]] = { +listenerName: ListenerName, +startIndex: Int, +upperIndex: Int + ): (Option[List[DescribeTopicPartitionsResponsePartition]], Boolean) = { Option(image.topics().getTopic(topicName)) match { - case None => None + case None => (None, false) case Some(topic) => { -val partitions = Some(topic.partitions().entrySet().asScala.map { entry => - val partitionId = entry.getKey - val partition = entry.getValue +val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]() +val endIndex = upperIndex.min(topic.partitions().size()) +for (partitionId <- startIndex until endIndex) { + val partition = topic.partitions().get(partitionId) Review Comment: Actually it is not possible, the partition index starts with 0 and increments by 1. 
Then what is the case if the partition does not exist? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
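As a rough illustration of the index-range logic in the diff above, here is a sketch in Java that assumes, as the comment says, that partition ids are dense (0, 1, 2, ...). `PartitionRangeSketch` and `Page` are hypothetical names. The real method returns partition metadata rather than bare ids.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionRangeSketch {
    /** Result of one lookup: the selected ids and whether more partitions remain. */
    static final class Page {
        final List<Integer> partitionIds;
        final boolean hasMore;

        Page(List<Integer> partitionIds, boolean hasMore) {
            this.partitionIds = partitionIds;
            this.hasMore = hasMore;
        }
    }

    /**
     * Selects ids in [startIndex, upperIndex), clamped to the topic's partition count.
     * hasMore is true when partitions with id >= upperIndex exist.
     */
    static Page page(int partitionCount, int startIndex, int upperIndex) {
        int endIndex = Math.min(upperIndex, partitionCount);
        List<Integer> ids = new ArrayList<>();
        for (int id = Math.max(startIndex, 0); id < endIndex; id++) {
            ids.add(id);
        }
        return new Page(ids, endIndex < partitionCount);
    }
}
```

Under the dense-id assumption, every id below `endIndex` exists, so a lookup inside the loop should not come back empty. If ids could be sparse, the loop would need a null check.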
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807487#comment-17807487 ] Phuc Hong Tran commented on KAFKA-15538: [~lianetm] thanks for the comments. Just to clarify though, isn't the section that you sent [here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537] supposed to be used for the SubscriptionPattern, not the java.util.regex.Pattern, as the list of topic to be sent which match the regex is already set [here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L531]? > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
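The client-side resolution described in the ticket (java `Pattern` in, list of topic names out) could look roughly like the sketch below. `PatternResolutionSketch.resolve` is a hypothetical helper, and the list of known topics is assumed to come from the client's metadata. This is not the consumer's actual code path.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class PatternResolutionSketch {
    /** Returns the known topic names that fully match the pattern, in sorted order. */
    static List<String> resolve(Pattern pattern, List<String> knownTopics) {
        return knownTopics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }
}
```

The resulting list is what would be sent to the broker as explicit topic names, in contrast to the `SubscriptionPattern` case, where the regex itself is sent and resolved on the server.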
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1894635327 Thanks @lianetm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection
[ https://issues.apache.org/jira/browse/KAFKA-16083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adithya Chandra updated KAFKA-16083: Affects Version/s: (was: 3.7.0) > Exclude throttle time when expiring inflight requests on a connection > - > > Key: KAFKA-16083 > URL: https://issues.apache.org/jira/browse/KAFKA-16083 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Adithya Chandra >Priority: Critical > Fix For: 3.8.0 > > > When expiring inflight requests, the network client does not take throttle > time into account. If a connection has multiple inflight requests (default of > 5) and each request is throttled then some of the requests can be incorrectly > marked as expired. Subsequently the connection is closed and the client > establishes a new connection to the broker. This behavior leads to > unnecessary connections to the broker, leads to connection storms and > increases latencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection
[ https://issues.apache.org/jira/browse/KAFKA-16083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adithya Chandra updated KAFKA-16083: Affects Version/s: 3.7.0 > Exclude throttle time when expiring inflight requests on a connection > - > > Key: KAFKA-16083 > URL: https://issues.apache.org/jira/browse/KAFKA-16083 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Adithya Chandra >Priority: Critical > Fix For: 3.8.0 > > > When expiring inflight requests, the network client does not take throttle > time into account. If a connection has multiple inflight requests (default of > 5) and each request is throttled then some of the requests can be incorrectly > marked as expired. Subsequently the connection is closed and the client > establishes a new connection to the broker. This behavior leads to > unnecessary connections to the broker, leads to connection storms and > increases latencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16083) Exclude throttle time when expiring inflight requests on a connection
[ https://issues.apache.org/jira/browse/KAFKA-16083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adithya Chandra resolved KAFKA-16083. - Fix Version/s: 3.8.0 Reviewer: Stanislav Kozlovski Resolution: Fixed > Exclude throttle time when expiring inflight requests on a connection > - > > Key: KAFKA-16083 > URL: https://issues.apache.org/jira/browse/KAFKA-16083 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Adithya Chandra >Priority: Critical > Fix For: 3.8.0 > > > When expiring inflight requests, the network client does not take throttle > time into account. If a connection has multiple inflight requests (default of > 5) and each request is throttled then some of the requests can be incorrectly > marked as expired. Subsequently the connection is closed and the client > establishes a new connection to the broker. This behavior leads to > unnecessary connections to the broker, leads to connection storms and > increases latencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
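To make the KAFKA-16083 description concrete, here is a small sketch of an expiry check that subtracts the time a request spent throttled before comparing against the request timeout. `InflightExpirySketch.hasExpired` and its parameters are hypothetical. This illustrates the arithmetic only and is not the change that was merged.

```java
final class InflightExpirySketch {
    /**
     * A request counts as expired only if the time it has been in flight,
     * excluding any time it was held back by throttling, exceeds the timeout.
     */
    static boolean hasExpired(long sendTimeMs, long throttleMs, long requestTimeoutMs, long nowMs) {
        long waitedMs = nowMs - sendTimeMs - Math.max(0L, throttleMs);
        return waitedMs > requestTimeoutMs;
    }
}
```

For example, with a 30 s timeout, a request sent 31 s ago that was throttled for 5 s has effectively waited 26 s and is not expired, whereas a check that ignores throttling would expire it and close the connection.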
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1894569927 This is the task to closely follow https://issues.apache.org/jira/browse/KAFKA-14517, where the broker will support the new regex. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807434#comment-17807434 ] Almog Gavra commented on KAFKA-16141: - I confirmed it’s most likely related to my change - changing it from: final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName); to final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentTimestampedKeyValueStore(persistentMemoryStoreName); makes the test pass, so it’s almost certainly the same bug as in https://issues.apache.org/jira/browse/KAFKA-16046 > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Almog Gavra >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16148) Implement GroupMetadataManager#onUnloaded
Jeff Kim created KAFKA-16148: Summary: Implement GroupMetadataManager#onUnloaded Key: KAFKA-16148 URL: https://issues.apache.org/jira/browse/KAFKA-16148 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim complete all awaiting futures with NOT_COORDINATOR (for classic group) transition all groups to DEAD. Cancel all timers related to the unloaded group metadata manager -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on PR #15188: URL: https://github.com/apache/kafka/pull/15188#issuecomment-1894475663 @lianetm, thanks for the comments, I will make sure to address those points in my next PR. Regarding your point about passing the regex for HeartbeatRequestManager, I originally included that in my code change, then I came across this PR https://github.com/apache/kafka/pull/14956 and decided that we need to wait for the broker to implement new regex logic first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1453968041 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java: ## @@ -86,7 +86,6 @@ import static org.mockito.Mockito.when; public class OffsetsRequestManagerTest { - Review Comment: Nit: I find it's better to avoid changes in unrelated files, even if minor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1453966464 ## clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java: ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +public class SubscriptionPattern { Review Comment: This class would need some doc explaining what it represents. Also not sure if this is the right place for it (given that the whole intention with the new regex is driven by the broker, but is not implemented yet). So at this point it is not clear to me whether we would prefer to define this on the broker side to be used there. Could be. As suggested in the Jira, maybe we should wait for the broker implementation of the new regex, and then align on this class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1453958313 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -84,6 +85,9 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; +/* we should rename this to something more specific */ Review Comment: Agree that it's confusing but can't think of a better name. I would suggest though that we add a proper comment, stating that this represents the RE2J regex (vs the java regex represented by the `Pattern subscribedPattern`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1453953690 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -494,6 +495,16 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } +@Override +public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + Review Comment: Not stated in the KIP I believe, but I would say that we should log at least a warn indicating that this is not supported with the legacy protocol (similar to what the KIP states for logging a warn for the `enforceRebalance` that is not supported with the new protocol, and that we already do [here](https://github.com/apache/kafka/blob/055ff2b831193f5935f9efc2f7809f853f63de5f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1187)) ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -494,6 +495,16 @@ public void subscribe(Pattern pattern) { subscribeInternal(pattern, Optional.empty()); } +@Override +public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { + +} + +@Override +public void subscribe(SubscriptionPattern pattern) { + Review Comment: same as above comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka 15561: Client support for new SubscriptionPattern based subscription [kafka]
lianetm commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1453945617 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } +@Override +public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {} Review Comment: I expect we should be calling the delegate.subscribe(SubscriptionPattern..) here, otherwise the actual implementation in the AsyncKafkaConsumer won't be called. Similar to the subscribe(Pattern..) (This KafkaConsumer is the user-facing api, that ends up calling the Legacy or Async consumer via the delegate) ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -753,6 +753,10 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { public void subscribe(Pattern pattern) { delegate.subscribe(pattern); } +@Override +public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {} +@Override +public void subscribe(SubscriptionPattern pattern) {} Review Comment: Same as above comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
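The delegation point in the review above can be shown with a stripped-down sketch. `Facade`, `Delegate` and `RecordingDelegate` are hypothetical stand-ins for `KafkaConsumer`, its internal delegate and a concrete consumer implementation, and a plain `String` stands in for `SubscriptionPattern`.

```java
final class DelegationSketch {
    /** Hypothetical stand-in for the internal consumer implementation. */
    interface Delegate {
        void subscribe(String regex);
    }

    /** Records the last call so the forwarding can be observed. */
    static final class RecordingDelegate implements Delegate {
        String lastRegex;

        @Override
        public void subscribe(String regex) {
            lastRegex = regex;
        }
    }

    /** Hypothetical user-facing class: every public method forwards to the delegate. */
    static final class Facade {
        private final Delegate delegate;

        Facade(Delegate delegate) {
            this.delegate = delegate;
        }

        void subscribe(String regex) {
            // An empty body here would silently drop the subscription.
            delegate.subscribe(regex);
        }
    }
}
```

With an empty method body in the facade, the implementation behind the delegate is never reached, which is the problem the review comment points out.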
[jira] [Comment Edited] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807323#comment-17807323 ] Lianet Magrans edited comment on KAFKA-15538 at 1/16/24 7:45 PM: - Hey [~phuctran], this ticket is not fully implemented yet. There are some bits of it at the consumer level already, that [~kirktrue] worked on just as an initial approach, but that needs to be reviewed and make sure nothing else need to be wired up to the HeartbeatRequestManager, to ensure that the list of topics matching the regex are sent to the broker (see [here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537]). We should also ensure that we re-enable all the pattern subscription related integration tests, that are currently disabled in the PlainTextAsyncConsumer (ex. [testPatternSubscription|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L356C7-L356C30]). I will re-open the issue. Regarding documentation, for the legacy consumer all we have is the java doc (afaik). For the new consumer, we do have a one-pager on the AK wiki [here|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design]. Hope it helps! was (Author: JIRAUSER300183): Hey [~phuctran], this ticket is not fully implemented yet. There are some bits of it at the consumer level already, that [~kirktrue] worked on just as an initial approach, but that needs to be reviewed and wired up to the HeartbeatRequestManager, to ensure that the list of topics matching the regex are sent to the broker (see [here|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L537]). 
We should also ensure that we re-enable all the pattern subscription related integration tests, that are currently disabled in the PlainTextAsyncConsumer (ex. [testPatternSubscription|https://github.com/apache/kafka/blob/dd0916ef9a6276d191196f79176bcb725e1ff9e6/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L356C7-L356C30]). I will re-open the issue. Regarding documentation, for the legacy consumer all we have is the java doc (afaik). For the new consumer, we do have a one-pager on the AK wiki [here|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design]. Hope it helps! > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1453879095 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -367,6 +372,42 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { assertPoll(1, commitRequestManger); } +@Test Review Comment: similar to the comment above - we autocommit on close - should the interceptor be triggered? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807411#comment-17807411 ] Matthias J. Sax commented on KAFKA-16141: - Assigned to [~agavra] and marked as blocker. Might be a regression introduced via KIP-954. > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Almog Gavra >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807412#comment-17807412 ] Phuc Hong Tran commented on KAFKA-15561: [~lianetm], PTAL if you have time. Thanks > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16141: Priority: Blocker (was: Major) > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Almog Gavra >Priority: Blocker > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' 
from %s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code}
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
OmniaGM commented on PR #15158: URL: https://github.com/apache/kafka/pull/15158#issuecomment-1894343924 > @OmniaGM It looks like there are a [few build failures](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15158/6/pipeline/10): > > ``` > [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15158/core/src/main/scala/kafka/server/DynamicConfig.scala:26:58: Unused import > [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15158/core/src/main/scala/kafka/server/KafkaConfig.scala:47:50: Unused import > ``` fixed this
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
OmniaGM commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1453877298 ## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { +// Log-level config default values +public static final int DEFAULT_NUM_PARTITIONS = 50; +public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024; +public static final short DEFAULT_REPLICATION_FACTOR = 3; +public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2; +public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; +} Review Comment: Done, and did the same to other files I created as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1453877053 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -844,6 +849,54 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } +@Test Review Comment: What's the interceptor behavior on close? If we have in-flight commits before closing the consumer, should the interceptors be invoked? Can we add tests around that?
[jira] [Updated] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15878: -- Fix Version/s: 3.8.0 > KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER > > > Key: KAFKA-15878 > URL: https://issues.apache.org/jira/browse/KAFKA-15878 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Anuj Sharma >Priority: Major > Labels: oauth > Fix For: 3.8.0 > > > {code:java} > // code placeholder > {code} > h1. Overview > * This issue pertains to > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism of Kafka authentication. > * Kafka clients can use [SASL/OAUTHBEARER > |https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism > by overriding the [custom call back > handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod] > . > * > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > available from v3.1 further extends the mechanism with a production grade > implementation. > * Kafka's > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is > because of a more restrictive set of characters than what > [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] > recommends. > * This JIRA can be considered an extension of > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > to support the opaque tokens as well apart from the JWT tokens. > > In summary the following character set should be supported as per the RFC - > {code:java} > 1*( ALPHA / DIGIT / >"-" / "." / "_" / "~" / "+" / "/" ) *"=" > {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-16141: --- Assignee: Almog Gavra > StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails > consistently in 3.7 > > > Key: KAFKA-16141 > URL: https://issues.apache.org/jira/browse/KAFKA-16141 > Project: Kafka > Issue Type: Test >Affects Versions: 3.7.0 >Reporter: Stanislav Kozlovski >Assignee: Almog Gavra >Priority: Major > > {code:java} > kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ > “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} > TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from > ubuntu@worker26") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", > line 79, in test_standby_tasks_rebalance > self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", > line 96, in wait_for_verification > err_msg="Did expect to read '%s' from 
%s" % (message, > processor.node.account)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", > line 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 > STANDBY_TASKS:[1-3]' from ubuntu@worker26 > {code}
[jira] [Updated] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15878: -- Labels: oauth (was: ) > KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER > > > Key: KAFKA-15878 > URL: https://issues.apache.org/jira/browse/KAFKA-15878 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Anuj Sharma >Priority: Major > Labels: oauth > > {code:java} > // code placeholder > {code} > h1. Overview > * This issue pertains to > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism of Kafka authentication. > * Kafka clients can use [SASL/OAUTHBEARER > |https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism > by overriding the [custom call back > handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod] > . > * > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > available from v3.1 further extends the mechanism with a production grade > implementation. > * Kafka's > [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] > mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is > because of a more restrictive set of characters than what > [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] > recommends. > * This JIRA can be considered an extension of > [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] > to support the opaque tokens as well apart from the JWT tokens. > > In summary the following character set should be supported as per the RFC - > {code:java} > 1*( ALPHA / DIGIT / >"-" / "." / "_" / "~" / "+" / "/" ) *"=" > {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
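The character set quoted in the issue above is the `b64token` grammar from RFC 6750, section 2.1. As a rough illustration only, and not the validation code in Kafka or in the related PR, that grammar can be written as a Java regular expression. The class and method names below are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: checks a bearer token against the
// RFC 6750 section 2.1 grammar quoted in the issue.
final class B64TokenSketch {
    // 1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
    // i.e. one or more characters from the set, then zero or more '=' padding characters.
    private static final Pattern B64TOKEN = Pattern.compile("[A-Za-z0-9\\-._~+/]+=*");

    private B64TokenSketch() { }

    // Returns true only if the whole token matches the grammar.
    static boolean isValid(String token) {
        return token != null && B64TOKEN.matcher(token).matches();
    }
}
```

Under this pattern a JWT-shaped value such as `header.payload.signature` still matches, because `.` is in the set, and so does an opaque token that contains `+`, `/`, `~` or trailing `=` padding. A value with whitespace, or with `=` anywhere but the end, does not.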
Re: [PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]
kirktrue commented on PR #14818: URL: https://github.com/apache/kafka/pull/14818#issuecomment-1894339862 @jcme, I wrote and implemented KIP-768, so I'll take a look at this. Also, are you able to assign the Jira to yourself? Thanks!
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1453871780 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.FencedInstanceIdException; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback} amd + * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}s. This is + * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the + * future completion, and execute the callbacks when user polls/commits/closes the consumer. 
+ */ +public class OffsetCommitCallbackInvoker { +private final ConsumerInterceptors interceptors; + +OffsetCommitCallbackInvoker(ConsumerInterceptors interceptors) { +this.interceptors = interceptors; +} + +// Thread-safe queue to store user-defined callbacks and interceptors to be executed +private final BlockingQueue callbackQueue = new LinkedBlockingQueue<>(); + +public void submitCommitInterceptors(final Map offsets) { +if (!interceptors.isEmpty()) { +callbackQueue.add(new OffsetCommitCallbackTask( +(innerOffsets, exception) -> interceptors.onCommit(innerOffsets), +offsets, +null +)); +} +} + +public void submitUserCallback(final OffsetCommitCallback callback, + final Map offsets, + final Exception exception) { +callbackQueue.add(new OffsetCommitCallbackTask(callback, offsets, exception)); +} + +/** + * @return true if an offset commit was fenced. + */ +public boolean executeCallbacks() { Review Comment: see comment above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1453870149 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -93,9 +94,11 @@ public CommitRequestManager( final SubscriptionState subscriptions, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, +final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, final String groupId, final Optional groupInstanceId) { -this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId, +this(time, logContext, subscriptions, config, coordinatorRequestManager, Review Comment: I wonder if it would be cleaner to put each argument on its own line, now that the call spans 3 lines of different widths.
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1453868310 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -93,9 +94,11 @@ public CommitRequestManager( final SubscriptionState subscriptions, final ConsumerConfig config, final CoordinatorRequestManager coordinatorRequestManager, +final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, Review Comment: Looking back at the implementation, I now think the `final` modifiers on these parameters are unnecessary. Do you think we should clean them up in the future?
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
philipnee commented on code in PR #15000: URL: https://github.com/apache/kafka/pull/15000#discussion_r1453865322 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1902,65 +1912,14 @@ private void maybeThrowFencedInstanceException() { } private void maybeInvokeCommitCallbacks() { -if (callbacks() > 0) { -invoker.executeCallbacks(); +if (offsetCommitCallbackInvoker.executeCallbacks()) { Review Comment: The naming seems a bit misleading to me: if executeCallbacks() returns true, it reads as if the callbacks were executed correctly. I wonder if we could restructure the code like ``` invoker.executeCallbacks() isFenced = invoker.hasFencedException() ```
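The restructuring suggested in the comment above, a `void` method that runs the callbacks plus a separate query for the fenced state, might look roughly like the following. This is a simplified sketch under stated assumptions, not the code in the PR: `CallbackInvokerSketch`, its nested `Task` type and `FencedSketchException` are hypothetical stand-ins for `OffsetCommitCallbackInvoker`, `OffsetCommitCallbackTask` and `FencedInstanceIdException`, and the callback signature is reduced to a single exception argument.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for FencedInstanceIdException so the sketch compiles without Kafka.
class FencedSketchException extends RuntimeException {
    FencedSketchException(String message) {
        super(message);
    }
}

// Hypothetical, simplified invoker: callbacks are queued by one thread and run by another.
class CallbackInvokerSketch {
    private static final class Task {
        final Consumer<Exception> callback;
        final Exception exception;

        Task(Consumer<Exception> callback, Exception exception) {
            this.callback = callback;
            this.exception = exception;
        }
    }

    // Thread-safe queue: the background thread adds, the application thread drains.
    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    // Read and written only by the thread that calls executeCallbacks().
    private boolean fenced = false;

    void submit(Consumer<Exception> callback, Exception exception) {
        queue.add(new Task(callback, exception));
    }

    // Runs every queued callback; returns nothing, so the name cannot be misread as a success flag.
    void executeCallbacks() {
        Task task;
        while ((task = queue.poll()) != null) {
            if (task.exception instanceof FencedSketchException) {
                fenced = true;
            }
            task.callback.accept(task.exception);
        }
    }

    // Separate query for whether any executed callback carried a fenced exception.
    boolean hasFencedException() {
        return fenced;
    }
}
```

With this split the call site reads as two steps, `invoker.executeCallbacks()` followed by `invoker.hasFencedException()`, at the cost of the invoker holding one extra piece of state.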
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
mumrah commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1453861304 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w } } + private def getPartitionMetadataForDescribeTopicResponse( +image: MetadataImage, +topicName: String, +listenerName: ListenerName + ): Option[List[DescribeTopicPartitionsResponsePartition]] = { +Option(image.topics().getTopic(topicName)) match { + case None => None + case Some(topic) => { +val partitions = Some(topic.partitions().entrySet().asScala.map { entry => + val partitionId = entry.getKey + val partition = entry.getValue + val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, +listenerName, false) + val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, +false) + val offlineReplicas = getOfflineReplicas(image, partition, listenerName) + val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName) + maybeLeader match { +case None => + val error = if (!image.cluster().brokers.containsKey(partition.leader)) { Review Comment: This won't cause an error for the whole request, right? It will just populate the partition-level `ErrorCode`.
Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]
OmniaGM commented on code in PR #15159: URL: https://github.com/apache/kafka/pull/15159#discussion_r1453859341 ## checkstyle/import-control-core.xml: ## @@ -82,6 +82,7 @@ + Review Comment: Remove it ## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ## @@ -32,6 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Sanitizer, Time} import org.apache.kafka.server.config.ConfigEntityName import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType} +import org.apache.kafka.server.config.ClientQuotaManagerConfig Review Comment: done
Re: [PR] KAFKA-15853: Move ClientQuotaManagerConfig outside of core [kafka]
OmniaGM commented on code in PR #15159: URL: https://github.com/apache/kafka/pull/15159#discussion_r1453859778 ## server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.config; + +public class ClientQuotaManagerConfig { +// Always have 10 whole windows + 1 current window +public static final int DEFAULT_NUM_QUOTA_SAMPLES = 11; +public static final int DEFAULT_QUOTA_WINDOW_SIZE_SECONDS = 1; + +public final int numQuotaSamples; +public final int quotaWindowSizeSeconds; + +/** + * Configuration settings for quota management + * + * @param numQuotaSamples The number of samples to retain in memory + * @param quotaWindowSizeSeconds The time span of each sample + */ +public ClientQuotaManagerConfig(int numQuotaSamples, int quotaWindowSizeSeconds) { +this.numQuotaSamples = numQuotaSamples; +this.quotaWindowSizeSeconds = quotaWindowSizeSeconds; +} + +public ClientQuotaManagerConfig() { +this(DEFAULT_NUM_QUOTA_SAMPLES, DEFAULT_QUOTA_WINDOW_SIZE_SECONDS); +} + +public ClientQuotaManagerConfig(int numQuotaSamples) { +this(numQuotaSamples, DEFAULT_QUOTA_WINDOW_SIZE_SECONDS); +} +} Review Comment: Added a line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1453854211 ## clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java: ## @@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " + +"allowed to access. If not set, all paths are allowed."; +private AllowedPaths allowedPaths = null; Review Comment: @mimaison WDYT?
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1453851791 ## clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java: ## @@ -44,8 +44,15 @@ public class DirectoryConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " + +"allowed to access. If not set, all paths are allowed."; +private AllowedPaths allowedPaths = new AllowedPaths(null); Review Comment: This class is required to be thread-safe in the ConfigProvider javadoc. ```suggestion private volatile AllowedPaths allowedPaths = new AllowedPaths(null); ```
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1453850678 ## clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java: ## @@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider { private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class); +public static final String ALLOWED_PATHS_CONFIG = "allowed.paths"; +public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " + +"allowed to access. If not set, all paths are allowed."; +private AllowedPaths allowedPaths = null; Review Comment: > Could there be users who don't call configure first necessarily? I'm worried that throwing IllegalStateException could cause backward compatibility issue. Throwing IllegalStateException would be a backwards-incompatible change, but I don't think skipping configure() was a supported usage in the first place. Not calling configure() on these particular implementations because it previously happened to be a no-op couples callers too closely to the internal implementation of these classes. It is a value judgement, and we have to determine which is more valuable. For the record, I think that exploiting the interface by preventing calls to configure() is unlikely; I was just thinking about defense in depth.
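The three review comments above touch on a `volatile` field for thread safety, an `IllegalStateException` when `configure()` was never called, and the documented meaning of `allowed.paths`. A minimal sketch of how those pieces could fit together is shown below. It is an assumption-laden illustration, not the `AllowedPaths` class from the PR: the class name `RestrictedProviderSketch`, its methods, and the normalize-then-prefix check are all hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka code: a provider that restricts access to configured paths.
class RestrictedProviderSketch {
    // null until configure() runs; volatile so that a configure() call on one thread
    // is visible to a later isAllowed() call on another thread.
    private volatile List<Path> allowedPaths = null;

    // Parses a comma separated list of paths; a null or blank value means "not set".
    void configure(String allowedPathsConfig) {
        List<Path> parsed = new ArrayList<>();
        if (allowedPathsConfig != null) {
            for (String entry : allowedPathsConfig.split(",")) {
                String trimmed = entry.trim();
                if (!trimmed.isEmpty()) {
                    parsed.add(Paths.get(trimmed).toAbsolutePath().normalize());
                }
            }
        }
        allowedPaths = parsed;
    }

    boolean isAllowed(String path) {
        List<Path> snapshot = allowedPaths; // read the volatile field once
        if (snapshot == null) {
            throw new IllegalStateException("configure() must be called before use");
        }
        if (snapshot.isEmpty()) {
            return true; // "If not set, all paths are allowed."
        }
        // Normalize first so that ".." segments cannot step outside an allowed directory.
        Path candidate = Paths.get(path).toAbsolutePath().normalize();
        for (Path allowed : snapshot) {
            if (candidate.startsWith(allowed)) {
                return true;
            }
        }
        return false;
    }
}
```

Starting the field at `null` and throwing is the stricter of the two options debated in the thread; initialising it to an allow-everything value instead would keep callers that never invoke `configure()` working.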
[jira] [Commented] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807403#comment-17807403 ] Emanuele Sabellico commented on KAFKA-16147: This is the sequence of events, where SUBSCRIPTION_1 is T1 and SUBSCRIPTION_2 is T1,T2:
{code:java}
/*
 * Define playbook
 */
const struct {
  int timestamp_ms;
  int consumer;
  const vector *topics;
} playbook[] = {/* timestamp_ms, consumer_number, subscribe-to-topics */
                {0, 0, &SUBSCRIPTION_1}, /* Cmd 0 */
                {4000, 1, &SUBSCRIPTION_1},
                {4000, 1, &SUBSCRIPTION_1},
                {4000, 1, &SUBSCRIPTION_1},
                {4000, 2, &SUBSCRIPTION_1},
                {6000, 3, &SUBSCRIPTION_1}, /* Cmd 5 */
                {6000, 4, &SUBSCRIPTION_1},
                {6000, 5, &SUBSCRIPTION_1},
                {6000, 6, &SUBSCRIPTION_1},
                {6000, 7, &SUBSCRIPTION_2},
                {6000, 1, &SUBSCRIPTION_1}, /* Cmd 10 */
                {6000, 1, &SUBSCRIPTION_2},
                {6000, 1, &SUBSCRIPTION_1},
                {6000, 2, &SUBSCRIPTION_2},
                {7000, 2, &SUBSCRIPTION_1},
                {7000, 1, &SUBSCRIPTION_2}, /* Cmd 15 */
                {8000, 0, &SUBSCRIPTION_2},
                {8000, 1, &SUBSCRIPTION_1},
                {8000, 0, &SUBSCRIPTION_1},
                {13000, 2, &SUBSCRIPTION_1},
                {13000, 1, &SUBSCRIPTION_2}, /* Cmd 20 */
                {13000, 5, &SUBSCRIPTION_2},
                {14000, 6, &SUBSCRIPTION_2},
                {15000, 7, &SUBSCRIPTION_1},
                {15000, 1, &SUBSCRIPTION_1},
                {15000, 5, &SUBSCRIPTION_1}, /* Cmd 25 */
                {15000, 6, &SUBSCRIPTION_1},
                {INT_MAX, 0, 0}};
{code}
> Partition is assigned to two members at the same time > - > > Key: KAFKA-16147 > URL: https://issues.apache.org/jira/browse/KAFKA-16147 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Priority: Major > Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, > server.properties, server1.properties, server2.properties > > > While running [test 0113 of > librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], > subtest _u_multiple_subscription_changes_ received this error saying > that a partition is assigned to two members at the same
time. > {code:java} > Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] > which is already assigned to consumer C_5#consumer-8 {code} > I've reconstructed this sequence: > C_5 SUBSCRIBES TO T1 > {noformat} > %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id > "(null)", current assignment "", subscribe topics > "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} > C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 > {noformat} > [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, > targetMemberEpoch=6, state=assigning, assignedPartitions={}, > partitionsPendingRevocation={}, > partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
> (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} > > C_5 RECEIVES TARGET ASSIGNMENT > {noformat} > %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 ACKS TARGET ASSIGNMENT > {noformat} > %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment > "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", > subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
philipnee commented on code in PR #15186: URL: https://github.com/apache/kafka/pull/15186#discussion_r1453840738 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -376,25 +376,21 @@ protected Map prepareCloseFetchSessi final Cluster cluster = metadata.fetch(); Map fetchable = new HashMap<>(); -try { -sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> { -// set the session handler to notify close. This will set the next metadata request to send close message. -sessionHandler.notifyClose(); +sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> { Review Comment: Thanks, Kirk, for the explanation. It seems there are cases where we would want to clear the cache; one I can think of is a topology change. That said, this is probably an unnoticeable optimization: I guess the handler lookup never grows large enough to become a problem.
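The cache-clearing idea in the review comment above can be sketched as follows. This is only a minimal illustration under stated assumptions, not code from the PR: the class `SessionHandlerCacheSketch`, its nested `Handler` type, and the `pruneTo` method are all hypothetical names, and the real client keeps `FetchSessionHandler` instances whose behaviour is not modelled here. The sketch shows a per-node handler lookup that drops entries for nodes that are no longer part of the cluster.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a per-node handler lookup that can be pruned after a topology change.
class SessionHandlerCacheSketch {

    // Stand-in for a per-node fetch session handler (assumption, not the Kafka class).
    static final class Handler {
        final int nodeId;

        Handler(int nodeId) {
            this.nodeId = nodeId;
        }
    }

    private final Map<Integer, Handler> sessionHandlers = new HashMap<>();

    // Returns the handler for a node, creating it on first use.
    Handler handlerFor(int nodeId) {
        return sessionHandlers.computeIfAbsent(nodeId, Handler::new);
    }

    // Removes handlers whose node is not in the live set; returns how many were dropped.
    int pruneTo(Set<Integer> liveNodeIds) {
        int before = sessionHandlers.size();
        sessionHandlers.keySet().retainAll(liveNodeIds);
        return before - sessionHandlers.size();
    }

    int size() {
        return sessionHandlers.size();
    }

    public static void main(String[] args) {
        SessionHandlerCacheSketch cache = new SessionHandlerCacheSketch();
        cache.handlerFor(1);
        cache.handlerFor(2);
        cache.handlerFor(3);
        // Node 3 left the cluster, so only nodes 1 and 2 remain live.
        int dropped = cache.pruneTo(new HashSet<>(Arrays.asList(1, 2)));
        System.out.println("dropped=" + dropped + " remaining=" + cache.size());
    }
}
```

As the comment notes, the lookup is bounded by the number of nodes a client has ever fetched from, so pruning of this kind would save very little in practice.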
[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated KAFKA-16147: --- Description: While running [test 0113 of librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], subtest _u_multiple_subscription_changes_ have received this error saying that a partition is assigned to two members at the same time. {code:java} Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] which is already assigned to consumer C_5#consumer-8 {code} I've reconstructed this sequence: C_5 SUBSCRIBES TO T1 {noformat} %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id "(null)", current assignment "", subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 {noformat} [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, targetMemberEpoch=6, state=assigning, assignedPartitions={}, partitionsPendingRevocation={}, partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} C_5 RECEIVES TARGET ASSIGNMENT {noformat} %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat response received target assignment "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} C_5 ACKS TARGET ASSIGNMENT {noformat} %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id "NULL", current assignment "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat response received target assignment "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are pending {noformat} %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id "NULL", current assignment "NULL", subscribe topics "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
(org.apache.kafka.coordinator.group.GroupMetadataManager) [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}) to CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=16, state=revoking, assignedPartitions={}, partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}). (org.apache.kafka.coordinator.group.GroupMetadataManager) %7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat response received target assignment ""{noformat} C_5 FINISHES REVOCATION {noformat} %7|1705403452.618|CGRPJOINSTATE|C_5#consumer-8| [thrd:main]: Group "rdkafkatest_rnd53b4eb0c2de343_0113u" changed join state wait-assign-call -> steady (state up){noformat} C_5 ACKS REVOCATION, RECEIVES T2-P0,T2-P1,T2-P2 {noformat} %7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZ
[jira] [Commented] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException
[ https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807397#comment-17807397 ] Kamal Chandraprakash commented on KAFKA-16105: -- [~anatolypopov] Could you write an integration test to simulate the error scenario? You can refer to some of the existing [tests|https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseReassignReplicaTest.java]. Thanks! > Reassignment of tiered topics is failing due to RemoteStorageException > -- > > Key: KAFKA-16105 > URL: https://issues.apache.org/jira/browse/KAFKA-16105 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Reporter: Anatolii Popov >Priority: Critical > > When partition reassignment is happening for a tiered topic in most of the > cases it's stuck with RemoteStorageException's on follower nodes saying that > it can not construct remote log auxilary state: > > {code:java} > [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, > fetcherId=2] Error building remote log auxiliary state for test-24 > (kafka.server.ReplicaFetcherThread) > > org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't > build the state from remote store for partition: test-24, currentLeaderEpoch: > 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the > previous remote log segment metadata was not found > at > kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) > at > kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) > at > kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) > at > scala.Option.foreach(Option.scala:437) > at > 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) > at > scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) > {code} > > Scenario: > A cluster of 3 nodes with a single topic with 30 partitions. All partitions > have tiered segments. > Adding 3 more nodes to the cluster and making a reassignment to move all the > data to new nodes. > Behavior: > For most of the partitions reassignment is happening smoothly. > For some of the partitions when a new node starts to get assignments it reads > __remote_log_metadata topic and tries to initialize the metadata cache on > records with COPY
[jira] [Comment Edited] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type
[ https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807394#comment-17807394 ] Ritika Reddy edited comment on KAFKA-16095 at 1/16/24 6:29 PM: --- Yep, that would be merged this week, but we could still work on the state filter changes in parallel. was (Author: JIRAUSER300287): Yep that would be merged this week > Update list group state type filter to include the states for the new > consumer group type > - > > Key: KAFKA-16095 > URL: https://issues.apache.org/jira/browse/KAFKA-16095 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: Lan Ding >Priority: Minor > > # While using *--list --state* the current accepted values correspond to the > classic group type states. We need to include support for the new group type > states. > ## Consumer Group: Should list the state of the group. Accepted Values: > ### _UNKNOWN("unknown")_ > ### {_}EMPTY{_}("empty"), > ### *{_}ASSIGNING{_}("assigning"),* > ### *{_}RECONCILING{_}("reconciling"),* > ### {_}STABLE{_}("stable"), > ### {_}DEAD{_}("dead"); > # > ## Classic Group : Should list the state of the group. Accepted Values: > ### {_}UNKNOWN{_}("Unknown"), > ### {_}EMPTY{_}("Empty"); > ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),* > ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),* > ### {_}STABLE{_}("Stable"), > ### {_}DEAD{_}("Dead") -- This message was sent by Atlassian Jira (v8.20.10#820010)
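The state filter described in the issue above could be validated along these lines. This is a hedged sketch, not the command-line tool's implementation: the class `GroupStateFilterSketch` and the method `parseStates` are hypothetical, the state names are copied from the issue description, and matching the names case-insensitively is an assumption made only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch: accept state names from both the consumer and classic group types.
class GroupStateFilterSketch {

    // State names as listed in the issue description.
    static final List<String> CONSUMER_GROUP_STATES = Arrays.asList(
        "unknown", "empty", "assigning", "reconciling", "stable", "dead");
    static final List<String> CLASSIC_GROUP_STATES = Arrays.asList(
        "Unknown", "Empty", "PreparingRebalance", "CompletingRebalance", "Stable", "Dead");

    // Union of both lists, lower-cased so that lookups ignore case (an assumption).
    private static Set<String> acceptedLowerCase() {
        Set<String> accepted = new LinkedHashSet<>();
        for (String state : CONSUMER_GROUP_STATES) {
            accepted.add(state.toLowerCase(Locale.ROOT));
        }
        for (String state : CLASSIC_GROUP_STATES) {
            accepted.add(state.toLowerCase(Locale.ROOT));
        }
        return accepted;
    }

    // Parses a comma-separated state argument and rejects names found in neither list.
    static Set<String> parseStates(String argument) {
        Set<String> accepted = acceptedLowerCase();
        Set<String> parsed = new LinkedHashSet<>();
        for (String raw : argument.split(",")) {
            String name = raw.trim().toLowerCase(Locale.ROOT);
            if (!accepted.contains(name)) {
                throw new IllegalArgumentException("Invalid state: " + raw.trim());
            }
            parsed.add(name);
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(parseStates("Stable, RECONCILING")); // prints [stable, reconciling]
    }
}
```

Keeping the two lists separate leaves room to restrict the accepted names by group type later, which the issue hints at by listing them per type.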
[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated KAFKA-16147: --- Attachment: librdkafka.log > Partition is assigned to two members at the same time > - > > Key: KAFKA-16147 > URL: https://issues.apache.org/jira/browse/KAFKA-16147 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Priority: Major > Attachments: broker1.log, broker2.log, broker3.log, librdkafka.log, > server.properties, server1.properties, server2.properties > > > While running [test 0113 of > librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], > subtest _u_multiple_subscription_changes_ have received this error saying > that a partition is assigned to two members at the same time. > {code:java} > Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] > which is already assigned to consumer C_5#consumer-8 {code} > I've reconstructed this sequence: > C_5 SUBSCRIBES TO T1 > {noformat} > %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id > "(null)", current assignment "", subscribe topics > "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} > C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 > {noformat} > [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, > targetMemberEpoch=6, state=assigning, assignedPartitions={}, > partitionsPendingRevocation={}, > partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, 
state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}). > (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} > > C_5 RECEIVES TARGET ASSIGNMENT > {noformat} > %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 ACKS TARGET ASSIGNMENT > {noformat} > %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment > "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", > subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are > pending > {noformat} > %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment "NULL", subscribe topics > "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], > rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated > its 
subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, > rdkafkatest_rnd5a91902462d61c2e_0113u_1]. > (org.apache.kafka.coordinator.group.GroupMetadataManager) > [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=16, state=revoking, assignedPartitions={}, > partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 1
[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated KAFKA-16147: --- Attachment: (was: double-assignment.log) > Partition is assigned to two members at the same time > - > > Key: KAFKA-16147 > URL: https://issues.apache.org/jira/browse/KAFKA-16147 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Priority: Major > Attachments: broker1.log, broker2.log, broker3.log, > server.properties, server1.properties, server2.properties > > > While running [test 0113 of > librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], > subtest _u_multiple_subscription_changes_ have received this error saying > that a partition is assigned to two members at the same time. > {code:java} > Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] > which is already assigned to consumer C_5#consumer-8 {code} > I've reconstructed this sequence: > C_5 SUBSCRIBES TO T1 > {noformat} > %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id > "(null)", current assignment "", subscribe topics > "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} > C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 > {noformat} > [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, > targetMemberEpoch=6, state=assigning, assignedPartitions={}, > partitionsPendingRevocation={}, > partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, 
> assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}). > (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} > > C_5 RECEIVES TARGET ASSIGNMENT > {noformat} > %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 ACKS TARGET ASSIGNMENT > {noformat} > %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment > "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", > subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are > pending > {noformat} > %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment "NULL", subscribe topics > "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], > rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated > its subscribed topics 
to: [rdkafkatest_rnd550f20623daba04c_0113u_2, > rdkafkatest_rnd5a91902462d61c2e_0113u_1]. > (org.apache.kafka.coordinator.group.GroupMetadataManager) > [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=16, state=revoking, assignedPartitions={}, > partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8,
[jira] [Commented] (KAFKA-16095) Update list group state type filter to include the states for the new consumer group type
[ https://issues.apache.org/jira/browse/KAFKA-16095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807394#comment-17807394 ] Ritika Reddy commented on KAFKA-16095: -- Yep, that would be merged this week. > Update list group state type filter to include the states for the new > consumer group type > - > > Key: KAFKA-16095 > URL: https://issues.apache.org/jira/browse/KAFKA-16095 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: Lan Ding >Priority: Minor > > # While using *--list --state* the current accepted values correspond to the > classic group type states. We need to include support for the new group type > states. > ## Consumer Group: Should list the state of the group. Accepted Values: > ### _UNKNOWN("unknown")_ > ### {_}EMPTY{_}("empty"), > ### *{_}ASSIGNING{_}("assigning"),* > ### *{_}RECONCILING{_}("reconciling"),* > ### {_}STABLE{_}("stable"), > ### {_}DEAD{_}("dead"); > # > ## Classic Group : Should list the state of the group. Accepted Values: > ### {_}UNKNOWN{_}("Unknown"), > ### {_}EMPTY{_}("Empty"); > ### *{_}PREPARING_REBALANCE{_}("PreparingRebalance"),* > ### *{_}COMPLETING_REBALANCE{_}("CompletingRebalance"),* > ### {_}STABLE{_}("Stable"), > ### {_}DEAD{_}("Dead")
[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated KAFKA-16147: --- Attachment: server.properties server1.properties server2.properties > Partition is assigned to two members at the same time > - > > Key: KAFKA-16147 > URL: https://issues.apache.org/jira/browse/KAFKA-16147 > Project: Kafka > Issue Type: Sub-task >Reporter: Emanuele Sabellico >Priority: Major > Attachments: broker1.log, broker2.log, broker3.log, > double-assignment.log, server.properties, server1.properties, > server2.properties > > > While running [test 0113 of > librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], > subtest _u_multiple_subscription_changes_ have received this error saying > that a partition is assigned to two members at the same time. > {code:java} > Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] > which is already assigned to consumer C_5#consumer-8 {code} > I've reconstructed this sequence: > C_5 SUBSCRIBES TO T1 > {noformat} > %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id > "(null)", current assignment "", subscribe topics > "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} > C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 > {noformat} > [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, > targetMemberEpoch=6, state=assigning, assignedPartitions={}, > partitionsPendingRevocation={}, > partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to > CurrentAssignment(memberEpoch=14, 
previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}). > (org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} > > C_5 RECEIVES TARGET ASSIGNMENT > {noformat} > %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 ACKS TARGET ASSIGNMENT > {noformat} > %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment > "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], > rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", > subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat response received target assignment > "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], > (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} > > C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are > pending > {noformat} > %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: > Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id > "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id > "NULL", current assignment "NULL", subscribe topics > "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], > rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" > [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] 
Member RaTCu6RXQH-FiSl95iZzdw updated > its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, > rdkafkatest_rnd5a91902462d61c2e_0113u_1]. > (org.apache.kafka.coordinator.group.GroupMetadataManager) > [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 > topic=__consumer_offsets partition=7] [GroupId > rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw > transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=14, state=stable, > assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, > partitionsPendingRevocation={}, partitionsPendingAssignment={}) to > CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, > targetMemberEpoch=16, state=revoking, ass
[jira] [Updated] (KAFKA-16147) Partition is assigned to two members at the same time
[ https://issues.apache.org/jira/browse/KAFKA-16147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Emanuele Sabellico updated KAFKA-16147: --- Description: While running test[ 0113 of librdkafka|https://github.com/confluentinc/librdkafka/blob/8b6357f872efe2a5a3a2fd2828e4133f85e6b023/tests/0113-cooperative_rebalance.cpp#L2384], subtest _u_multiple_subscription_changes_ have received this error saying that a partition is assigned to two members at the same time. {code:java} Error: C_6#consumer-9 is assigned rdkafkatest_rnd550f20623daba04c_0113u_2 [0] which is already assigned to consumer C_5#consumer-8 {code} I've reconstructed this sequence: C_5 SUBSCRIBES TO T1 {noformat} %7|1705403451.561|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 6, group instance id "(null)", current assignment "", subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]"{noformat} C_5 ASSIGNMENT CHANGES TO T1-P7, T1-P8, T1-P12 {noformat} [2024-01-16 12:10:51,562] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=6, previousMemberEpoch=0, targetMemberEpoch=6, state=assigning, assignedPartitions={}, partitionsPendingRevocation={}, partitionsPendingAssignment={IKXGrFR1Rv-Qes7Ummas6A=[3, 12]}) to CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}). 
(org.apache.kafka.coordinator.group.GroupMetadataManager){noformat} C_5 RECEIVES TARGET ASSIGNMENT {noformat} %7|1705403451.565|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat response received target assignment "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} C_5 ACKS TARGET ASSIGNMENT {noformat} %7|1705403451.566|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id "NULL", current assignment "rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[7], rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[8], rdkafkatest_rnd5a91902462d61c2e_0113u_1(IKXGrFR1Rv+Qes7Ummas6A)[12]", subscribe topics "rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" %7|1705403451.567|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat response received target assignment "(null)(IKXGrFR1Rv+Qes7Ummas6A)[7], (null)(IKXGrFR1Rv+Qes7Ummas6A)[8], (null)(IKXGrFR1Rv+Qes7Ummas6A)[12]"{noformat} C_5 SUBSCRIBES TO T1,T2: T1 partitions are revoked, 5 T2 partitions are pending {noformat} %7|1705403452.612|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZzdw", group id "rdkafkatest_rnd53b4eb0c2de343_0113u", generation id 14, group instance id "NULL", current assignment "NULL", subscribe topics "rdkafkatest_rnd550f20623daba04c_0113u_2((null))[-1], rdkafkatest_rnd5a91902462d61c2e_0113u_1((null))[-1]" [2024-01-16 12:10:52,615] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw updated its subscribed topics to: [rdkafkatest_rnd550f20623daba04c_0113u_2, rdkafkatest_rnd5a91902462d61c2e_0113u_1]. 
(org.apache.kafka.coordinator.group.GroupMetadataManager) [2024-01-16 12:10:52,616] INFO [GroupCoordinator id=1 topic=__consumer_offsets partition=7] [GroupId rdkafkatest_rnd53b4eb0c2de343_0113u] Member RaTCu6RXQH-FiSl95iZzdw transitioned from CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=14, state=stable, assignedPartitions={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, partitionsPendingRevocation={}, partitionsPendingAssignment={}) to CurrentAssignment(memberEpoch=14, previousMemberEpoch=6, targetMemberEpoch=16, state=revoking, assignedPartitions={}, partitionsPendingRevocation={IKXGrFR1Rv-Qes7Ummas6A=[7, 8, 12]}, partitionsPendingAssignment={EnZMikZURKiUoxZf0rozaA=[0, 1, 2, 8, 9]}). (org.apache.kafka.coordinator.group.GroupMetadataManager) %7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat response received target assignment ""{noformat} C_5 FINISHES REVOCATION {noformat} %7|1705403452.618|CGRPJOINSTATE|C_5#consumer-8| [thrd:main]: Group "rdkafkatest_rnd53b4eb0c2de343_0113u" changed join state wait-assign-call -> steady (state up){noformat} C_5 ACKS REVOCATION, RECEIVES T2-P0,T2-P1,T2-P2 {noformat} %7|1705403452.618|HEARTBEAT|C_5#consumer-8| [thrd:main]: GroupCoordinator/1: Heartbeat of member id "RaTCu6RXQH-FiSl95iZ