[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes
[ https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810243#comment-17810243 ]

Deng Ziming commented on KAFKA-10140:
-------------------------------------

This problem only affects append/subtract operations and does not affect add/delete operations. The following command works:

kafka-configs.sh --bootstrap-server localhost:9092 --alter --broker-defaults --add-config metrics.jmx.blacklist=kafka.controller

> Incremental config api excludes plugin config changes
> -----------------------------------------------------
>
>                 Key: KAFKA-10140
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10140
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Priority: Critical
>
> I was trying to alter the jmx metric filters using the incremental alter
> config api and hit this error:
> {code:java}
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>     at scala.collection.MapLike.default(MapLike.scala:235)
>     at scala.collection.MapLike.default$(MapLike.scala:234)
>     at scala.collection.AbstractMap.default(Map.scala:65)
>     at scala.collection.MapLike.apply(MapLike.scala:144)
>     at scala.collection.MapLike.apply$(MapLike.scala:143)
>     at scala.collection.AbstractMap.apply(Map.scala:65)
>     at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>     at kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>     at kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>     at kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>     at kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>     at java.base/java.lang.Thread.run(Thread.java:834) {code}
> It looks like we are only allowing changes to the keys defined in
> `KafkaConfig` through this API. This excludes config changes to any plugin
> components such as `JmxReporter`.
> Note that I was able to use the regular `alterConfig` API to change this
> config.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
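The behaviour described in this thread (set/delete succeed, append/subtract fail for a plugin key) can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not Kafka's `AdminManager`: the class `IncrementalConfigSketch`, its `apply` method and the two entries in `KNOWN_KEY_IS_LIST` are hypothetical stand-ins. The idea it shows is that append and subtract must look up the key's type, and a key that the broker does not define itself has no entry to look up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical model of the failure mode; this is not Kafka's AdminManager. */
final class IncrementalConfigSketch {
    enum Op { SET, DELETE, APPEND, SUBTRACT }

    // Stand-in for the keys the broker itself defines: true means list-typed.
    private static final Map<String, Boolean> KNOWN_KEY_IS_LIST = new HashMap<>();
    static {
        KNOWN_KEY_IS_LIST.put("log.cleanup.policy", true);
        KNOWN_KEY_IS_LIST.put("num.io.threads", false);
    }

    static void apply(Map<String, String> configs, Op op, String key, String value) {
        if (op == Op.SET) {
            configs.put(key, value);      // no type lookup needed
            return;
        }
        if (op == Op.DELETE) {
            configs.remove(key);          // no type lookup needed
            return;
        }
        // APPEND and SUBTRACT need the key's type, so unknown (plugin) keys fail here.
        Boolean isList = KNOWN_KEY_IS_LIST.get(key);
        if (isList == null) {
            throw new NoSuchElementException("key not found: " + key);
        }
        if (!isList) {
            throw new IllegalArgumentException(key + " is not a list-typed config");
        }
        List<String> items = new ArrayList<>();
        String current = configs.get(key);
        if (current != null && !current.isEmpty()) {
            items.addAll(Arrays.asList(current.split(",")));
        }
        if (op == Op.APPEND) {
            items.add(value);
        } else {
            items.remove(value);          // removes the first matching element, if any
        }
        configs.put(key, String.join(",", items));
    }
}
```

In this model, `SET` on `metrics.jmx.blacklist` succeeds while `APPEND` on the same key throws `NoSuchElementException`, which matches the shape of the report above. Whether the real fix should fall back to plugin-defined types or reject the request with a clearer error is left open here.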
Re: [PR] MINOR: Delete unused classes [kafka]
ijuma merged PR #14797: URL: https://github.com/apache/kafka/pull/14797 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]
appchemist commented on PR #14815: URL: https://github.com/apache/kafka/pull/14815#issuecomment-1907375242 this is
Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]
appchemist closed pull request #14815: KAFKA-15717: KRaft support in LeaderEpochIntegrationTest URL: https://github.com/apache/kafka/pull/14815
Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]
drawxy commented on PR #15220: URL: https://github.com/apache/kafka/pull/15220#issuecomment-1907259848 > @drawxy can you please fix the failing compilation. Otherwise looks good. Already fixed by rebasing onto the trunk branch. @divijvaidya, could you please help merge this PR? Thanks!
Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]
rreddy-22 commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1464202116

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -604,22 +602,23 @@ public CoordinatorResult deleteOffsets(
                 )
             );
         } else {
-            final TimelineHashMap offsetsByPartition = offsetsByTopic == null ?
-                null : offsetsByTopic.get(topic.name());
-            if (offsetsByPartition != null) {
-                topic.partitions().forEach(partition -> {
-                    if (offsetsByPartition.containsKey(partition.partitionIndex())) {
-                        responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
-                            .setPartitionIndex(partition.partitionIndex())
-                        );
-
-                        records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
-                            request.groupId(),
-                            topic.name(),
-                            partition.partitionIndex()
-                        ));
-                    }
-                });
-            }
+            topic.partitions().forEach(partition -> {
+                // We always add the partition to the response.
+                responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                    .setPartitionIndex(partition.partitionIndex())
+                );
+
+                // A tombstone is written if an offset in present is the main storage or

Review Comment:
nit: offset is* present in* the main storage
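The change under review can be sketched in isolation: every requested partition is echoed in the response, while a tombstone is written only when there is something to delete. The diff comment is cut off after "or", so the second condition used below (a pending, uncommitted offset) is an assumption inferred from the test name `testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets` elsewhere in this PR. `OffsetDeleteSketch`, `Result` and the integer partition ids are hypothetical simplifications, not the `OffsetMetadataManager` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical simplification of the offset-delete path discussed above. */
final class OffsetDeleteSketch {
    /** Partitions echoed in the response, and partitions that received a tombstone. */
    static final class Result {
        final List<Integer> responsePartitions = new ArrayList<>();
        final List<Integer> tombstones = new ArrayList<>();
    }

    static Result deleteOffsets(List<Integer> requested,
                                Set<Integer> committed,
                                Set<Integer> pending) {
        Result result = new Result();
        for (int partition : requested) {
            // The partition is always added to the response.
            result.responsePartitions.add(partition);
            // Assumed rule: write a tombstone only if an offset exists in the
            // main storage or is still pending.
            if (committed.contains(partition) || pending.contains(partition)) {
                result.tombstones.add(partition);
            }
        }
        return result;
    }
}
```

With requested partitions 0, 1 and 2, a committed offset for 0 and a pending one for 2, the response lists all three partitions and tombstones are produced for 0 and 2 only.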
Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]
jolshan commented on PR #15221: URL: https://github.com/apache/kafka/pull/15221#issuecomment-1907178903 Took a first pass. I will look again tomorrow.
Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]
jolshan commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1464178480

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -501,13 +521,11 @@ public void testOffsetDeleteWith(
         final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection expectedResponsePartitionCollection =
             new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
-        if (hasOffset(groupId, topic, partition)) {

Review Comment:
Which test verifies that we return a response even if the partition has no offset?
Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]
jolshan commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1464175850

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -2342,26 +2375,24 @@ public void testConsumerGroupOffsetDeleteWithErrors() {
         context.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
     }
+    @Test
+    public void testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        context.commitOffset(10L, "foo", "bar", 0, 100L, 0, context.time.milliseconds());
+        assertFalse(group.isSubscribedToTopic("bar"));
+        context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);

Review Comment:
Do we want to test that the offset is deleted here, as we do for the generic group?
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
OmniaGM commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1464137161

## server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+import org.apache.kafka.coordinator.group.OffsetConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class Defaults {

Review Comment:
I think the pattern of keeping defaults and properties in the class's companion object has been used in some of the Scala code as well. KafkaConfig has been something of an anti-pattern for a while, since its defaults are defined in a separate companion object. I will raise another PR soon to move this to the Java pattern.
[PR] [WIP - DON'T MERGE] KAFKA-15974 Enforce that CompletableApplicationEvent has a timeout that is respected [kafka]
kirktrue opened a new pull request, #15250: URL: https://github.com/apache/kafka/pull/15250 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1464109057

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,21 +453,39 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter The states of the groups we want to list.
-     * If empty all groups are returned with their state.
-     * @param committedOffset A specified committed offset corresponding to this shard
+     * @param statesFilter The states of the groups we want to list.
+     * If empty, all groups are returned with their state.
+     * @param typesFilter The types of the groups we want to list.
+     * If empty, all groups are returned with their type.
+     * @param committedOffset A specified committed offset corresponding to this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List listGroups(
+        Set statesFilter,
+        Set typesFilter,
+        long committedOffset
+    ) {
+        // Convert typesFilter to lowercase to make the filter case-insensitive.
+        Set lowerCaseTypesFilter = typesFilter.stream()
+            .map(String::toLowerCase)
+            .collect(Collectors.toCollection(HashSet::new));
+
+        Predicate combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset));
+            boolean typeCheck = lowerCaseTypesFilter.isEmpty() ||
+                lowerCaseTypesFilter.contains(group.type().toString().toLowerCase());

Review Comment:
I think I understand what you're saying. I'll add a way to parse any string and convert it to the necessary case to compare and determine which type it is in the group type enum. However, since some of these changes are on the client side can we do it in the second PR? That way I can test it as well. I could remove the lower case conversion here and then directly handle it there or we could change it there directly as an optimization just to be safe.
What do you think?
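The alternative mentioned in this comment, parsing each filter string into the group type enum once instead of lower-casing both sides on every comparison, could look roughly like the sketch below. It is illustrative only: `GroupFilterSketch`, its nested `GroupType` enum with the values `CONSUMER` and `GENERIC`, the `Group` holder and `parseType` are hypothetical names, not the classes in `GroupMetadataManager`.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch of a case-insensitive group type filter. */
final class GroupFilterSketch {
    enum GroupType { CONSUMER, GENERIC }   // illustrative values only

    static final class Group {
        final String state;
        final GroupType type;

        Group(String state, GroupType type) {
            this.state = state;
            this.type = type;
        }
    }

    /** Parses a type name regardless of case; unknown names yield an empty Optional. */
    static Optional<GroupType> parseType(String raw) {
        try {
            return Optional.of(GroupType.valueOf(raw.trim().toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    static Predicate<Group> combinedFilter(Set<String> statesFilter, Set<String> typesFilter) {
        // Convert the requested type names to enum values once, up front.
        Set<GroupType> types = EnumSet.noneOf(GroupType.class);
        for (String name : typesFilter) {
            parseType(name).ifPresent(types::add);
        }
        boolean typesRequested = !typesFilter.isEmpty();
        return group ->
            (statesFilter.isEmpty() || statesFilter.contains(group.state))
                && (!typesRequested || types.contains(group.type));
    }
}
```

One design choice to note: when the caller passes only unrecognised type names, this sketch matches no groups instead of silently behaving as if no filter had been given. Using `Locale.ROOT` avoids locale-dependent case conversion.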
[jira] [Resolved] (KAFKA-15813) Improve implementation of client instance cache
[ https://issues.apache.org/jira/browse/KAFKA-15813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-15813.
-----------------------------
    Fix Version/s: 3.8.0
       Resolution: Fixed

Merged the PR to trunk.

> Improve implementation of client instance cache
> -----------------------------------------------
>
>                 Key: KAFKA-15813
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15813
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Apoorv Mittal
>            Assignee: Apoorv Mittal
>            Priority: Major
>             Fix For: 3.8.0
>
> In the current implementation the ClientMetricsManager uses an LRU cache, but we
> should also support expiring stale clients, i.e. clients which haven't reported
> metrics for a while.
>
> The KIP mentions: This client instance specific state is maintained in broker
> memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used to
> enforce the push interval rate-limiting. There is no persistence of client
> instance metrics state across broker restarts or between brokers
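The retention rule quoted from the KIP, MAX(60*1000, PushIntervalMs * 3) milliseconds, is simple enough to work through in code. The sketch below is a minimal illustration, not the `ClientMetricsManager` implementation: the class and method names are hypothetical, and treating an instance as stale only once strictly more than the retention time has elapsed is an assumption.

```java
/** Hypothetical helper showing the retention rule quoted from the KIP. */
final class ClientInstanceTtlSketch {
    static final long MIN_TTL_MS = 60 * 1000L;

    /** MAX(60*1000, PushIntervalMs * 3), in milliseconds. */
    static long ttlMs(long pushIntervalMs) {
        return Math.max(MIN_TTL_MS, pushIntervalMs * 3);
    }

    /** Assumption: stale means strictly more than the TTL since the last report. */
    static boolean isStale(long lastSeenMs, long nowMs, long pushIntervalMs) {
        return nowMs - lastSeenMs > ttlMs(pushIntervalMs);
    }
}
```

For a push interval of 5,000 ms the floor applies and state is kept for 60,000 ms; for a push interval of 30,000 ms the state is kept for 90,000 ms.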
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
junrao merged PR #15234: URL: https://github.com/apache/kafka/pull/15234
Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan merged PR #15139: URL: https://github.com/apache/kafka/pull/15139
[jira] [Assigned] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.
[ https://issues.apache.org/jira/browse/KAFKA-15343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-15343: --- Assignee: Greg Harris > Fix MirrorConnectIntegrationTests causing ci build failures. > > > Key: KAFKA-15343 > URL: https://issues.apache.org/jira/browse/KAFKA-15343 > Project: Kafka > Issue Type: Bug > Components: build >Affects Versions: 3.6.0 >Reporter: Prasanth Kumar >Assignee: Greg Harris >Priority: Major > > There are several instances of tests interacting badly with gradle daemon(s) > running on ports that the kafka broker previously used. After going through > the debug logs we observed a few retrying kafka clients trying to connect to > broker which got shutdown and the gradle worker chose the same port on which > broker was running. Later in the build, the gradle daemon attempted to > connect to the worker and could not, triggering a failure. Ideally gradle > would not exit when connected to from an invalid client - in testing with > netcat, it would often handle these without dying. However there appear to be > some cases where the daemon dies completely. Both the broker code and the > gradle workers bind to port 0, resulting in the OS assigning it an unused > port. This does avoid conflicts, but does not ensure that long lived clients > do not attempt to connect to these ports afterwards. It's possible that > closing the client in between may be enough to work around this issue. Till > then we will disable the test to avoid the ci blocker from testing the code > changes. 
> *MirrorConnectorsIntegrationBaseTest and extending Tests*
> {code:java}
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] MirrorConnectorsWithCustomForwardingAdminIntegrationTest > testReplicateSourceDefault() STANDARD_OUT
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] [2023-07-04 11:47:46,799] INFO primary REST service: http://localhost:43809/connectors (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] [2023-07-04 11:47:46,799] INFO backup REST service: http://localhost:43323/connectors (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] [2023-07-04 11:47:46,799] INFO primary brokers: localhost:37557 (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226)
> [2023-07-04T11:59:12.968Z] 2023-07-04T11:59:12.900+ [DEBUG] [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557.
> [2023-07-04T11:59:13.233Z] org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:47660'.
> [2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, addresses:[localhost/127.0.0.1]].
> [2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] [system.err] org.gradle.internal.remote.internal.ConnectException: Could not connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. {code}
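The port-reuse hazard described in this issue follows from how binding to port 0 works: the OS hands out a currently unused port, but once the listener is closed that number can be given to another process, and a long-lived client still retrying the old address will then connect to the wrong server. A minimal sketch using only `java.net.ServerSocket` is shown below; `EphemeralPortSketch` and its method names are hypothetical, and whether an immediate rebind succeeds depends on the OS and on what else is running.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

/** Hypothetical demonstration of OS-assigned ports being released and reused. */
final class EphemeralPortSketch {
    /** Binds to port 0, reports the port the OS chose, then releases it. */
    static int bindAndRelease() {
        try (ServerSocket server = new ServerSocket(0)) {
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Tries to listen on a specific port, as a different process later might. */
    static boolean canRebind(int port) {
        try (ServerSocket server = new ServerSocket(port)) {
            return server.isBound();
        } catch (IOException e) {
            return false;
        }
    }
}
```

Calling `canRebind(bindAndRelease())` will often return true, which is the situation in the log above: port 37557 first belonged to a Kafka broker and was later handed to a Gradle worker while an old client was still trying to reach it.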
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
apoorvmittal10 commented on PR #15234: URL: https://github.com/apache/kafka/pull/15234#issuecomment-1907047744 > @apoorvmittal10 : Before I merge this, could you update the description of the PR. You listed 3 options to implement the eviction. The one you picked is based on timing wheels, right? Thanks for pointing out, added.
[jira] [Commented] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.
[ https://issues.apache.org/jira/browse/KAFKA-15343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810124#comment-17810124 ] Greg Harris commented on KAFKA-15343: - I opened an issue in Gradle for this bug, since it's not unique to Kafka and can appear in any project which uses sockets: [https://github.com/gradle/gradle/issues/27801] I have also been working to reduce the number of socket/client leaks through https://issues.apache.org/jira/browse/KAFKA-15845 and have made a lot of progress in stopping the leaks we have currently, and should be able to prevent new ones in the future. > Fix MirrorConnectIntegrationTests causing ci build failures. > > > Key: KAFKA-15343 > URL: https://issues.apache.org/jira/browse/KAFKA-15343 > Project: Kafka > Issue Type: Bug > Components: build >Affects Versions: 3.6.0 >Reporter: Prasanth Kumar >Priority: Major > > There are several instances of tests interacting badly with gradle daemon(s) > running on ports that the kafka broker previously used. After going through > the debug logs we observed a few retrying kafka clients trying to connect to > broker which got shutdown and the gradle worker chose the same port on which > broker was running. Later in the build, the gradle daemon attempted to > connect to the worker and could not, triggering a failure. Ideally gradle > would not exit when connected to from an invalid client - in testing with > netcat, it would often handle these without dying. However there appear to be > some cases where the daemon dies completely. Both the broker code and the > gradle workers bind to port 0, resulting in the OS assigning it an unused > port. This does avoid conflicts, but does not ensure that long lived clients > do not attempt to connect to these ports afterwards. It's possible that > closing the client in between may be enough to work around this issue. Till > then we will disable the test to avoid the ci blocker from testing the code > changes. 
> *MirrorConnectorsIntegrationBaseTest and extending Tests*
> {code:java}
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] MirrorConnectorsWithCustomForwardingAdminIntegrationTest > testReplicateSourceDefault() STANDARD_OUT
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] [2023-07-04 11:47:46,799] INFO primary REST service: http://localhost:43809/connectors (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] [2023-07-04 11:47:46,799] INFO backup REST service: http://localhost:43323/connectors (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] [TestEventLogger] [2023-07-04 11:47:46,799] INFO primary brokers: localhost:37557 (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226)
> [2023-07-04T11:59:12.968Z] 2023-07-04T11:59:12.900+ [DEBUG] [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557.
> [2023-07-04T11:59:13.233Z] org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:47660'.
> [2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, addresses:[localhost/127.0.0.1]].
> [2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] [system.err] org.gradle.internal.remote.internal.ConnectException: Could not connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. {code}
Re: [PR] KAFKA-16042: Add byte-rate metrics for topic and partition [kafka]
afshing commented on PR #15085:
URL: https://github.com/apache/kafka/pull/15085#issuecomment-1907026805

> FYI: We are making a similar effort here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-977%3A+Partition-Level+Throughput+Metrics

@ex172000 thanks for your comment. Your work offers much more customization for exposing topic-partition metrics, which is great. In our fork, we exposed the byte-rate and throttle-time metrics per topic-partition so that we could use them for quota management at the topic-partition level (see [KIP-1010](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota)), without giving any option to enable or disable them or to customize their verbosity level. With this proposal, it makes more sense for us to port our quota management change onto this format. I have two questions:

1. What is the status of this KIP? Is it something that you have used in your fork and are trying to contribute upstream, or is it a new implementation? (I am trying to get a sense of how much work remains.)
2. We have not initiated any discussion on our KIP-1010. I see that your KIP-977 is approved, but I can't find any discussion in [the thread](https://lists.apache.org/list?d...@kafka.apache.org:lte=1M:KIP-977). I am wondering what the KIP life-cycle is.
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
junrao commented on PR #15234: URL: https://github.com/apache/kafka/pull/15234#issuecomment-1907023180 @apoorvmittal10 : Before I merge this, could you update the description of the PR. You listed 3 options to implement the eviction. The one you picked is based on timing wheels, right?
Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
lianetm commented on PR #15215: URL: https://github.com/apache/kafka/pull/15215#issuecomment-1906966026 Thanks @dajac, all comments addressed.
Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1464026216

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1134,9 +1134,38 @@ private CompletableFuture assignPartitions(
         // Make assignment effective on the client by updating the subscription state.
         updateSubscription(assignedPartitions, false);
+        // Mark assigned partitions as pendingOnAssignedCallback to temporarily stop fetching or
+        // initializing positions for them. Passing the full set of assigned partitions
+        // (previously owned and newly added), given that they are all provided to the user in the
+        // callback, so we could expect offsets updates for any of them.
+        Set assignedTopicPartition = assignedPartitions.stream().map(tIdp -> tIdp.topicPartition()).collect(Collectors.toSet());
+        subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, true);

Review Comment:
Totally, that was definitely the case. I made the changes to make sure that all `assignedPartitions` are updated in the subscription state and `addedPartitions` are marked as awaiting the callback in a single thread-safe operation in the subscription state. This is, by the way, in line with your comment about not touching the previously owned partitions and just blocking the added ones while the callback completes. Totally agree, fixed. Both the legacy logic and this new logic have always included only the added partitions in `onPartitionsAssigned`, so it was me getting mixed up before. Sorry about the confusion.
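The behaviour agreed on in this comment (update the full assignment, but block fetching only for the newly added partitions until the `onPartitionsAssigned` callback completes, all in one thread-safe step) can be modelled with a small class. This is a sketch under assumptions, not the consumer's `SubscriptionState`: `SubscriptionSketch`, its method names and the use of plain strings for partitions are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of "assign, then hold back added partitions until the callback ends". */
final class SubscriptionSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> pendingCallback = new HashSet<>();

    /** Updates the assignment and marks only newly added partitions as pending, atomically. */
    synchronized void assignAndMarkPending(Set<String> newAssignment) {
        Set<String> added = new HashSet<>(newAssignment);
        added.removeAll(assigned);              // previously owned partitions are left alone
        assigned.clear();
        assigned.addAll(newAssignment);
        pendingCallback.retainAll(assigned);    // drop markers for revoked partitions
        pendingCallback.addAll(added);
    }

    /** Called once the user's assignment callback has finished. */
    synchronized void onAssignedCallbackCompleted() {
        pendingCallback.clear();
    }

    /** Partitions that may be fetched right now. */
    synchronized Set<String> fetchable() {
        Set<String> result = new HashSet<>(assigned);
        result.removeAll(pendingCallback);
        return result;
    }
}
```

If `t-0` is already owned and a new assignment of `t-0` and `t-1` arrives, `fetchable()` returns only `t-0` until `onAssignedCallbackCompleted()` is called, after which both partitions are fetchable.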
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
jsancio commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463743747

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -562,33 +568,37 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest =
             request.topics().get(0).partitions().get(0);
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        if (partitionRequest.preVote()) {
+            throw new IllegalArgumentException("PreVote is not supported yet");

Review Comment:
I would prefer that we implement this in this PR. That should address @hachikuji's comment about adding a version for the Vote RPC without the replica fully supporting that version. We don't need to implement the prospective state or the sending of Vote requests with pre-vote set to true in this PR. We just need to implement the handling of version 1 of the Vote RPC in this PR. That includes adding a KafkaRaftClientPreVoteTest suite that verifies the correct implementation.
[jira] [Resolved] (KAFKA-15683) Delete subscription from metadata when all configs are deleted
[ https://issues.apache.org/jira/browse/KAFKA-15683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal resolved KAFKA-15683. --- Resolution: Not A Problem Closing ticket as this is not required, it works as per the default behaviour of kafka-configs.sh. Moreover, ClientMetricsManager deletes the in-memory subscription/client resources once all properties for the respective resource are removed. This is not needed. > Delete subscription from metadata when all configs are deleted > -- > > Key: KAFKA-15683 > URL: https://issues.apache.org/jira/browse/KAFKA-15683 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > > As of now, kafka-configs.sh does not differentiate between a non-existent and a blank > metrics subscription. Add support to differentiate between the 2 scenarios and also > delete the subscription if all configs are deleted for the respective > subscription. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16187) Flaky test: testTopicPatternArg - org.apache.kafka.tools.GetOffsetShellTest
Apoorv Mittal created KAFKA-16187: - Summary: Flaky test: testTopicPatternArg - org.apache.kafka.tools.GetOffsetShellTest Key: KAFKA-16187 URL: https://issues.apache.org/jira/browse/KAFKA-16187 Project: Kafka Issue Type: Bug Reporter: Apoorv Mittal [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15234/8/tests/] {code:java} Error org.opentest4j.AssertionFailedError: expected: <[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a0a3, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a465, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a484, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a827, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a846, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a865]> but was: <[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808]> Stacktrace org.opentest4j.AssertionFailedError: expected: <[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a0a3, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a465, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a484, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a827, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a846, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a865]> but was: <[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808]> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) at app//org.apache.kafka.tools.GetOffsetShellTest.testTopicPatternArg(GetOffsetShellTest.java:154) at java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580) at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) at
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
apoorvmittal10 commented on PR #15234: URL: https://github.com/apache/kafka/pull/15234#issuecomment-1906867428 I have added one missing jira for flaky tests, others pre-existed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
apoorvmittal10 commented on PR #15234: URL: https://github.com/apache/kafka/pull/15234#issuecomment-1906858917 > @apoorvmittal10 : Are the 30 test failures related? Thanks. @junrao Thanks for the review, none of them is related. I find most of them already reported, I'll check once again for jira. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
kirktrue commented on PR #15186: URL: https://github.com/apache/kafka/pull/15186#issuecomment-1906805914 @dajac—the PR description was updated. Let me know if there's anything left before we're ready to merge. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
junrao commented on PR #15234: URL: https://github.com/apache/kafka/pull/15234#issuecomment-1906780891 @apoorvmittal10 : Are the 30 test failures related? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
hachikuji commented on code in PR #15231: URL: https://github.com/apache/kafka/pull/15231#discussion_r1463806130 ## clients/src/main/resources/common/message/VoteRequest.json: ## @@ -18,7 +18,8 @@ "type": "request", "listeners": ["controller"], "name": "VoteRequest", - "validVersions": "0", + // Version 1 adds the PreVote field and renames CandidateEpoch and CandidateId to ReplicaEpoch and ReplicaId Review Comment: Agree it would be better to implement all of it here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: populate TopicName in ConsumerGroupDescribe [kafka]
dongnuo123 commented on code in PR #15205: URL: https://github.com/apache/kafka/pull/15205#discussion_r1463772765 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -17,10 +17,11 @@ package org.apache.kafka.coordinator.group.consumer; Review Comment: Yeah, we have that in `testAsConsumerGroupDescribeWithTopicNameNotFound`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
jsancio commented on code in PR #15231: URL: https://github.com/apache/kafka/pull/15231#discussion_r1463743747 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -562,33 +568,37 @@ private VoteResponseData handleVoteRequest( VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0); -int candidateId = partitionRequest.candidateId(); -int candidateEpoch = partitionRequest.candidateEpoch(); +if (partitionRequest.preVote()) { +throw new IllegalArgumentException("PreVote is not supported yet"); Review Comment: I prefer if we implement this in this PR. That should alleviate @hachikuji 's comment about adding a version for the Vote RPC without the replica fully supporting that version. We don't need to implement the prospective state and sending Vote request with pre-vote set to true in this PR. We just need to implement the handling of version 1 of Vote RPC in this PR. That includes adding a KafkaRaftClientPreVoteTest suite that verifies the correct implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
jsancio commented on code in PR #15231: URL: https://github.com/apache/kafka/pull/15231#discussion_r1463746152 ## clients/src/main/resources/common/message/VoteRequest.json: ## @@ -18,7 +18,8 @@ "type": "request", "listeners": ["controller"], "name": "VoteRequest", - "validVersions": "0", + // Version 1 adds the PreVote field and renames CandidateEpoch and CandidateId to ReplicaEpoch and ReplicaId Review Comment: @hachikuji I think we can avoid this if we implement the handling of version 1 requests in this PR. See https://github.com/apache/kafka/pull/15231#discussion_r1463743747 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
jsancio commented on code in PR #15231: URL: https://github.com/apache/kafka/pull/15231#discussion_r1463743747 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -562,33 +568,37 @@ private VoteResponseData handleVoteRequest( VoteRequestData.PartitionData partitionRequest = request.topics().get(0).partitions().get(0); -int candidateId = partitionRequest.candidateId(); -int candidateEpoch = partitionRequest.candidateEpoch(); +if (partitionRequest.preVote()) { +throw new IllegalArgumentException("PreVote is not supported yet"); Review Comment: I prefer if we implement this in this PR. That should alleviate @hachikuji 's comment about adding a version for the Vote RPC without the replica fully supporting that version. We don't need to implement the prospective state and sending Vote request with pre-vote set to true in this PR. We just need to implement the handling of version 1 of Vote RPC in this PR. That includes adding a KafkaRaftClientPreVoteTest suite that verifies the correct implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14576) Move ConsoleConsumer to tools
[ https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810066#comment-17810066 ] Christo Lolov commented on KAFKA-14576: --- Sure [~mimaison] , apologies for not unassigning myself sooner! > Move ConsoleConsumer to tools > - > > Key: KAFKA-14576 > URL: https://issues.apache.org/jira/browse/KAFKA-14576 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]
C0urante opened a new pull request, #15249: URL: https://github.com/apache/kafka/pull/15249 [Jira](https://issues.apache.org/jira/browse/KAFKA-15675) Currently a draft; will promote to ready once several consecutive CI runs have completed without the flaky test failing. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15675) Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
[ https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810057#comment-17810057 ] Chris Egerton commented on KAFKA-15675: --- I've done some analysis on this one and believe I've found the root cause. It's a confluence of a few different issues, but the TL;DR is: *the request to {{POST /connectors//restart?onlyFailed=false=false}} fails with a 409 error; this does not cause the test to (immediately) fail, but the connector is never restarted, which causes the test to time out while [waiting for the connector to be stopped|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L272-L275].* This kind of scenario probably raises several questions. Here's my best attempt to anticipate and address them: *Why does the 409 response not cause the test to immediately fail?* The original rationale for this is unclear, but the code structure [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L374-L383] is fairly clear: issue the request, and if the status code is less than 400, attempt to deserialize the body. Then, unconditionally, return either null or the deserialized response body. *Why is the 409 response occurring?* The cluster (or, to be more specific, either the worker that received the initial REST request or, if the request was forwarded, the leader) detected that a rebalance due to an added/removed connector or new task configs was about to take place, and rejected the request. 
See the {{DistributedHerder}} class's [restartConnectorAndTasks|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1467] and [checkRebalanceNeeded|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2302-L2307] methods for the logic to check for pending rebalances, and its logic for detecting pending rebalances [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2385], [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2400], and [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2419]. *Why is a rebalance pending by the time we try to restart the connector? Shouldn't the cluster and the set of connectors and tasks on it be stable by this point?* Yes, the cluster and set of connectors and tasks on it should be stable by the time we issue our restart request. 
We check to make sure that [every worker in the cluster is up and running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L116-L117] before proceeding with the rest of the test, and that the [connector and expected number of tasks are running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L252-L253] before issuing the restart request. Unfortunately, the former check–for worker liveness across the cluster–does not guarantee that every worker has joined the cluster. This check is [performed by issuing a request to the root resource|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L956-L975] ({{GET /}}) for each worker: if the response is valid (i.e., its body matches the expected format), then the worker is considered up and running. However, this does not guarantee that the worker has actually completed startup: it may not have finished reading to the end of internal topics, or had a chance to contact the group coordinator and join the cluster yet. After examining the logs of one test case, it appeared that the following sequence of events took place: # A single worker completes startup (creates and reads to the end of internal topics, then joins the cluster) # The connector is created (by chance, the REST request to create the connector happens to be sent to the only worker that has completed startup so far) # The
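The response-handling pattern described in the analysis above (deserialize only when the status code is below 400, otherwise return null) can be contrasted with a stricter variant in a short sketch. This is an illustration of the pattern only, not the code in `EmbeddedConnect`; the class `RestResponseCheck`, its methods, and the exception type are hypothetical names.

```java
import java.util.function.Function;

/** Illustrative only: two ways a test helper could treat an HTTP response. */
public class RestResponseCheck {

    /** Hypothetical exception type for this sketch; not a Kafka class. */
    public static class RestRequestFailedException extends RuntimeException {
        public final int status;

        public RestRequestFailedException(int status, String body) {
            super("Request failed with status " + status + ": " + body);
            this.status = status;
        }
    }

    // Lenient handling, as described in the comment: an error status is
    // silently turned into a null return value.
    public static <T> T lenient(int status, String body, Function<String, T> deserializer) {
        return status < 400 ? deserializer.apply(body) : null;
    }

    // Strict handling: an error status (such as 409 during a pending
    // rebalance) surfaces immediately instead of being swallowed.
    public static <T> T strict(int status, String body, Function<String, T> deserializer) {
        if (status >= 400) {
            throw new RestRequestFailedException(status, body);
        }
        return deserializer.apply(body);
    }
}
```

With the lenient form, a caller that ignores the return value cannot tell a rejected restart from an accepted one, which matches the timeout symptom described above. The strict form would fail the test at the point of the request.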
[jira] [Commented] (KAFKA-14576) Move ConsoleConsumer to tools
[ https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810056#comment-17810056 ] Mickael Maison commented on KAFKA-14576: Hi [~christo_lolov], I'll have some spare cycles so I've reassigned this ticket to me. > Move ConsoleConsumer to tools > - > > Key: KAFKA-14576 > URL: https://issues.apache.org/jira/browse/KAFKA-14576 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14576) Move ConsoleConsumer to tools
[ https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-14576: -- Assignee: Mickael Maison (was: Christo Lolov) > Move ConsoleConsumer to tools > - > > Key: KAFKA-14576 > URL: https://issues.apache.org/jira/browse/KAFKA-14576 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: populate TopicName in ConsumerGroupDescribe [kafka]
dajac commented on code in PR #15205: URL: https://github.com/apache/kafka/pull/15205#discussion_r1463681361 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember( private static List topicPartitionsFromMap( Map> partitions, -Map subscriptionMetadata +TopicsImage topicsImage ) { -return partitions.entrySet().stream().map( -item -> new ConsumerGroupDescribeResponseData.TopicPartitions() -.setTopicId(item.getKey()) -.setTopicName(lookupTopicNameById(item.getKey(), subscriptionMetadata)) -.setPartitions(new ArrayList<>(item.getValue())) -).collect(Collectors.toList()); +List topicPartitions = new ArrayList<>(); +for (Map.Entry> entry : partitions.entrySet()) { +Uuid topicId = entry.getKey(); +Set partitionSet = partitions.get(topicId); +//partitions.forEach((topicId, partitionSet) -> { +String topicName = lookupTopicNameById(topicId, topicsImage); +if (topicName != null) { +topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions() +.setTopicId(topicId) +.setTopicName(topicName) +.setPartitions(new ArrayList<>(partitionSet))); +} else { +// When the topic has been deleted and the group/member hasn't updated, +// directly remove the topic from the assignment. +partitions.remove(topicId, partitionSet); Review Comment: We should not do this. We should simply not add it to the response. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember( private static List topicPartitionsFromMap( Map> partitions, -Map subscriptionMetadata +TopicsImage topicsImage ) { -return partitions.entrySet().stream().map( -item -> new ConsumerGroupDescribeResponseData.TopicPartitions() -.setTopicId(item.getKey()) -.setTopicName(lookupTopicNameById(item.getKey(), subscriptionMetadata)) -.setPartitions(new ArrayList<>(item.getValue())) -).collect(Collectors.toList()); +List topicPartitions = new ArrayList<>(); +for (Map.Entry> entry : partitions.entrySet()) { Review Comment: nit: How about using `partitions.forEach`? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -17,10 +17,11 @@ package org.apache.kafka.coordinator.group.consumer; Review Comment: Should we add a unit test here to verify that non-existent partitions are not provided in the response? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ## @@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember( private static List topicPartitionsFromMap( Map> partitions, -Map subscriptionMetadata +TopicsImage topicsImage ) { -return partitions.entrySet().stream().map( -item -> new ConsumerGroupDescribeResponseData.TopicPartitions() -.setTopicId(item.getKey()) -.setTopicName(lookupTopicNameById(item.getKey(), subscriptionMetadata)) -.setPartitions(new ArrayList<>(item.getValue())) -).collect(Collectors.toList()); +List topicPartitions = new ArrayList<>(); +for (Map.Entry> entry : partitions.entrySet()) { +Uuid topicId = entry.getKey(); +Set partitionSet = partitions.get(topicId); +//partitions.forEach((topicId, partitionSet) -> { Review Comment: nit: Could we remove it? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
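A minimal sketch of what the three review comments above ask for: iterate with `forEach`, skip topics whose name cannot be resolved, and leave the input map untouched. It is self-contained and uses simplified stand-ins: `String` topic ids instead of `Uuid`, a plain `Map` instead of `TopicsImage`, and a hypothetical `TopicPartitions` holder instead of `ConsumerGroupDescribeResponseData.TopicPartitions`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: simplified types stand in for the Kafka classes. */
public class TopicPartitionsSketch {

    /** Hypothetical stand-in for the generated response class. */
    public static final class TopicPartitions {
        public final String topicId;
        public final String topicName;
        public final List<Integer> partitions;

        TopicPartitions(String topicId, String topicName, List<Integer> partitions) {
            this.topicId = topicId;
            this.topicName = topicName;
            this.partitions = partitions;
        }
    }

    public static List<TopicPartitions> topicPartitionsFromMap(
        Map<String, Set<Integer>> partitions,
        Map<String, String> topicNamesById
    ) {
        List<TopicPartitions> result = new ArrayList<>();
        partitions.forEach((topicId, partitionSet) -> {
            String topicName = topicNamesById.get(topicId);
            // A deleted topic has no name: leave it out of the response,
            // but do not remove it from the member's assignment map.
            if (topicName != null) {
                result.add(new TopicPartitions(topicId, topicName, new ArrayList<>(partitionSet)));
            }
        });
        return result;
    }
}
```

Besides keeping the describe path read-only, not mutating `partitions` also avoids modifying a map while it is being iterated, which for most `Map` implementations is an error in its own right.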
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
hachikuji commented on code in PR #15231: URL: https://github.com/apache/kafka/pull/15231#discussion_r1463682475 ## clients/src/main/resources/common/message/VoteRequest.json: ## @@ -18,7 +18,8 @@ "type": "request", "listeners": ["controller"], "name": "VoteRequest", - "validVersions": "0", + // Version 1 adds the PreVote field and renames CandidateEpoch and CandidateId to ReplicaEpoch and ReplicaId Review Comment: Should we set `latestVersionUnstable` until the feature is fully implemented? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update KIP-890 note [kafka]
jolshan commented on PR #15244: URL: https://github.com/apache/kafka/pull/15244#issuecomment-1906562169 Picked to 3.6 and 3.7 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
lianetm commented on code in PR #15215: URL: https://github.com/apache/kafka/pull/15215#discussion_r1463584138 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1134,9 +1134,38 @@ private CompletableFuture assignPartitions( // Make assignment effective on the client by updating the subscription state. updateSubscription(assignedPartitions, false); +// Mark assigned partitions as pendingOnAssignedCallback to temporarily stop fetching or +// initializing positions for them. Passing the full set of assigned partitions +// (previously owned and newly added), given that they are all provided to the user in the +// callback, so we could expect offsets updates for any of them. +Set assignedTopicPartition = assignedPartitions.stream().map(tIdp -> tIdp.topicPartition()).collect(Collectors.toSet()); +subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, true); + // Invoke user call back. CompletableFuture result = invokeOnPartitionsAssignedCallback(addedPartitions); +// Resume partitions only if the callback succeeded. +result.whenComplete((error, callbackResult) -> { +if (error == null) { +// Remove pendingOnAssignedCallback flag from the assigned partitions, so we can +// start fetching, and updating positions for them if needed. + subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, false); +} else { +// Remove pendingOnAssignedCallback flag from the previously owned only so that Review Comment: Yes, you got it right. When callback fails, the assignment is not acked to the broker, and it remains as `assignmentReadyToReconcile` on the client. So when we get the poll based reconciliation that assignment will be retried, until it succeeds or the broker removes it from the assignment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Update KIP-890 note [kafka]
jolshan merged PR #15244: URL: https://github.com/apache/kafka/pull/15244 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15675) Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
[ https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-15675: - Assignee: Chris Egerton > Fix flaky > ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test > --- > > Key: KAFKA-15675 > URL: https://issues.apache.org/jira/browse/KAFKA-15675 > Project: Kafka > Issue Type: Bug >Reporter: Kirk True >Assignee: Chris Egerton >Priority: Major > Labels: flaky-test > Attachments: error.stacktrace.txt, error.stdout.txt > > > This integration test is flaky around 9% of test runs. Source: [Gradle > Enterprise test > trends|https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=KAFKA=org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest=testMultiWorkerRestartOnlyConnector]. > One failure had this message: > {code:java} > java.lang.AssertionError: Failed to stop connector and tasks within 12ms > {code} > Please see the attachments for the stack trace and stdout log. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]
lianetm commented on code in PR #15215: URL: https://github.com/apache/kafka/pull/15215#discussion_r1463584138 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1134,9 +1134,38 @@ private CompletableFuture assignPartitions( // Make assignment effective on the client by updating the subscription state. updateSubscription(assignedPartitions, false); +// Mark assigned partitions as pendingOnAssignedCallback to temporarily stop fetching or +// initializing positions for them. Passing the full set of assigned partitions +// (previously owned and newly added), given that they are all provided to the user in the +// callback, so we could expect offsets updates for any of them. +Set assignedTopicPartition = assignedPartitions.stream().map(tIdp -> tIdp.topicPartition()).collect(Collectors.toSet()); +subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, true); + // Invoke user call back. CompletableFuture result = invokeOnPartitionsAssignedCallback(addedPartitions); +// Resume partitions only if the callback succeeded. +result.whenComplete((error, callbackResult) -> { +if (error == null) { +// Remove pendingOnAssignedCallback flag from the assigned partitions, so we can +// start fetching, and updating positions for them if needed. + subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, false); +} else { +// Remove pendingOnAssignedCallback flag from the previously owned only so that Review Comment: Yes, you're right. When callback fails, the assignment is not acked to the broker, and it remains as `assignmentReadyToReconcile` on the client. So when we get the poll based reconciliation that assignment will be retried, until it succeeds or the broker removes it from the assignment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
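The flow discussed in the hunk above (mark every assigned partition as pending, run the user callback, then clear the flag depending on the outcome) can be sketched in isolation. This is a minimal illustration and not the Kafka implementation: `PendingCallbackSketch`, `markPending`, `isFetchable` and the string partition names are hypothetical stand-ins. One detail worth double-checking against the quoted hunk: `CompletableFuture.whenComplete` passes the result first and the throwable second, whereas the lambda parameters in the hunk are named in the opposite order.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the consumer's subscription state; NOT the Kafka class.
final class PendingCallbackSketch {
    private final Set<String> pending = new HashSet<>();

    void markPending(Set<String> partitions, boolean isPending) {
        if (isPending) {
            pending.addAll(partitions);
        } else {
            pending.removeAll(partitions);
        }
    }

    boolean isFetchable(String partition) {
        return !pending.contains(partition);
    }

    // Stop fetching for all assigned partitions until the user callback completes.
    CompletableFuture<Void> assign(Set<String> owned, Set<String> added,
                                   CompletableFuture<Void> callback) {
        Set<String> all = new HashSet<>(owned);
        all.addAll(added);
        markPending(all, true);
        // whenComplete receives (result, throwable): the throwable is the SECOND argument.
        return callback.whenComplete((ignored, error) -> {
            if (error == null) {
                // Callback succeeded: every assigned partition may be fetched again.
                markPending(all, false);
            } else {
                // Callback failed: resume only the previously owned partitions.
                markPending(owned, false);
            }
        });
    }

    public static void main(String[] args) {
        PendingCallbackSketch state = new PendingCallbackSketch();
        CompletableFuture<Void> callback = new CompletableFuture<>();
        Set<String> owned = new HashSet<>();
        owned.add("topic-0");
        Set<String> added = new HashSet<>();
        added.add("topic-1");
        state.assign(owned, added, callback);
        System.out.println(state.isFetchable("topic-1")); // false while the callback is pending
        callback.complete(null);
        System.out.println(state.isFetchable("topic-1")); // true once the callback succeeded
    }
}
```

In the failure branch the newly added partitions keep the flag, which matches the reply above: the assignment is not acked and is retried on a later reconciliation.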
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
kirktrue commented on PR #15186: URL: https://github.com/apache/kafka/pull/15186#issuecomment-1906448498 > @kirktrue I think that the description is not quite right. First, I think that the session map was actually cleared previously. Second, the root cause is more along the line of what I said [here](https://github.com/apache/kafka/pull/15186#discussion_r1461537105). Could you please update the description of the PR? I will update the description today. Thanks @dajac!
[jira] [Updated] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs
[ https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16029: -- Reviewer: David Jacot (was: Ismael Juma) > Investigate cause of "Unable to find FetchSessionHandler for node X" in logs > > > Key: KAFKA-16029 > URL: https://issues.apache.org/jira/browse/KAFKA-16029 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.7.0, 3.8.0 > > > From [~mjsax]: > {quote}Looking into AK unit/integration test logs for KS, I regularly see an > ERROR log line that is triggered here: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151] > Given that it seems not to impact the test (it's not failing because of > this), I am wondering why we log this at ERROR level or if it might be better > to reduce to WARN? It seems to happen fairly frequently, but it also seems > that it's nothing one would need to be concerned about, and thus using ERROR > might be more alerting to end users than it needs to be? Thoughts? > {quote} > According to Matthias, running the {{EosIntegrationTest}} locally > reproduces the log line. > This is also reproducible by running the Apache Kafka quickstart. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
satishd commented on PR #14034: URL: https://github.com/apache/kafka/pull/14034#issuecomment-1906403295 Thanks @ijuma for the review comments. Updated the PR with the inline comments and/or with the latest commits.
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on PR #15202: URL: https://github.com/apache/kafka/pull/15202#issuecomment-1906359448 Hey @dajac, thanks for the comments, all addressed.
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463501761 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -272,7 +270,65 @@ public void testSyncAutocommitRetriedAfterRetriableException(Errors error) { // We expect that request should have been retried on this sync commit. assertExceptionHandling(commitRequestManger, error, true); -assertCoordinatorDisconnect(error); +} + +@Test +public void testCommitSyncThrowsCommitFailedExceptionOnFencedInstanceId() { + testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.FENCED_INSTANCE_ID); +} + +@Test +public void testCommitSyncThrowsCommitFailedExceptionOnUnknownMemberId() { + testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.UNKNOWN_MEMBER_ID); +} + +private void testCommitSyncFailsWithCommitFailedExceptionOnError(Errors commitError) { +CommitRequestManager commitRequestManger = create(false, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + +Map offsets = Collections.singletonMap( +new TopicPartition("topic", 1), +new OffsetAndMetadata(0)); + +// Send sync offset commit request that fails with an error that is expected to propagate +// a CommitFailedException +Long expirationTimeMs = time.milliseconds() + retryBackoffMs; +CompletableFuture commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false); +completeOffsetCommitRequestWithError(commitRequestManger, commitError); +assertFutureThrows(commitResult, CommitFailedException.class); +} + +@Test +public void testCommitSyncThrowsOffsetMetadataTooLargeException() { +// Error with metadata provided by the user should propagate the exception, so they can handle it. 
+ testCommitSyncFailsWithErrorException(Errors.OFFSET_METADATA_TOO_LARGE); +} + +@Test +public void testCommitSyncThrowsInvalidCommitOffsetSizeException() { +// Error with data provided by the user should propagate the exception, so they can handle it. + testCommitSyncFailsWithErrorException(Errors.INVALID_COMMIT_OFFSET_SIZE); +} + +@Test +public void testCommitSyncThrowsGroupAuthorizationException() { + testCommitSyncFailsWithErrorException(Errors.GROUP_AUTHORIZATION_FAILED); +} + +private void testCommitSyncFailsWithErrorException(Errors commitError) { Review Comment: Agreed, I missed that. All done, and it simplified the tests a lot.
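The refactoring agreed on here, one shared helper that takes both the error and the expected exception type, might look roughly like the sketch below. It is an illustration under stated assumptions rather than the code in the PR: `CommitError`, `toException` and `assertCommitFailsWith` are hypothetical names, and JDK exceptions stand in for Kafka's `CommitFailedException` and the error-specific exceptions so that the snippet runs without Kafka or JUnit on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class SharedAssertionSketch {
    // Hypothetical subset of error codes, for illustration only.
    enum CommitError { FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID, OFFSET_METADATA_TOO_LARGE }

    // Assumption: JDK exceptions stand in for the Kafka exception types.
    static RuntimeException toException(CommitError error) {
        switch (error) {
            case FENCED_INSTANCE_ID:
            case UNKNOWN_MEMBER_ID:
                return new IllegalStateException("commit failed: " + error);
            default:
                return new IllegalArgumentException("invalid commit data: " + error);
        }
    }

    // One helper covers every case: the error and the expected exception are parameters.
    static void assertCommitFailsWith(CommitError error, Class<? extends Throwable> expected)
            throws InterruptedException {
        CompletableFuture<Void> commitResult = new CompletableFuture<>();
        commitResult.completeExceptionally(toException(error));
        try {
            commitResult.get();
        } catch (ExecutionException e) {
            if (!expected.isInstance(e.getCause())) {
                throw new AssertionError("unexpected cause for " + error + ": " + e.getCause());
            }
            return;
        }
        throw new AssertionError("expected the commit to fail for " + error);
    }

    public static void main(String[] args) throws InterruptedException {
        assertCommitFailsWith(CommitError.FENCED_INSTANCE_ID, IllegalStateException.class);
        assertCommitFailsWith(CommitError.UNKNOWN_MEMBER_ID, IllegalStateException.class);
        assertCommitFailsWith(CommitError.OFFSET_METADATA_TOO_LARGE, IllegalArgumentException.class);
    }
}
```

With a test framework available, each call in `main` would typically become one row of a parameterized test, which is the direction suggested elsewhere in this review.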
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463498713 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -440,28 +582,59 @@ public void testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final // We expect that the request should not have been retried on this async commit. assertExceptionHandling(commitRequestManger, error, false); -assertCoordinatorDisconnect(error); } +@Test +public void testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() { +testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE); +} @Test -public void testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() { +public void testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() { +testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR); +} + +private void testCommitAsyncThrowsKafkaException(Errors error) { CommitRequestManager commitRequestManger = create(true, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); Map offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send commit request without expiration (async commit) that fails with retriable -// network exception that has no specific handling. Should fail with -// RetriableCommitException. +// Send async commit that fails with unexpected error. Should fail with KafkaException. 
CompletableFuture commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); -completeOffsetCommitRequestWithError(commitRequestManger, Errors.NETWORK_EXCEPTION); +completeOffsetCommitRequestWithError(commitRequestManger, error); NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); assertTrue(commitResult.isDone()); assertTrue(commitResult.isCompletedExceptionally()); -assertFutureThrows(commitResult, RetriableCommitFailedException.class); +assertFutureThrows(commitResult, KafkaException.class); +} + +@Test +public void testCommitSyncThrowsKafkaExceptionForUnexpectedRetriableError() { +testCommitSyncThrowsKafkaException(Errors.CORRUPT_MESSAGE); +} + +@Test +public void testCommitSyncThrowsKafkaExceptionForUnexpectedNonRetriableError() { +testCommitSyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR); +} + +private void testCommitSyncThrowsKafkaException(Errors error) { Review Comment: Agreed. This method no longer exists after refactoring the tests to share common logic and parameters.
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463496440 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -414,8 +495,69 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool testNonRetriable(futures); assertEmptyPendingRequests(commitRequestManger); } +} -assertCoordinatorDisconnect(error); +@Test Review Comment: Sure, done for this and for other similar cases that could be parameterized. (That was the approach; I am not even sure why I did not use parameterized tests for these.)
Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]
nizhikov commented on PR #15248: URL: https://github.com/apache/kafka/pull/15248#issuecomment-1906283629 @tledkov could you please take a look?
Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]
nizhikov commented on PR #15248: URL: https://github.com/apache/kafka/pull/15248#issuecomment-1906283217 Hello @mimaison, @jolshan. I prepared a second PR for KAFKA-14589. It contains `ConsumerGroupServiceTest` rewritten in Java. Please take a look.
[jira] [Created] (KAFKA-16186) Implement broker metrics for client telemetry usage
Apoorv Mittal created KAFKA-16186: - Summary: Implement broker metrics for client telemetry usage Key: KAFKA-16186 URL: https://issues.apache.org/jira/browse/KAFKA-16186 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal KIP-714 lists new broker metrics that record the usage of client telemetry instances and the plugin. Implement the broker metrics as defined in KIP-714.
[PR] KAFKA-14589 [WIP] [kafka]
nizhikov opened a new pull request, #15248: URL: https://github.com/apache/kafka/pull/15248 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15234: URL: https://github.com/apache/kafka/pull/15234#discussion_r1463408579 ## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ## @@ -919,4 +922,102 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U // 1 request should fail with throttling error. assertEquals(1, throttlingErrorCount); } + +@Test +public void testCacheEviction() throws UnknownHostException, InterruptedException { +Properties properties = new Properties(); +properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); +properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100"); +clientMetricsManager.updateSubscription("sub-1", properties); + +GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( +new GetTelemetrySubscriptionsRequestData(), true).build(); + +GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( +request, ClientMetricsTestUtils.requestContext()); +assertEquals(Errors.NONE, response.error()); + + assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId())); +assertEquals(1, clientMetricsManager.expirationTimer().size()); +// Cache expiry should occur after 100 * 3 = 300 ms, wait for at most 600 ms for the eviction +// to happen as eviction timer is scheduled in different thread. +assertTimeoutPreemptively(Duration.ofMillis(600), () -> { +// Validate that cache eviction happens and client instance is removed from cache. +while (clientMetricsManager.expirationTimer().size() != 0 || Review Comment: The test is flaky if the clock is not forced to advance, so I have added that back and lowered the max wait time in the assertion.
Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15234: URL: https://github.com/apache/kafka/pull/15234#discussion_r1462570838 ## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ## @@ -919,4 +922,102 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U // 1 request should fail with throttling error. assertEquals(1, throttlingErrorCount); } + +@Test +public void testCacheEviction() throws UnknownHostException, InterruptedException { +Properties properties = new Properties(); +properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG); +properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100"); +clientMetricsManager.updateSubscription("sub-1", properties); + +GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder( +new GetTelemetrySubscriptionsRequestData(), true).build(); + +GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest( +request, ClientMetricsTestUtils.requestContext()); +assertEquals(Errors.NONE, response.error()); + + assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId())); +assertEquals(1, clientMetricsManager.expirationTimer().size()); +// Cache expiry should occur after 100 * 3 = 300 ms, wait for at most 600 ms for the eviction +// to happen as eviction timer is scheduled in different thread. +assertTimeoutPreemptively(Duration.ofMillis(600), () -> { +// Validate that cache eviction happens and client instance is removed from cache. +while (clientMetricsManager.expirationTimer().size() != 0 || Review Comment: I checked the code for `waitUntilTrue` which actually takes a pause with thread sleep. Though I agree the current code has a busy wait but refactoring the method from TestUtils.scala to server-common with changes in all test files seems to be huge. 
Instead, I could create a TestUtils Java class in server-common with a `waitUntilTrue` method, like the Scala one, and use that. For now I have done the equivalent in the current code, where the thread sleeps for 50 ms on each iteration of the wait loop. Also, the executable in `assertTimeoutPreemptively` is executed in a different thread. Please let me know if that works.
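For reference, a polling helper of the kind discussed here could be sketched as follows. This is a hypothetical Java helper written for illustration; the class name `WaitSketch` and the parameter list are assumptions and do not reproduce the signature of the existing Scala `TestUtils.waitUntilTrue`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical test utility; name and signature are assumptions for illustration.
final class WaitSketch {
    // Polls the condition, sleeping between attempts instead of spinning.
    // Returns true as soon as the condition holds, false once maxWaitMs has elapsed.
    static boolean waitUntilTrue(BooleanSupplier condition, long maxWaitMs, long pauseMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pauseMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        // The condition becomes true on the third poll.
        boolean evicted = waitUntilTrue(() -> polls.incrementAndGet() >= 3, 600, 50);
        System.out.println(evicted + " after " + polls.get() + " polls");
    }
}
```

Sleeping between checks keeps the waiting thread from occupying a CPU core, and the deadline bounds the total wait in the same way the 600 ms timeout does in the test under review.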
[PR] MINOR: Fix "No suitable checks publisher found" message during CI build [kafka]
C0urante opened a new pull request, #15247: URL: https://github.com/apache/kafka/pull/15247 This message keeps popping up in our CI builds during the "Archive JUnit-formatted test results" step, and can be misleading since it appears to indicate that something is wrong. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463379100 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) { continue; } -if (error == Errors.COORDINATOR_NOT_AVAILABLE || +if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return; +} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); maybeRetry(currentTimeMs, error.exception()); return; +} else if (error == Errors.FENCED_INSTANCE_ID) { +log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); +future.completeExceptionally(new CommitFailedException()); Review Comment: Got it. I added a message to the exception too.
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463372247 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) { continue; } -if (error == Errors.COORDINATOR_NOT_AVAILABLE || +if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return; +} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); maybeRetry(currentTimeMs, error.exception()); return; +} else if (error == Errors.FENCED_INSTANCE_ID) { +log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); +future.completeExceptionally(new CommitFailedException()); Review Comment: Oh, maybe this is just a confusion. I notice now that the main PR page still shows the `log.info`, but we do have the `log.error` I introduced in [this](https://github.com/apache/kafka/pull/15202/commits/f7b76fd75de6615ac089c6c8ff24166fc5683a2d) previous commit, so maybe it is just GitHub tricking us?
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
dajac commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463371625 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) { continue; } -if (error == Errors.COORDINATOR_NOT_AVAILABLE || +if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return; +} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); maybeRetry(currentTimeMs, error.exception()); return; +} else if (error == Errors.FENCED_INSTANCE_ID) { +log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); +future.completeExceptionally(new CommitFailedException()); Review Comment: Ah. I was referring to adding a message to `new CommitFailedException()`. Sorry for the confusion.
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
lianetm commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463366293 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) { continue; } -if (error == Errors.COORDINATOR_NOT_AVAILABLE || +if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return; +} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); maybeRetry(currentTimeMs, error.exception()); return; +} else if (error == Errors.FENCED_INSTANCE_ID) { +log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); +future.completeExceptionally(new CommitFailedException()); Review Comment: Hmm, I had addressed it by using `log.error` instead of `log.info`, but maybe I misunderstood your comment. Were you suggesting something different?
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
dajac commented on PR #15186: URL: https://github.com/apache/kafka/pull/15186#issuecomment-1906093587 @kirktrue I think that the description is not quite right. First, I think that the session map was actually cleared previously. Second, the root cause is more along the line of what I said [here](https://github.com/apache/kafka/pull/15186#discussion_r1461537105). Could you please update the description of the PR?
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
dajac commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463282309 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -414,8 +495,69 @@ public void testOffsetFetchRequestErroredRequests(final Errors error, final bool testNonRetriable(futures); assertEmptyPendingRequests(commitRequestManger); } +} -assertCoordinatorDisconnect(error); +@Test Review Comment: nit: Should we use parameterized tests instead of specifying all cases like this? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -272,7 +270,65 @@ public void testSyncAutocommitRetriedAfterRetriableException(Errors error) { // We expect that request should have been retried on this sync commit. assertExceptionHandling(commitRequestManger, error, true); -assertCoordinatorDisconnect(error); +} + +@Test +public void testCommitSyncThrowsCommitFailedExceptionOnFencedInstanceId() { + testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.FENCED_INSTANCE_ID); +} + +@Test +public void testCommitSyncThrowsCommitFailedExceptionOnUnknownMemberId() { + testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.UNKNOWN_MEMBER_ID); +} + +private void testCommitSyncFailsWithCommitFailedExceptionOnError(Errors commitError) { +CommitRequestManager commitRequestManger = create(false, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + +Map offsets = Collections.singletonMap( +new TopicPartition("topic", 1), +new OffsetAndMetadata(0)); + +// Send sync offset commit request that fails with an error that is expected to propagate +// a CommitFailedException +Long expirationTimeMs = time.milliseconds() + retryBackoffMs; +CompletableFuture commitResult = commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false); +completeOffsetCommitRequestWithError(commitRequestManger, commitError); 
+assertFutureThrows(commitResult, CommitFailedException.class); +} + +@Test +public void testCommitSyncThrowsOffsetMetadataTooLargeException() { +// Error with metadata provided by the user should propagate the exception, so they can handle it. + testCommitSyncFailsWithErrorException(Errors.OFFSET_METADATA_TOO_LARGE); +} + +@Test +public void testCommitSyncThrowsInvalidCommitOffsetSizeException() { +// Error with data provided by the user should propagate the exception, so they can handle it. + testCommitSyncFailsWithErrorException(Errors.INVALID_COMMIT_OFFSET_SIZE); +} + +@Test +public void testCommitSyncThrowsGroupAuthorizationException() { + testCommitSyncFailsWithErrorException(Errors.GROUP_AUTHORIZATION_FAILED); +} + +private void testCommitSyncFailsWithErrorException(Errors commitError) { Review Comment: This method is very similar to testCommitSyncFailsWithCommitFailedExceptionOnError. Could we share part of the implementation for both cases? For instance, one idea would be to pass the Errors and the exception Exception as params. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -440,28 +582,59 @@ public void testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final // We expect that the request should not have been retried on this async commit. 
assertExceptionHandling(commitRequestManger, error, false); -assertCoordinatorDisconnect(error); } +@Test +public void testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() { +testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE); +} @Test -public void testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() { +public void testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() { +testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR); +} + +private void testCommitAsyncThrowsKafkaException(Errors error) { CommitRequestManager commitRequestManger = create(true, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); Map offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0)); -// Send commit request without expiration (async commit) that fails with retriable -// network exception that has no specific handling. Should fail with -// RetriableCommitException. +// Send async commit that fails with unexpected error. Should fail with KafkaException. CompletableFuture commitResult
Re: [PR] KAFKA-15853: Move PasswordEncoder to server module [kafka]
mimaison commented on PR #15164: URL: https://github.com/apache/kafka/pull/15164#issuecomment-1906055970 @OmniaGM I opened an alternative PR for this: https://github.com/apache/kafka/pull/15246 `ConfigCommand` uses `PasswordEncoder` so it needs to be accessible from the tools module.
[PR] KAFKA-15853: Move PasswordEncoder to server-common [kafka]
mimaison opened a new pull request, #15246: URL: https://github.com/apache/kafka/pull/15246 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences
[ https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding updated KAFKA-14920: - Description: KAFKA-14844 showed the destructive nature of a timeout on the first produce request for a topic partition (ie one that has no state in psm) Since we currently do not validate the first sequence (we will in part 2 of kip-890), any transient error on the first produce request can lead to out of order sequences that never recover. Originally, KAFKA-14561 relied on the producer's retry mechanism to handle these transient issues, but until that is fixed, we may need to retry from within the AddPartitionsManager. We addressed the concurrent transactions, but there are other errors, such as coordinator loading, that we could run into and that would cause more out of order issues. was: KAFKA-14844 showed the destructive nature of a timeout on the first produce request for a topic partition (ie one that has no state in psm) Since we currently don't validate the first sequence (we will in part 2 of kip-890), any transient error on the first produce can lead to out of order sequences that never recover. Originally, KAFKA-14561 relied on the producer's retry mechanism for these transient issues, but until that is fixed, we may need to retry from in the AddPartitionsManager instead. We addressed the concurrent transactions, but there are other errors like coordinator loading that we could run into and see increased out of order issues. > Address timeouts and out of order sequences > --- > > Key: KAFKA-14920 > URL: https://issues.apache.org/jira/browse/KAFKA-14920 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.6.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.6.0 > > > KAFKA-14844 showed the destructive nature of a timeout on the first produce > request for a topic partition (ie one that has no state in psm) > Since we currently do not validate the first sequence (we will in part 2 of kip-890), any transient error on the first produce request can lead to out of order sequences that never recover. > Originally, KAFKA-14561 relied on the producer's retry mechanism to handle these transient issues, but until that is fixed, we may need to retry from within the AddPartitionsManager. > We addressed the concurrent transactions, but there are other errors, such as coordinator loading, that we could run into and that would cause more out of order issues.
Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]
dajac commented on code in PR #15202: URL: https://github.com/apache/kafka/pull/15202#discussion_r1463253788 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) { continue; } -if (error == Errors.COORDINATOR_NOT_AVAILABLE || +if (error == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); +return; +} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) { coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); maybeRetry(currentTimeMs, error.exception()); return; +} else if (error == Errors.FENCED_INSTANCE_ID) { +log.info("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message()); +future.completeExceptionally(new CommitFailedException()); +return; +} else if (error == Errors.OFFSET_METADATA_TOO_LARGE || +error == Errors.INVALID_COMMIT_OFFSET_SIZE) { +future.completeExceptionally(error.exception()); +return; } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { // just retry Review Comment: Sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
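The branching in the hunk above can be read as a mapping from each OffsetCommit error to one of a few outcomes. Below is a minimal, self-contained sketch of that mapping. The `CommitError` and `Action` enums and the `classify` method are hypothetical names invented for illustration; they are not the real `Errors` class or the `CommitRequestManager` API, and the fallback for errors not listed in the hunk is an assumption.

```java
final class CommitErrorSketch {
    // Hypothetical stand-in for the error codes named in the diff above.
    enum CommitError {
        NONE, GROUP_AUTHORIZATION_FAILED, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR,
        REQUEST_TIMED_OUT, FENCED_INSTANCE_ID, OFFSET_METADATA_TOO_LARGE,
        INVALID_COMMIT_OFFSET_SIZE, COORDINATOR_LOAD_IN_PROGRESS, UNKNOWN_TOPIC_OR_PARTITION,
        OTHER
    }

    // Hypothetical outcomes; the real code completes futures and schedules retries.
    enum Action { SUCCEED, FAIL, REDISCOVER_COORDINATOR_AND_RETRY, RETRY }

    static Action classify(CommitError error) {
        switch (error) {
            case NONE:
                return Action.SUCCEED;
            case GROUP_AUTHORIZATION_FAILED:   // fatal: surfaced to the caller
            case FENCED_INSTANCE_ID:           // fatal: reported as a failed commit
            case OFFSET_METADATA_TOO_LARGE:    // fatal: retrying cannot help
            case INVALID_COMMIT_OFFSET_SIZE:
                return Action.FAIL;
            case COORDINATOR_NOT_AVAILABLE:    // coordinator is marked unknown, then retry
            case NOT_COORDINATOR:
            case REQUEST_TIMED_OUT:
                return Action.REDISCOVER_COORDINATOR_AND_RETRY;
            case COORDINATOR_LOAD_IN_PROGRESS: // "just retry" in the diff
            case UNKNOWN_TOPIC_OR_PARTITION:
                return Action.RETRY;
            default:
                // Assumption: the hunk does not show this branch; failing is a guess.
                return Action.FAIL;
        }
    }

    public static void main(String[] args) {
        for (CommitError e : CommitError.values()) {
            System.out.println(e + " -> " + classify(e));
        }
    }
}
```

Grouping the cases this way makes it easy to see which errors end the request and which ones only delay it.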
Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]
drawxy commented on PR #15220: URL: https://github.com/apache/kafka/pull/15220#issuecomment-1905997013 > I can see there are thousands of lines changes. Is that expected? Hi @showuon, sorry for pulling in dozens of commits by rebasing onto the wrong branch.
Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]
showuon commented on PR #15220: URL: https://github.com/apache/kafka/pull/15220#issuecomment-1905980778 I can see there are thousands of lines changed. Is that expected?
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. +CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { Review Comment: ~~I have actually removed the chaining using `CompletableFuture` and simplified the logic. 
I just wait on the secondary store write directly (with or without a timeout) and if the execution fails or the wait itself fails, I update the callback and return the same exception (because it's already completed). With this, there are no changes (apart from using `regularOffsets` when writing to secondary store) from the primary store. Let me know what you think about this.~~ I re-read the comments and it looks like with this we are going against the approach you wanted in this https://github.com/apache/kafka/pull/13801/#discussion_r1268520271 i.e > That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store). Let me take a look at this again. Sorry about this; I had forgotten about this comment.
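The quoted request (return one Future that covers the tombstone flush to the secondary store followed by the regular flush) can be sketched with `CompletableFuture.thenCompose`. This is only an illustration under stated assumptions: the `Store` interface below is hypothetical and returns a `CompletableFuture` directly, whereas the `set` method in the diff takes a callback and returns a plain `Future`; offsets are modelled as `String` values instead of `ByteBuffer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class TombstoneFirstSketch {
    // Hypothetical simplification of an offset store; not the Kafka Connect interface.
    interface Store {
        CompletableFuture<Void> set(Map<String, String> offsets);
    }

    // Writes tombstones (null values) to the secondary store first, then all
    // values to the primary store, and returns a single future for both steps.
    static CompletableFuture<Void> write(Store primary, Store secondary, Map<String, String> values) {
        Map<String, String> tombstones = new HashMap<>();
        for (Map.Entry<String, String> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                tombstones.put(entry.getKey(), null);
            }
        }

        CompletableFuture<Void> secondaryWrite;
        if (secondary != null && !tombstones.isEmpty()) {
            secondaryWrite = secondary.set(tombstones);
        } else {
            secondaryWrite = CompletableFuture.<Void>completedFuture(null);
        }

        // If the secondary write fails, the primary write is never started and
        // the returned future completes exceptionally with that failure.
        return secondaryWrite.thenCompose(ignored -> primary.set(values));
    }

    public static void main(String[] args) {
        Store logging = offsets -> {
            System.out.println("writing " + offsets);
            return CompletableFuture.<Void>completedFuture(null);
        };
        Map<String, String> values = new HashMap<>();
        values.put("partition-0", null);   // tombstone
        values.put("partition-1", "42");   // regular offset
        write(logging, logging, values).join();
    }
}
```

Nothing in this sketch blocks the calling thread; the caller decides whether and how long to wait on the returned future.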
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
showuon commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1463221572 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1396,11 +1396,13 @@ object TestUtils extends Logging { // Note: Call this method in the test itself, rather than the @AfterEach method. // Because of the assert, if assertNoNonDaemonThreads fails, nothing after would be executed. def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = { -val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => - !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) -} -val threadCount = nonDaemonThreads.size -assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}") +var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread] +waitUntilTrue(() => { + nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => +!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) + } + 0 == nonDemonThreads.size +}, s"Found unexpected ${nonDemonThreads.size} NonDaemon threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000) Review Comment: > BTW, which threads do you think are closed async during Kafka server shutdown? AFAIK, we have checks in place to ensure that we wait for proper close during shutdown Because in the failed test cases, the leaked threads are `ReplicaFetcherThread`s. But I had a quick look and, yes, we do call `awaitShutdown()` for fetcher threads. Hmm... Let me dig into this further tomorrow.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158103 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { +Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>()); +try { +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (ExecutionException e) { +log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause()); +callback.onCompletion(e.getCause(), null); +} catch (Exception e) { +log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e); +callback.onCompletion(e, null); +} +}); +} +offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new FutureCallback<>((primaryWriteError, ignored) -> { Review Comment: I have not added this test at this point and I can add it if you think it's still needed with the new approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]
nizhikov commented on PR #14856: URL: https://github.com/apache/kafka/pull/14856#issuecomment-1905956206 Thanks for the review and merge!
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
divijvaidya commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1463200794 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1396,11 +1396,13 @@ object TestUtils extends Logging { // Note: Call this method in the test itself, rather than the @AfterEach method. // Because of the assert, if assertNoNonDaemonThreads fails, nothing after would be executed. def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = { -val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => - !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) -} -val threadCount = nonDaemonThreads.size -assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}") +var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread] +waitUntilTrue(() => { + nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => +!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) + } + 0 == nonDemonThreads.size +}, s"Found unexpected ${nonDemonThreads.size} NonDaemon threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000) Review Comment: BTW, which threads do you think are closed async during Kafka server shutdown? AFAIK, we have checks in place to ensure that we wait for proper close during shutdown -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
divijvaidya commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1463198772 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1396,11 +1396,13 @@ object TestUtils extends Logging { // Note: Call this method in the test itself, rather than the @AfterEach method. // Because of the assert, if assertNoNonDaemonThreads fails, nothing after would be executed. def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = { -val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => - !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) -} -val threadCount = nonDaemonThreads.size -assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}") +var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread] +waitUntilTrue(() => { + nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => +!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) + } + 0 == nonDemonThreads.size +}, s"Found unexpected ${nonDemonThreads.size} NonDaemon threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000) Review Comment: This will increase the run time of the test suite when it is leaking threads. That should be OK, as the happy case (when we are not leaking) remains the same.
Re: [PR] MINOR: Remove controlPlaneRequestProcessor in BrokerServer [kafka]
appchemist commented on PR #15245: URL: https://github.com/apache/kafka/pull/15245#issuecomment-1905904446 Git fetch failed in CI due to the error "couldn't find remote ref refs/pull/15245/head".
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
showuon commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1463172726 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1396,11 +1396,13 @@ object TestUtils extends Logging { // Note: Call this method in the test itself, rather than the @AfterEach method. // Because of the assert, if assertNoNonDaemonThreads fails, nothing after would be executed. def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = { -val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => - !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) -} -val threadCount = nonDaemonThreads.size -assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}") +var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread] +waitUntilTrue(() => { + nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t => +!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix) + } + 0 == nonDemonThreads.size +}, s"Found unexpected ${nonDemonThreads.size} NonDaemon threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000) Review Comment: cc @divijvaidya, I found that the [CI](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15133/9/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_11_and_Scala_2_13___testSuccessfulBuildRemoteLogAuxStateMetrics__/) is sometimes too sensitive to the non-daemon threads check. Some shutdowns happen asynchronously. You can check the failed results [here](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15133/9/): Basically, if some resources are not closed, all the following tests should also fail (I verified this in my local env). But in the CI results, only 2 `ReplicaManagerTest` tests fail, and only on JDK 11. So I'm going to verify it using `waitUntilTrue` to give the threads some time to shut down.
I also set the wait time to 1 second because, if resources really are leaked, the total wait time will be the product of `waitTime` and the number of subsequent failing tests.
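The idea discussed in this thread (poll for leftover non-daemon threads for a bounded time instead of asserting once) can be sketched in plain Java as follows. The class and method names are hypothetical and this is not the `TestUtils.waitUntilTrue` helper from the diff; the 10 ms poll interval is an arbitrary choice for illustration.

```java
import java.util.Set;
import java.util.stream.Collectors;

final class ThreadLeakCheckSketch {
    // Names of live, non-daemon threads whose name starts with the given prefix.
    static Set<String> liveNonDaemonThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> !t.isDaemon() && t.isAlive() && t.getName().startsWith(prefix))
                .map(Thread::getName)
                .collect(Collectors.toSet());
    }

    // Polls until no matching thread remains or maxWaitMs has elapsed.
    // Returns the threads still alive at the end; empty means nothing leaked.
    static Set<String> awaitNoNonDaemonThreads(String prefix, long maxWaitMs) {
        long start = System.nanoTime();
        long budgetNanos = maxWaitMs * 1_000_000L;
        Set<String> leaked = liveNonDaemonThreads(prefix);
        while (!leaked.isEmpty() && System.nanoTime() - start < budgetNanos) {
            try {
                Thread.sleep(10); // arbitrary poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            leaked = liveNonDaemonThreads(prefix);
        }
        return leaked;
    }

    public static void main(String[] args) {
        Set<String> leaked = awaitNoNonDaemonThreads("ReplicaFetcherThread", 1000);
        System.out.println("Leaked threads: " + leaked);
    }
}
```

In the happy case the first check already returns an empty set, so the wait only costs time when threads are actually still running.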
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { Review Comment: I re-read the comments and looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271) i.e > That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store). Let me take a look at this again. Sorry about this since I had forgotten about this comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { Review Comment: I re-read the comments again and looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271) i.e > That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store). Let me take a look at this again. Sorry about this but I had forgotten about this comment in particular. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
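The quoted review comment asks for a single Future that covers every flush an offset commit has to wait on, rather than a synchronous wait inside `set`. A minimal sketch of that shape follows. `AsyncOffsetStore` and `ChainedOffsetWriter` are hypothetical stand-ins, not the Connect `OffsetBackingStore` API, and the sketch assumes each store already exposes its write as a `CompletableFuture`. Note that `thenCompose` returns a new future, and that new future is the one handed back to the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an offset store; not the Connect OffsetBackingStore API. */
interface AsyncOffsetStore {
    CompletableFuture<Void> set(Map<String, String> offsets);
}

/** Sketch: flush tombstones to the secondary store first, then write everything to the primary. */
final class ChainedOffsetWriter {
    private final AsyncOffsetStore primary;
    private final AsyncOffsetStore secondary; // may be null, as in the PR

    ChainedOffsetWriter(AsyncOffsetStore primary, AsyncOffsetStore secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    CompletableFuture<Void> set(Map<String, String> offsets) {
        // Collect the partitions whose offset is a tombstone (null value).
        Map<String, String> tombstones = new HashMap<>();
        for (Map.Entry<String, String> e : offsets.entrySet()) {
            if (e.getValue() == null) {
                tombstones.put(e.getKey(), null);
            }
        }

        // Nothing to pre-flush: start from an already completed future.
        CompletableFuture<Void> tombstoneFlush = (secondary == null || tombstones.isEmpty())
                ? CompletableFuture.<Void>completedFuture(null)
                : secondary.set(tombstones);

        // The primary write starts only if the tombstone flush succeeded.
        // If the flush failed, the returned future fails and the primary is never called.
        return tombstoneFlush.thenCompose(ignored -> primary.set(offsets));
    }
}
```

The caller decides how long to block on the returned future (with or without a timeout), which keeps `set` itself non-blocking.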
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on PR #15213: URL: https://github.com/apache/kafka/pull/15213#issuecomment-1905886714 Thanks a lot for the review @kamalcph! I have hopefully addressed everything
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1463164583 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { +val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() + if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() }) -val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) -else Optional.empty[Integer]() - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache))) + } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { +if (remoteLogEnabled()) { + val curHighestRemoteOffset = highestOffsetInRemoteStorage() + + val optEpoch: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { +val epoch = cache.epochForOffset(curHighestRemoteOffset) +if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() + }) + + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), optEpoch)) +} else { + Option.empty Review Comment: Good catch! In the KIP I have specified that Kafka should return no offset in such situations. 
I shall aim to add an integration test from the point of view of the client in an upcoming pull request ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1280,7 +1282,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) && targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP && targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && -targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP) +targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP && +targetTimestamp != ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) Review Comment: I have removed them both, but I don't think it would have caused problems either way. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { +val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() + if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() }) -val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) -else Optional.empty[Integer]() - Review Comment: Hopefully I have achieved both in the subsequent commit (reverted one of the changes and made both easier to read). Let me know if this isn't the case! 
## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ## @@ -2126,6 +2126,94 @@ class UnifiedLogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } + @Test + def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = { +val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) +val log = createLog(logDir, logConfig) + +assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + +val firstTimestamp = mockTime.milliseconds +val leaderEpoch = 0 +log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + +val secondTimestamp = firstTimestamp + 1 +log.appendAsLeader(TestUtils.singletonRecords( +
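The behaviour discussed in this review (no offset is returned for LATEST_TIERED_TIMESTAMP when remote storage is disabled, otherwise the highest remote offset plus its leader epoch) can be sketched in plain Java. This is a simplified illustration only: `TieredOffsetLookup`, `OffsetAndEpoch` and the `epochForOffset` function are hypothetical names, not Kafka classes. The sketch reads the remote offset once so that the returned offset and the epoch lookup refer to the same value.

```java
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.LongFunction;

/** Sketch of the "latest tiered" lookup; all names here are hypothetical. */
final class TieredOffsetLookup {

    /** Simplified stand-in for an (offset, leader epoch) result. */
    static final class OffsetAndEpoch {
        final long offset;
        final Optional<Integer> leaderEpoch;

        OffsetAndEpoch(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** Converts a primitive optional epoch into the boxed form used in the result. */
    static Optional<Integer> boxed(OptionalInt epoch) {
        return epoch.isPresent() ? Optional.of(epoch.getAsInt()) : Optional.empty();
    }

    /**
     * Returns empty when remote storage is disabled; otherwise the highest
     * remote offset together with the epoch found for that same offset.
     */
    static Optional<OffsetAndEpoch> latestTiered(boolean remoteLogEnabled,
                                                 long highestRemoteOffset,
                                                 LongFunction<OptionalInt> epochForOffset) {
        if (!remoteLogEnabled) {
            return Optional.empty();
        }
        long offset = highestRemoteOffset; // single read, reused below
        return Optional.of(new OffsetAndEpoch(offset, boxed(epochForOffset.apply(offset))));
    }
}
```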
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158347

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ##
@@ -299,12 +349,11 @@ public Future set(Map values, Callback callb
 } catch (Exception e) {
 log.warn("Failed to write offsets to secondary backing store", e);
 }
+callback.onCompletion(null, null);
 }
 }
-try (LoggingContext context = loggingContext()) {
-callback.onCompletion(primaryWriteError, ignored);
-}
-});

Review Comment:
Added it back.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158103 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { +Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>()); +try { +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (ExecutionException e) { +log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause()); +callback.onCompletion(e.getCause(), null); +} catch (Exception e) { +log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e); +callback.onCompletion(e, null); +} +}); +} +offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new FutureCallback<>((primaryWriteError, ignored) -> { Review Comment: I have not added this test at this point and I can add it if you think it's still needed with the new approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. +CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenAccept(v -> { Review Comment: I have actually removed the chaining using `CompletableFuture` and simplified the logic. 
I just wait on the secondary store write directly (with or without a timeout). If the write fails or the wait itself fails, I complete the callback with the error and return the secondary write future (because it's already completed).

```
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, (t, r) -> { });
    try {
        if (exactlyOnce) {
            secondaryWriteFuture.get();
        } else {
            secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
        }
        log.debug("Successfully flushed tombstone offsets to secondary store");
    } catch (ExecutionException e) {
        log.error("{} Failed to flush tombstone offsets to secondary store", this, e.getCause());
        callback.onCompletion(e.getCause(), null);
        return secondaryWriteFuture;
    } catch (Throwable e) {
        log.error("{} Failed to flush tombstone offsets to secondary store", this, e);
        callback.onCompletion(e, null);
        return secondaryWriteFuture;
    }
}
```

With this, the primary store write path is unchanged, apart from using `regularOffsets` when writing to the secondary store. Let me know what you think about this.
[PR] MINOR: Remove controlPlaneRequestProcessor in BrokerServer [kafka]
appchemist opened a new pull request, #15245:
URL: https://github.com/apache/kafka/pull/15245

In BrokerServer, controlPlaneRequestProcessor is always null and is never used. In addition, `validateControlPlaneListenerEmptyForKRaft` in `KafkaConfig` checks that `controlPlaneListenerName` is empty in KRaft mode. So, controlPlaneRequestProcessor is not needed in BrokerServer.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463144968 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());

Review Comment:
I took the non-fancy route for now, i.e., using an inline no-op `Callback`.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,54 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); Review Comment: Thanks for this. I updated the logic. ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -254,7 +260,12 @@ public Map get(long timeout, TimeUnit unit) throws Inter * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global * store is provided, a secondary write is made to that store if the write to the connector-specific store * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in - * {@link Callback}; they are only logged as a warning to users. + * {@link Callback}; they are only logged as a warning to users. The only exception to this rule is when the offsets + * that need to be committed contains tombstone records as well. 
In such cases, a write consisting of only tombstone + * offsets would first happen on the worker-global store and only if it succeeds, would all the offsets be written + * to the connector-specific store and the regular offsets would be written to the worker-global store. Note that + * in this case, failure to write regular offsets to secondary store would still not reflect in the returned + * {@link Future} or the passed-in {@link Callback} Review Comment: Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
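The split into tombstone and regular offsets that the diff above performs can also be done in a single pass. The sketch below is illustrative only; `SplitOffsets` is a hypothetical helper, not part of the PR. A plain loop is used for the tombstone map because `Collectors.toMap` throws a `NullPointerException` when a mapped value is null, which is presumably why the diff fills that map by hand as well.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: partition an offsets map into tombstones (null values) and regular entries. */
final class SplitOffsets<K, V> {
    final Map<K, V> tombstones = new HashMap<>();
    final Map<K, V> regular = new HashMap<>();

    static <K, V> SplitOffsets<K, V> of(Map<K, V> values) {
        SplitOffsets<K, V> split = new SplitOffsets<>();
        for (Map.Entry<K, V> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                // HashMap accepts null values, so the tombstone keeps its key.
                split.tombstones.put(entry.getKey(), null);
            } else {
                split.regular.put(entry.getKey(), entry.getValue());
            }
        }
        return split;
    }
}
```

Under the ordering described in the Javadoc above, `tombstones` would go to the worker-global store first, then the full map to the connector-specific store, and finally `regular` to the worker-global store.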
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
showuon commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1463136420 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -406,93 +408,51 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter() - def recordRemoteCopyLagBytes(partition: Int, bytesLag: Long): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.setPartitionMetricValue(partition, bytesLag) - } - - def removeRemoteCopyLagBytes(partition: Int): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.removePartition(partition) + def remoteCopyLagBytesAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric } // Visible for testing - def remoteCopyLagBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteCopyLagSegments(partition: Int, segmentsLag: Long): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag) - } + def remoteCopyLagBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric.value() - def removeRemoteCopyLagSegments(partition: Int): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric 
-brokerTopicAggregatedMetric.removePartition(partition) + def remoteCopyLagSegmentsAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric } // Visible for testing - def remoteCopyLagSegments: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteLogMetadataCount(partition: Int, count: Long): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.setPartitionMetricValue(partition, count) - } + def remoteCopyLagSegments: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric.value() - def removeRemoteLogMetadataCount(partition: Int): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.removePartition(partition) + def remoteLogMetadataCountAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric } - def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteLogSizeBytes(partition: Int, size: Long): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.setPartitionMetricValue(partition, size) - } + def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric.value() - def removeRemoteLogSizeBytes(partition: Int): Unit = { -val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.removePartition(partition) + def remoteLogSizeBytesAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).aggregatedMetric } - def remoteLogSizeBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = { -val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric -brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent) - } + def remoteLogSizeBytes: Long =
Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]
divijvaidya commented on PR #15220: URL: https://github.com/apache/kafka/pull/15220#issuecomment-1905791005 @drawxy, can you please fix the failing compilation? Otherwise, this looks good.
Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]
divijvaidya commented on code in PR #15239:
URL: https://github.com/apache/kafka/pull/15239#discussion_r1463080761

## build.gradle: ##
@@ -2286,8 +2286,9 @@ project(':streams:streams-scala') {
 api project(':streams')
 api libs.scalaLibrary
-api libs.scalaCollectionCompat
-
+if ( versions.baseScala == '2.12' ) {
+  api libs.scalaCollectionCompat

Review Comment:
Can you please add a comment here explaining why this dependency is removed? Basically, capture here what you wrote in the PR description. It helps readers of the code understand why certain actions were taken.
[jira] [Commented] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27
[ https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809875#comment-17809875 ]

Divij Vaidya commented on KAFKA-16066:
--------------------------------------

[~high.lee] please feel free to pick this one up. There has been no activity from previous requester on this Jira for more than 20 days now

> Upgrade apacheds to 2.0.0.AM27
> ------------------------------
>
> Key: KAFKA-16066
> URL: https://issues.apache.org/jira/browse/KAFKA-16066
> Project: Kafka
> Issue Type: Improvement
> Reporter: Divij Vaidya
> Priority: Major
> Labels: newbie, newbie++
>
> We are currently using a very old dependency. Notably, apacheds is only used
> for testing when we use MiniKdc, hence, there is nothing stopping us from
> upgrading it.
> Notably, apacheds has removed the component
> org.apache.directory.server:apacheds-protocol-kerberos in favour of using
> Apache Kerby, hence, we need to make changes in MiniKdc.scala for this
> upgrade to work correctly.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10659) Cogroup topology generation fails if input streams are repartitioned
[ https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809859#comment-17809859 ]

Stanislav Spiridonov commented on KAFKA-10659:
--

The same issue here. The workaround with groupByKey -> with works, but I need to create these topics manually.

> Cogroup topology generation fails if input streams are repartitioned
>
>
> Key: KAFKA-10659
> URL: https://issues.apache.org/jira/browse/KAFKA-10659
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0, 2.5.1
> Reporter: blueedgenick
> Priority: Major
>
> Example to reproduce:
>
> {code:java}
> KGroupedStream groupedA = builder
>     .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>     .selectKey((aKey, aVal) -> aVal.someId)
>     .groupByKey();
> KGroupedStream groupedB = builder
>     .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>     .selectKey((bKey, bVal) -> bVal.someId)
>     .groupByKey();
> KGroupedStream groupedC = builder
>     .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>     .selectKey((cKey, cVal) -> cVal.someId)
>     .groupByKey();
> CogroupedKStream cogroup = groupedA.cogroup(AggregatorA)
>     .cogroup(groupedB, AggregatorB)
>     .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable agg = cogroup.aggregate(
>     () -> new ABC(),
>     Named.as("my-agg-proc-name"),
>     Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as(
>         "abc-agg-store")
>         .withKeySerde(Serdes.String())
>         .withValueSerde(serdeABC)
> );
> {code}
>
>
> This throws an exception during topology generation:
>
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology:
> Processor abc-agg-store-repartition-filter is already added.
> at
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
> at
> org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
> at
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
> at ...
> {code}
>
> The same exception is observed if the `selectKey(...).groupByKey()` pattern
> is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at default state,
> explicitly set off, or explicitly set on.
> Interestingly, the problem is avoided, and a workable topology produced, if
> the grouping step is named by passing a `Grouped.with(...)` expression to
> either `groupByKey` or `groupBy`.
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
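For readers following KAFKA-10659, here is a toy model of why naming the grouping step can avoid the "is already added" error. It does not use Kafka Streams at all: ToyTopology, repartitionFilterName and build are hypothetical names, and the naming rule (grouping name if given, otherwise the store name, plus "-repartition-filter") is an assumption inferred from the exception text and the reported workaround, not taken from the Kafka source.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: none of these types are Kafka Streams classes.
final class RepartitionNamingSketch {

    /** Minimal stand-in for a topology that rejects duplicate processor names. */
    static final class ToyTopology {
        private final Set<String> processors = new LinkedHashSet<>();

        void addProcessor(String name) {
            if (!processors.add(name)) {
                throw new IllegalStateException("Processor " + name + " is already added.");
            }
        }

        Set<String> processors() {
            return processors;
        }
    }

    /** Assumed rule: prefer the user-supplied grouping name, else fall back to the store name. */
    static String repartitionFilterName(String groupingName, String storeName) {
        String prefix = (groupingName != null) ? groupingName : storeName;
        return prefix + "-repartition-filter";
    }

    /** Adds one repartition filter per grouped input stream. */
    static ToyTopology build(List<String> groupingNames, String storeName) {
        ToyTopology topology = new ToyTopology();
        for (String groupingName : groupingNames) {
            topology.addProcessor(repartitionFilterName(groupingName, storeName));
        }
        return topology;
    }

    public static void main(String[] args) {
        // Three unnamed groupings all fall back to the store name and collide.
        try {
            build(Arrays.asList((String) null, (String) null, (String) null), "abc-agg-store");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Three named groupings produce three distinct processor names.
        ToyTopology ok = build(Arrays.asList("group-a", "group-b", "group-c"), "abc-agg-store");
        System.out.println(ok.processors());
    }
}
```

In the real API, the equivalent of a non-null grouping name would be passing a named Grouped to groupByKey or groupBy, as the issue description notes.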
Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]
showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1462970366

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##

@@ -1650,15 +1650,27 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
     }

     private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
-        BrokerTopicMetrics topicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic());
-        int partition = topicIdPartition.partition();
-        topicMetrics.removeRemoteCopyLagBytes(partition);
-        topicMetrics.removeRemoteCopyLagSegments(partition);
-        topicMetrics.removeRemoteDeleteLagBytes(partition);
-        topicMetrics.removeRemoteDeleteLagSegments(partition);
-        topicMetrics.removeRemoteLogMetadataCount(partition);
-        topicMetrics.removeRemoteLogSizeComputationTime(partition);
-        topicMetrics.removeRemoteLogSizeBytes(partition);
+        String topic = topicIdPartition.topic();
+        if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
+            // The topic metrics are already removed, removing this topic key from broker-level metrics
+            brokerTopicStats.removeBrokerLevelRemoteCopyLagBytes(topic);

Review Comment:
@kamalcph, when running `ReplicaManager#stopPartitions`, it'll call [maybeRemoveTopicMetrics](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L604), and the specific topic metrics will be removed if all partitions are offline (i.e. topic deletion). The metrics removal will also remove the topic key from brokerTopicStats [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L646). That's when it will be deleted. So, if the topic metrics are all deleted, we also need to remove the topic key from the broker-level allTopics metrics. Hope that's clear.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
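As a rough illustration of the point in the review comment on PR #15133, the sketch below models a broker-level metric that is consolidated from per-topic values keyed by topic. It is a toy under stated assumptions, not the PR's code: BrokerLevelLagSketch and all of its methods are hypothetical, and only the idea (if the per-topic stats are already gone, drop the topic key from the broker-level view) comes from the comment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: these are not Kafka's BrokerTopicStats or BrokerTopicMetrics classes.
final class BrokerLevelLagSketch {
    // Per-topic stats: topic -> (partition -> lag in bytes).
    private final Map<String, Map<Integer, Long>> perTopic = new ConcurrentHashMap<>();
    // Broker-level view: topic -> total lag for that topic.
    private final Map<String, Long> brokerLevelByTopic = new ConcurrentHashMap<>();

    void recordLag(String topic, int partition, long lagBytes) {
        Map<Integer, Long> partitions = perTopic.computeIfAbsent(topic, t -> new ConcurrentHashMap<>());
        partitions.put(partition, lagBytes);
        brokerLevelByTopic.put(topic, sum(partitions));
    }

    /** Models the per-topic metrics being dropped first, e.g. on topic deletion. */
    void removeTopicStats(String topic) {
        perTopic.remove(topic);
    }

    /** Models the later per-partition cleanup discussed in the review. */
    void removePartitionMetrics(String topic, int partition) {
        Map<Integer, Long> partitions = perTopic.get(topic);
        if (partitions == null) {
            // Topic stats are already gone, so drop the topic key from the broker-level view too.
            brokerLevelByTopic.remove(topic);
            return;
        }
        partitions.remove(partition);
        brokerLevelByTopic.put(topic, sum(partitions));
    }

    long brokerLevelLagBytes() {
        return sum(brokerLevelByTopic);
    }

    private static long sum(Map<?, Long> values) {
        long total = 0L;
        for (long v : values.values()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        BrokerLevelLagSketch stats = new BrokerLevelLagSketch();
        stats.recordLag("t", 0, 100L);
        stats.recordLag("t", 1, 50L);
        stats.recordLag("u", 0, 7L);
        stats.removeTopicStats("t");
        // Without further cleanup the broker-level total still counts topic "t".
        System.out.println(stats.brokerLevelLagBytes());
        stats.removePartitionMetrics("t", 0);
        System.out.println(stats.brokerLevelLagBytes());
    }
}
```

Without the null branch in removePartitionMetrics, the deleted topic's last value would stay in the broker-level total indefinitely, which is the kind of stale entry the comment is warning about.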