Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]
dajac commented on PR #15286: URL: https://github.com/apache/kafka/pull/15286#issuecomment-1916253886 Thanks @rreddy-22! Could you please describe the bug(s) in the description? Do we need to add new tests for them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]
showuon commented on code in PR #15273: URL: https://github.com/apache/kafka/pull/15273#discussion_r1470688447

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:

@@ -290,7 +320,15 @@ public Optional serverConfigName(String configName)
 .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
 .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+.define(LOG_DIR_PROP, STRING, DEFAULT_LOG_DIR, HIGH, LOG_DIR_DOC)
+.define(LOG_DIRS_PROP, STRING, null, HIGH, LOG_DIRS_DOC)
+.define(METADATA_LOG_DIR_PROP, STRING, null, HIGH, METADATA_LOG_DIR_DOC)
+.define(INTER_BROKER_PROTOCOL_VERSION_PROP, STRING, DEFAULT_INTER_BROKER_PROTOCOL_VERSION, new MetadataVersionValidator(), MEDIUM, INTER_BROKER_PROTOCOL_VERSION_DOC)
+// This indicates whether unreleased APIs should be advertised by this node.
+.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_PROP, BOOLEAN, DEFAULT_UNSTABLE_API_VERSIONS_ENABLE, HIGH)
+// This indicates whether unreleased MetadataVersions should be enabled on this node.
+.defineInternal(UNSTABLE_METADATA_VERSIONS_ENABLE_PROP, BOOLEAN, DEFAULT_UNSTABLE_METADATA_VERSIONS_ENABLE, HIGH);

Review Comment: I didn't see where we remove the definition from the original KafkaConfig?

## raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:

@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) { } }

+private static Set parseProcessRoles(List processRoles, Map voterConnections, int nodeId) {

Review Comment: Where do we remove the original `parseProcessRoles` method?
## core/src/test/scala/unit/kafka/log/LogConfigTest.scala:

@@ -94,6 +94,18 @@ class LogConfigTest {
 case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
 case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
 case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
+case LogConfig.LOG_DIR_PROP => assert(true)
+case LogConfig.LOG_DIRS_PROP => assert(true)

Review Comment: What are we testing here?

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:

@@ -500,6 +553,34 @@ public static Map configKeys() { return Collections.unmodifiableMap(CONFIG.configKeys()); }

+public List logDirs() {
+String csvList = logDirs != null ? logDirs : logDir;
+if (csvList == null || csvList.isEmpty()) {
+return Collections.emptyList();
+} else {
+return Arrays.stream(csvList.split("\\s*,\\s*"))
+.filter(v -> !v.equals(""))
+.collect(Collectors.toList());
+}
+}
+
+public String getMetadataLogDir() {
+if (metadataLogDir != null) {
+return metadataLogDir;
+} else {
+return logDirs().get(0);
+}
+}
+
+public Optional interBrokerProtocolVersion() {
+String originalIBP = (String) originals().get(LogConfig.INTER_BROKER_PROTOCOL_VERSION_PROP);
+return originalIBP != null ? Optional.of(originalIBP) : Optional.empty();
+}
+
+public Boolean unstableMetadataVersionsEnabled() {
+return unstableMetadataVersionsEnabled;
+}

Review Comment: Why do we need these methods?

## raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:

@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) { } }

+private static Set parseProcessRoles(List processRoles, Map voterConnections, int nodeId) {
+Set distinctRoles = new HashSet<>();
+for (String role : processRoles) {
+switch (role) {
+case "broker":
+distinctRoles.add("BrokerRole");

Review Comment: Why don't we use the `ProcessRole` as before?
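The `logDirs()` accessor quoted in the review resolves the effective directory list by preferring `log.dirs` over `log.dir` and then splitting the comma-separated value. A standalone sketch of that resolution logic (the class and method names here are mine, not Kafka's):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class LogDirsResolver {
    /**
     * Mirrors the precedence in the quoted LogConfig#logDirs():
     * prefer the plural log.dirs value, fall back to log.dir,
     * then split the CSV while trimming whitespace around commas.
     */
    static List<String> resolveLogDirs(String logDirs, String logDir) {
        String csv = logDirs != null ? logDirs : logDir;
        if (csv == null || csv.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(csv.split("\\s*,\\s*"))
                .filter(v -> !v.isEmpty())   // drop empty tokens, e.g. from "a,,b"
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // log.dirs wins over log.dir when both are set.
        System.out.println(resolveLogDirs("/a, /b ,/c", "/ignored"));
        // Falls back to log.dir when log.dirs is unset.
        System.out.println(resolveLogDirs(null, "/only"));
    }
}
```

The regex `\s*,\s*` is what makes values like `"/a, /b"` come back without stray spaces, which is presumably why the original splits that way rather than on a bare comma.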
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1470669307

## core/src/main/scala/kafka/server/ReplicaManager.scala:

@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig, delta: TopicsDelta, topicId: Uuid): Option[(Partition, Boolean)] = { getPartition(tp) match {
- case HostedPartition.Offline =>
-stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
- s"with topic id $topicId because it resides in an offline log " +
- "directory.")
-None
+ case HostedPartition.Offline(offlinePartition) =>
+if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+ stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+s"with topic id $topicId because it resides in an offline log " +
+"directory.")
+ None
+} else {
+ stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+s"A topic with the same name but different id exists but it resides in an offline log " +
+s"directory.")

Review Comment: Question: From the check `if (offlinePartition.flatMap(p => p.topicId).contains(topicId))`, we can make sure this partition is not in that offline dir, but how could we know whether the partition is in an online dir, no dir at all, or even another offline dir? Should we use topicIdPartition as the key for `allPartitions`?
Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]
cmccabe commented on PR #15230: URL: https://github.com/apache/kafka/pull/15230#issuecomment-1916170746 committed
Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]
cmccabe closed pull request #15230: KAFKA-14616: Fix stray replica of recreated topics in KRaft mode URL: https://github.com/apache/kafka/pull/15230
Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]
cmccabe commented on PR #15230: URL: https://github.com/apache/kafka/pull/15230#issuecomment-1916148593

> Question: why can't we delete these disappeared (deleted) logs directly? I think this can only happen when a topic is deleted while the node is offline. If so, then deleting them should be fine?

It probably would be fine, but I'm a bit nervous about it. What I'd really like is to delete them after a time delay of a few hours, but the code doesn't support that currently. We should add this functionality.
Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]
cmccabe commented on code in PR #15230: URL: https://github.com/apache/kafka/pull/15230#discussion_r1470637685

## core/src/main/scala/kafka/log/LogManager.scala:

@@ -563,6 +565,18 @@ class LogManager(logDirs: Seq[File], startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames)) }

+ def deleteStrayKRaftReplicas(
+brokerId: Int,
+image: TopicsImage
+ ): Unit = {
+val strayPartitions = findStrayReplicas(brokerId, image, allLogs)
+strayPartitions.foreach(topicPartition => {
+ val log = getLog(topicPartition).get
+ log.renameDir(UnifiedLog.logStrayDirName(topicPartition), shouldReinitialize = true)
+ asyncDelete(topicPartition, false, false, true)

Review Comment: OK. I will delete the call to `log.renameDir`.
[jira] [Commented] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8
[ https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812157#comment-17812157 ] Luke Chen commented on KAFKA-16209: --- Thanks [~goyarpit]! > fetchSnapshot might return null if topic is created before v2.8 > --- > > Key: KAFKA-16209 > URL: https://issues.apache.org/jira/browse/KAFKA-16209 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.1 >Reporter: Luke Chen >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Remote log manager will fetch snapshot via ProducerStateManager > [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608], > but the snapshot map might get nothing if the topic has no snapshot created, > ex: topics before v2.8. Need to fix it to avoid NPE. > old PR: https://github.com/apache/kafka/pull/14615/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
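The failure mode described in KAFKA-16209 is a plain missing-key lookup: for topics created before v2.8 no snapshot was ever written, so the snapshot map yields nothing and a caller that assumes an entry hits an NPE. A generic null-safe sketch of the pattern (the map and method here are hypothetical stand-ins, not the actual ProducerStateManager API):

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

public class SnapshotLookup {
    // Hypothetical stand-in for a snapshot-by-offset map that may have no
    // entry at all for topics created before v2.8 (no snapshot files exist).
    static Optional<String> fetchSnapshot(NavigableMap<Long, String> snapshots, long offset) {
        // Map.get returns null when the key is absent; wrapping in Optional
        // forces callers to handle the "no snapshot" case instead of an NPE.
        return Optional.ofNullable(snapshots.get(offset));
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> snapshots = new TreeMap<>();
        snapshots.put(100L, "00000000000000000100.snapshot");
        System.out.println(fetchSnapshot(snapshots, 100L));
        System.out.println(fetchSnapshot(snapshots, 42L));
    }
}
```

The actual fix in Kafka would apply this guard at the call site linked in the issue; the sketch only shows the shape of the null-handling.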
[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request
[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812155#comment-17812155 ] Kamal Chandraprakash commented on KAFKA-15776: -- I've opened a KIP to add new `fetch.remote.max.wait.ms` dynamic config: [KIP-1018|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests]. Please post your feedback and suggestions on the mailing thread. > Update delay timeout for DelayedRemoteFetch request > --- > > Key: KAFKA-15776 > URL: https://issues.apache.org/jira/browse/KAFKA-15776 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for > DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the > given amount of time when there is no data available to serve the FETCH > request. > {code:java} > The maximum amount of time the server will block before answering the fetch > request if there isn't sufficient data to immediately satisfy the requirement > given by fetch.min.bytes. > {code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] > Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the > user on how to configure optimal value for each purpose. Moreover, the config > is of *LOW* importance and most of the users won't configure it and use the > default value of 500 ms. > Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to > higher number of expired delayed remote fetch requests when the remote > storage have any degradation. > We should introduce one {{fetch.remote.max.wait.ms}} config (preferably > server config) to define the delay timeout for DelayedRemoteFetch requests > (or) take it from client similar to {{request.timeout.ms}}. 
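The distinction the issue draws between the two timeouts can be sketched as a broker configuration fragment; the new property name is the one proposed in the comment above (KIP-1018), and the values are illustrative, not defaults:

```properties
# Existing client-facing wait for fetches: how long the broker may block a
# FETCH when fetch.min.bytes is not yet satisfied (default 500 ms).
fetch.max.wait.ms=500

# Proposed separate timeout for the DelayedRemoteFetch purgatory, so degraded
# remote storage does not expire remote fetches after only 500 ms
# (hypothetical value; the config does not exist yet).
fetch.remote.max.wait.ms=5000
```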
[jira] [Commented] (KAFKA-4759) Add support for subnet masks in SimpleACLAuthorizer
[ https://issues.apache.org/jira/browse/KAFKA-4759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812144#comment-17812144 ] Jorge Esteban Quilcate Otoya commented on KAFKA-4759: - Thanks [~theszym] ! I wasn't aware of the largely discussed KIP backing this issue and PR. I pretty much agree with Tom[1] on how to approach further discussion, though. As the KIP has been dormant for a while, I'd suggest either checking whether the author (Sönke Liebau) intends to progress the original discussion, and/or creating a new KIP with a proposal including how to address the challenges left open on the original KIP (e.g. backward compatibility issues). PS. It may be a good time to bump this discussion given the current KRaft/ZooKeeper-removal work. [1] https://lists.apache.org/thread/jp974bd05fsdwrws7kbprykdw9g65dt1 > Add support for subnet masks in SimpleACLAuthorizer > --- > > Key: KAFKA-4759 > URL: https://issues.apache.org/jira/browse/KAFKA-4759 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Shun Takebayashi >Assignee: Shun Takebayashi >Priority: Major > > SimpleACLAuthorizer currently accepts only single IP addresses. > Supporting subnet masks with SimpleACLAuthorizer can make ACL configurations > simpler.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1470557475

## core/src/main/scala/kafka/cluster/Partition.scala:

@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition, private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { targetLogDirectoryId match { case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+ createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+ warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment: Yes, I agree with @OmniaGM that we don't need to branch if/else here since we already do the handling in `createLogIfNotExists`. So we only need to pass in `partitionState.isNew` correctly here. Does that make sense?

Review Comment: Also, before this change, we'd always treat the partition as `new` if `directoryId == DirectoryId.UNASSIGNED`. But now it needs to be `directoryId == DirectoryId.UNASSIGNED && partitionState.isNew`. Will that cause other issues, @OmniaGM?
[jira] [Commented] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8
[ https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812139#comment-17812139 ] Arpit Goyal commented on KAFKA-16209: - [~showuon] I am picking it up. > fetchSnapshot might return null if topic is created before v2.8 > --- > > Key: KAFKA-16209 > URL: https://issues.apache.org/jira/browse/KAFKA-16209 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.1 >Reporter: Luke Chen >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Remote log manager will fetch snapshot via ProducerStateManager > [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608], > but the snapshot map might get nothing if the topic has no snapshot created, > ex: topics before v2.8. Need to fix it to avoid NPE. > old PR: https://github.com/apache/kafka/pull/14615/
[jira] [Assigned] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8
[ https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arpit Goyal reassigned KAFKA-16209: --- Assignee: Arpit Goyal > fetchSnapshot might return null if topic is created before v2.8 > --- > > Key: KAFKA-16209 > URL: https://issues.apache.org/jira/browse/KAFKA-16209 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.1 >Reporter: Luke Chen >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Remote log manager will fetch snapshot via ProducerStateManager > [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608], > but the snapshot map might get nothing if the topic has no snapshot created, > ex: topics before v2.8. Need to fix it to avoid NPE. > old PR: https://github.com/apache/kafka/pull/14615/
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1470555356

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -452,21 +453,38 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE

/**
 * Get the Group List.
 *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to this shard
+ * @param statesFilter The states of the groups we want to list.
+ * If empty, all groups are returned with their state.
+ * @param typesFilter The types of the groups we want to list.
+ * If empty, all groups are returned with their type.
+ * @param committedOffset A specified committed offset corresponding to this shard.
 *
 * @return A list containing the ListGroupsResponseData.ListedGroup
 */
+public List listGroups(
+Set statesFilter,
+Set typesFilter,
+long committedOffset
+) {
+// Converts each string to a value in the GroupType enum while being case-insensitive.
+Set enumTypesFilter = typesFilter.stream()
+.map(Group.GroupType::parse)
+.collect(Collectors.toSet());

Review Comment: I initially threw the IllegalArgumentException and then caught and ignored it in the listGroups method, but this caused the opposite result from what we wanted: it returns all the groups, since the type filter ends up empty. The only way to get the expected behavior is to add UNKNOWN to the group type enum. This gives us the same result as when an unknown state filter is passed. I even added tests to make sure. Lmk if this doesn't work or if there was something else you had in mind.
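The behavior the comment settles on, case-insensitive parsing where an unrecognized name maps to an UNKNOWN constant (so a bogus type filter matches no groups rather than all of them), can be sketched as follows. The enum constants and class here are hypothetical stand-ins, not the actual `Group.GroupType` definition:

```java
import java.util.Locale;

public class GroupTypeParse {
    // Hypothetical mirror of Group.GroupType: unrecognized names map to
    // UNKNOWN instead of throwing, so an unknown type filter selects nothing.
    enum GroupType {
        CONSUMER, CLASSIC, UNKNOWN;

        static GroupType parse(String name) {
            try {
                // Case-insensitive match against the enum constant names.
                return valueOf(name.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                // Swallowing the exception here (rather than at the caller)
                // keeps the filter non-empty, which is the point of UNKNOWN.
                return UNKNOWN;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(GroupType.parse("consumer"));
        System.out.println(GroupType.parse("bogus"));
    }
}
```

Catching the exception inside `parse` is what distinguishes this from the first attempt described in the comment: dropping the bad value from the filter set would leave it empty, and an empty filter means "list everything".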
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
JimmyWang6 commented on PR #14327: URL: https://github.com/apache/kafka/pull/14327#issuecomment-1916021518

> h level comments to start with. Are you on the Apache slack? We could also discuss there offline if you want.

@dajac Thanks for your reply! I will remove this part of the code.
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
JimmyWang6 commented on PR #14327: URL: https://github.com/apache/kafka/pull/14327#issuecomment-1916020627

> As a first step, it would be great if we could keep this pull request focused on the RPC and its implementation on the server side. I would extract the command line tool part into its own pull request. Would it be possible?

Thanks for your reply! I will remove this part of the code.
Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]
Joker-5 commented on PR #14963: URL: https://github.com/apache/kafka/pull/14963#issuecomment-1916017056 Hey @lucasbru, I saw [this pr](https://github.com/apache/kafka/pull/15000) had been merged, but it seems there's no `Co-authored-by` tag in the final commit, so there may be something wrong with it? I would be extremely grateful if you could tell me about it when you have time. Sorry to trouble you again.
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming merged PR #14846: URL: https://github.com/apache/kafka/pull/14846
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
github-actions[bot] commented on PR #14309: URL: https://github.com/apache/kafka/pull/14309#issuecomment-1916012508 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Fix NPE during fetchSnapshot [kafka]
showuon commented on PR #14615: URL: https://github.com/apache/kafka/pull/14615#issuecomment-1916003844 [KAFKA-16209](https://issues.apache.org/jira/browse/KAFKA-16209) is created for this issue. Closing this stale PR.
Re: [PR] MINOR: Fix NPE during fetchSnapshot [kafka]
showuon closed pull request #14615: MINOR: Fix NPE during fetchSnapshot URL: https://github.com/apache/kafka/pull/14615
[jira] [Created] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8
Luke Chen created KAFKA-16209: - Summary: fetchSnapshot might return null if topic is created before v2.8 Key: KAFKA-16209 URL: https://issues.apache.org/jira/browse/KAFKA-16209 Project: Kafka Issue Type: Bug Affects Versions: 3.6.1 Reporter: Luke Chen Remote log manager will fetch snapshot via ProducerStateManager [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608], but the snapshot map might get nothing if the topic has no snapshot created, ex: topics before v2.8. Need to fix it to avoid NPE. old PR: https://github.com/apache/kafka/pull/14615/
[jira] [Updated] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8
[ https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16209: -- Labels: newbie newbie++ (was: ) > fetchSnapshot might return null if topic is created before v2.8 > --- > > Key: KAFKA-16209 > URL: https://issues.apache.org/jira/browse/KAFKA-16209 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.1 >Reporter: Luke Chen >Priority: Major > Labels: newbie, newbie++ > > Remote log manager will fetch snapshot via ProducerStateManager > [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608], > but the snapshot map might get nothing if the topic has no snapshot created, > ex: topics before v2.8. Need to fix it to avoid NPE. > old PR: https://github.com/apache/kafka/pull/14615/
[PR] MINOR kip 848 uniform assignor bugs [kafka]
rreddy-22 opened a new pull request, #15286: URL: https://github.com/apache/kafka/pull/15286

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request
[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812112#comment-17812112 ] Jorge Esteban Quilcate Otoya commented on KAFKA-15776: -- Totally agree – was just trying to highlight the implications for not interrupting the threads; but I'm also in favor of improving configurations to deal with remote fetches. > Update delay timeout for DelayedRemoteFetch request > --- > > Key: KAFKA-15776 > URL: https://issues.apache.org/jira/browse/KAFKA-15776 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for > DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the > given amount of time when there is no data available to serve the FETCH > request. > {code:java} > The maximum amount of time the server will block before answering the fetch > request if there isn't sufficient data to immediately satisfy the requirement > given by fetch.min.bytes. > {code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] > Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the > user on how to configure optimal value for each purpose. Moreover, the config > is of *LOW* importance and most of the users won't configure it and use the > default value of 500 ms. > Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to > higher number of expired delayed remote fetch requests when the remote > storage have any degradation. > We should introduce one {{fetch.remote.max.wait.ms}} config (preferably > server config) to define the delay timeout for DelayedRemoteFetch requests > (or) take it from client similar to {{request.timeout.ms}}.
[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state
[ https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15839: -- Labels: kip-848 kip-848-client-support (was: kip-848 kip-848-client-support kip-848-e2e kip-848-preview) > Topic ID integration in consumer subscription state > --- > > Key: KAFKA-15839 > URL: https://issues.apache.org/jira/browse/KAFKA-15839 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > With the new consumer group protocol, assignments received by the consumer > contain topic IDs instead of topic names. Topic Ids are used in the > reconciliation path, integrated using TopicIdPartition. When reconciling, > topic names are resolved via a metadata update, but they are also kept in a > local #MembershipManager cache. This local cache serves the purpose of > keeping assigned topicId-names (that might have been deleted from metadata, > ex. topic deleted). > That's just an initial step towards spreading topic IDs internally in the > consumer code. Next step to address with this task would be to include topic > IDs in the subscription state, so that assigned topicId-names can be accessed > from other components without the need of resolving names multiple times. > Note that this task aims only at spreading topic IDs internally in the > consumer, no changes to expose them at the API level.
Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]
kirktrue commented on PR #15186: URL: https://github.com/apache/kafka/pull/15186#issuecomment-1915837088 Thanks @dajac! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16010: -- Labels: consumer-threading-refactor consumer-timeout kip-848-client-support (was: consumer-threading-refactor kip-848) > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, consumer-timeout, > kip-848-client-support > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. 
You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
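The two remedies named in the repeated warning correspond to two consumer configs. A minimal sketch of the tuning (the property names are the real Kafka consumer config keys; the chosen values are illustrative only):

```java
import java.util.Properties;

// Sketch of the two remedies suggested by the poll-timeout warning.
// The property names are the real consumer configs; the values are
// illustrative, not recommendations.
public class PollTuningExample {
    public static Properties tunedConsumerProps() {
        Properties props = new Properties();
        // Allow more time between poll() calls before the member is
        // considered failed (default is 300000 ms, i.e. 5 minutes).
        props.setProperty("max.poll.interval.ms", "600000");
        // Return fewer records per poll() so each batch is processed
        // faster (default is 500).
        props.setProperty("max.poll.records", "100");
        return props;
    }
}
```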
[jira] [Updated] (KAFKA-16204) Stray file core/00000000000000000001.snapshot created when running core tests
[ https://issues.apache.org/jira/browse/KAFKA-16204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16204: -- Labels: newbie newbie++ (was: ) > Stray file core/0001.snapshot created when running core tests > - > > Key: KAFKA-16204 > URL: https://issues.apache.org/jira/browse/KAFKA-16204 > Project: Kafka > Issue Type: Improvement > Components: core, unit tests >Reporter: Mickael Maison >Priority: Major > Labels: newbie, newbie++ > > When running the core tests I often get a file called > core/0001.snapshot created in my kafka folder. It looks like > one of the tests does not clean up its resources properly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16009: -- Labels: consumer-threading-refactor consumer-timeout kip-848-client-support (was: consumer-threading-refactor kip-848) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, consumer-timeout, > kip-848-client-support > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16008: -- Labels: consumer-threading-refactor consumer-timeout (was: consumer-threading-refactor kip-848) > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, consumer-timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15475) Request might retry forever even if the user API timeout expires
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15475: -- Summary: Request might retry forever even if the user API timeout expires (was: Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer) > Request might retry forever even if the user API timeout expires > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > If the request times out in the background thread, it will be completed with > a TimeoutException, which is retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove the inflight requests when they > expire. > # Pass the future to the application layer and continue to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
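Fix option 1 above (pass a timer so inflight requests are removed once expired) can be sketched roughly like this. The class and method names are illustrative, not the actual request-manager internals:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hedged sketch: each inflight request carries a deadline, and the
// background thread's poll loop completes expired requests exceptionally
// instead of retrying them forever.
class DeadlineAwareRequestManager {
    static final class Inflight {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long deadlineMs;
        Inflight(long deadlineMs) { this.deadlineMs = deadlineMs; }
    }

    private final List<Inflight> inflight = new ArrayList<>();

    CompletableFuture<String> submit(long nowMs, long timeoutMs) {
        Inflight req = new Inflight(nowMs + timeoutMs);
        inflight.add(req);
        return req.future;
    }

    // Called on each background-thread poll: expire rather than retry.
    void expireStaleRequests(long nowMs) {
        Iterator<Inflight> it = inflight.iterator();
        while (it.hasNext()) {
            Inflight req = it.next();
            if (nowMs >= req.deadlineMs) {
                req.future.completeExceptionally(new TimeoutException("request expired"));
                it.remove();
            }
        }
    }
}
```

Under this scheme an expired request fails with a TimeoutException exactly once and is removed, so it cannot be retried past the user-provided deadline.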
[jira] [Updated] (KAFKA-15475) Request might retry forever even if the user API timeout expires
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15475: -- Labels: consumer-threading-refactor consumer-timeout (was: consumer-threading-refactor kip-848-preview) > Request might retry forever even if the user API timeout expires > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, consumer-timeout > Fix For: 3.8.0 > > > If the request times out in the background thread, it will be completed with > a TimeoutException, which is retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove the inflight requests when they > expire. > # Pass the future to the application layer and continue to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Labels: consumer-threading-refactor consumer-timeout kip-848-client-support (was: consumer-threading-refactor kip-848-client-support) > Enforce that events and requests respect user-provided timeout > -- > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, consumer-timeout, > kip-848-client-support > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Labels: consumer-threading-refactor consumer-timeout kip-848-client-support (was: consumer-threading-refactor kip-848-client-support kip-848-preview) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, consumer-timeout, > kip-848-client-support > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and > {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. 
Here's a bit of code that illustrates the difference between the two > approaches. > {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.get(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{{}TimeoutException{}}}. > h3. Suggested fix > TBD :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
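The ticket leaves the suggested fix TBD. One possible direction, shown here only as a hedged sketch (the names are illustrative, not the real consumer internals), is to check the future for completion before enforcing the timer, mirroring the legacy "check result first" ordering so that {{poll(0)}} can still return an already-available result:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hedged sketch: success-first ordering for a time-bounded wait, so a
// zero timeout does not mask a result that is already available.
final class BoundedWait {
    static int getCount(CompletableFuture<Integer> future, long timeoutMs) {
        // Check for an available result before consulting the timeout,
        // as the legacy consumer's loop effectively does.
        if (future.isDone() && !future.isCompletedExceptionally()) {
            return future.join();
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            return -1;
        }
    }
}
```

With this ordering, a completed future returns its value even when called with a timeout of 0, while a pending future still fails fast, which narrows (though does not fully close) the behavioral gap described above.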
[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Summary: Enforce that events and requests respect user-provided timeout (was: Enforce that events and requests respect user-provided timeouts) > Enforce that events and requests respect user-provided timeout > -- > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16208) Design new Consumer timeout policy
Kirk True created KAFKA-16208: - Summary: Design new Consumer timeout policy Key: KAFKA-16208 URL: https://issues.apache.org/jira/browse/KAFKA-16208 Project: Kafka Issue Type: Task Components: clients, consumer, documentation Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 This task is to design and document the timeout policy for the new Consumer implementation. The documentation lives here: https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16199) Prune the event queue if event timeout expired before starting
[ https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16199: -- Labels: consumer-threading-refactor consumer-timeout (was: consumer-threading-refactor) > Prune the event queue if event timeout expired before starting > -- > > Key: KAFKA-16199 > URL: https://issues.apache.org/jira/browse/KAFKA-16199 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, consumer-timeout > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
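The pruning this ticket describes can be sketched as follows; the event type and method names are illustrative, not the actual consumer internals:

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hedged sketch: before the background thread starts processing an event,
// drop any queued event whose deadline has already passed and fail its
// future, so expired work is never begun.
final class EventQueuePruner {
    static final class Event {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long deadlineMs;
        Event(long deadlineMs) { this.deadlineMs = deadlineMs; }
    }

    // Returns the number of events pruned from the queue.
    static int prune(Queue<Event> queue, long nowMs) {
        int pruned = 0;
        Iterator<Event> it = queue.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (nowMs >= e.deadlineMs) {
                e.future.completeExceptionally(new TimeoutException("expired before start"));
                it.remove();
                pruned++;
            }
        }
        return pruned;
    }
}
```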
[jira] [Updated] (KAFKA-16199) Prune the event queue if event timeout expired before starting
[ https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16199: -- Summary: Prune the event queue if event timeout expired before starting (was: Prune the event queue if events have expired before starting) > Prune the event queue if event timeout expired before starting > -- > > Key: KAFKA-16199 > URL: https://issues.apache.org/jira/browse/KAFKA-16199 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent
[ https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16200: -- Labels: consumer-threading-refactor consumer-timeout (was: consumer-threading-refactor) > Ensure RequestManager handling of expired timeouts are consistent > - > > Key: KAFKA-16200 > URL: https://issues.apache.org/jira/browse/KAFKA-16200 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, consumer-timeout > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Labels: consumer-threading-refactor consumer-timeout (was: consumer-threading-refactor) > Add timeout to all the CompletableApplicationEvents > --- > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, consumer-timeout > Fix For: 3.8.0 > > Original Estimate: 40h > Remaining Estimate: 40h > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]
cmccabe commented on code in PR #15270: URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390923 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -166,6 +169,11 @@ class BrokerMetadataPublisher( Option(delta.featuresDelta()).foreach { featuresDelta => featuresDelta.metadataVersionChange().ifPresent{ metadataVersion => info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.") + if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) { +info("Resending BrokerRegistration with existing incarnation-id to inform the " + + "controller about log directories in the broker following metadata update") Review Comment: Please state the old and new metadata versions in the log message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]
cmccabe commented on PR #15270: URL: https://github.com/apache/kafka/pull/15270#issuecomment-1915796839 Thanks @gaurav-narula . The overall approach looks good to me. There is a consistent test failure here: ``` Gradle Test Run :core:test > Gradle Test Executor 2 > BrokerMetadataPublisherTest > testFindStrayReplicas() FAILED org.mockito.exceptions.misusing.UnfinishedVerificationException: Missing method call for verify(mock) here: -> at kafka.server.metadata.BrokerMetadataPublisherTest.testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(BrokerMetadataPublisherTest.scala:402) Example of correct verification: verify(mock).doSomething() Also, this error might show up because you verify either of: final/private/equals()/hashCode() methods. Those methods *cannot* be stubbed/verified. Mocking methods declared on non-public parent classes is not supported. at app//kafka.server.metadata.BrokerMetadataPublisherTest.mockLog(BrokerMetadataPublisherTest.scala:153) at app//kafka.server.metadata.BrokerMetadataPublisherTest.testFindStrayReplicas(BrokerMetadataPublisherTest.scala:100)``` I expect the test needs to be updated to fix the mock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]
cmccabe commented on code in PR #15270: URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390923 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -166,6 +169,11 @@ class BrokerMetadataPublisher( Option(delta.featuresDelta()).foreach { featuresDelta => featuresDelta.metadataVersionChange().ifPresent{ metadataVersion => info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.") + if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) { +info("Resending BrokerRegistration with existing incarnation-id to inform the " + + "controller about log directories in the broker following metadata update") Review Comment: Please list the old and new metadata versions here, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]
cmccabe commented on code in PR #15270: URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390289 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -137,6 +139,7 @@ class BrokerMetadataPublisher( manifest: LoaderManifest ): Unit = { val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch() +val currentMetadataVersion = metadataCache.metadataVersion() Review Comment: There's no need to access metadataCache here. That would incur a performance penalty because you're dealing with volatiles. Instead, you can just look at `delta.image().features().metadataVersion()` (or something like that, I didn't check field names) `delta.image` is the previous metadata image (that the delta is built on top of) I would also recommend not doing this until you know you need the old offset (i.e. if featuresDelta != null) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]
cmccabe commented on code in PR #15270: URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390289 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -137,6 +139,7 @@ class BrokerMetadataPublisher( manifest: LoaderManifest ): Unit = { val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch() +val currentMetadataVersion = metadataCache.metadataVersion() Review Comment: There's no need to access metadataCache here. That would incur a performance penalty because you're dealing with volatiles. Instead, you can just look at `delta.image().features().metadataVersion()` (or something like that, I didn't check field names) `delta.image` is the previous metadata image (that the delta is built on top of) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
Phuc-Hong-Tran commented on code in PR #14327: URL: https://github.com/apache/kafka/pull/14327#discussion_r1470370840 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1573,6 +1589,33 @@ private void updateGroupsByTopics( } }); } +if (!oldSubscribedTopicRegex.isEmpty()) { +oldSubscribedTopicRegex.forEach(regex -> { +groupsByRegex.computeIfPresent(regex, (__, groupIds) -> { +groupIds.remove(groupId); +return groupIds.isEmpty() ? null : groupIds; +}); +Pattern pattern = Pattern.compile(regex); +for (String topicName : metadataImage.topics().topicsByName().keySet()) { +if (pattern.matcher(topicName).matches()) { +unsubscribeGroupFromTopic(groupId, topicName); +} +} +}); +} +if (!newSubscribedTopicRegex.isEmpty()) { +newSubscribedTopicRegex.forEach(regex -> { +groupsByRegex +.computeIfAbsent(regex, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) +.add(groupId); +Pattern pattern = Pattern.compile(regex); +for (String topicName : metadataImage.topics().topicsByName().keySet()) { +if (pattern.matcher(topicName).matches()) { +subscribeGroupToTopic(groupId, topicName); +} +} Review Comment: @dajac thanks for the explanation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
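Since the code under review calls `Pattern.compile(regex)` for each subscription regex on every pass over the topic metadata, one possible refinement, shown here as an illustrative sketch only (it is not part of the PR), is to cache the compiled patterns:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

// Hedged sketch: cache compiled Patterns keyed by the regex string so the
// same subscription regex is compiled at most once across metadata passes.
final class PatternCache {
    private final Map<String, Pattern> cache = new ConcurrentHashMap<>();

    Pattern get(String regex) {
        return cache.computeIfAbsent(regex, Pattern::compile);
    }
}
```

`ConcurrentHashMap.computeIfAbsent` makes the cache safe to share, and repeated lookups for the same regex return the same `Pattern` instance.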
[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes
[ https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14094: --- Summary: KIP-853: KRaft controller membership changes (was: KIP-853: KRaft controller memebership changes) > KIP-853: KRaft controller membership changes > > > Key: KAFKA-14094 > URL: https://issues.apache.org/jira/browse/KAFKA-14094 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: José Armando García Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: 4.0-blocker > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16207) Implement KRaft internal listener for KRaftVersionRecord and VotersRecord
José Armando García Sancio created KAFKA-16207: -- Summary: Implement KRaft internal listener for KRaftVersionRecord and VotersRecord Key: KAFKA-16207 URL: https://issues.apache.org/jira/browse/KAFKA-16207 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio Assignee: José Armando García Sancio -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on PR #15251: URL: https://github.com/apache/kafka/pull/15251#issuecomment-1915703872 > @apoorvmittal10 : Thanks for the updated PR. A few more comments. Thanks for the review and suggestions. I have addressed the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16168; Implement GroupCoordinator.onPartitionsDeleted [kafka]
jolshan commented on code in PR #15237: URL: https://github.com/apache/kafka/pull/15237#discussion_r1470235881 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -2056,4 +2059,70 @@ public void testCompleteTransactionWithUnexpectedPartition() { assertFutureThrows(future, IllegalStateException.class); } + +@Test +public void testOnPartitionsDeleted() { +int partitionCount = 3; +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime, +new GroupCoordinatorMetrics() +); + +service.startup(() -> partitionCount); + +when(runtime.partitions()).thenReturn( +IntStream +.range(0, partitionCount) +.mapToObj(i -> new TopicPartition("__consumer_offsets", i)) +.collect(Collectors.toSet()) +); + +List> futures = IntStream +.range(0, partitionCount) +.mapToObj(__ -> new CompletableFuture()) +.collect(Collectors.toList()); + +IntStream.range(0, partitionCount).forEach(i -> { +CompletableFuture future = futures.get(i); +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("on-partition-deleted"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), +ArgumentMatchers.eq(Duration.ofMillis(5000)), +ArgumentMatchers.any() +)).thenAnswer(__ -> future); +}); + +IntStream.range(0, partitionCount - 1).forEach(i -> { +futures.get(i).complete(null); +}); + +futures.get(partitionCount - 1).completeExceptionally(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()); + +// The exception is logged and swallowed. +assertDoesNotThrow(() -> Review Comment: Should we have any other validations that the partition is deleted or not? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -903,6 +906,45 @@ public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId); } +/** + * Remove offsets of the partitions that have been deleted. + * + * @param topicPartitions The partitions that have been deleted. + * @return The list of tombstones (offset commit) to append. + */ +public List<Record> onPartitionsDeleted( +List<TopicPartition> topicPartitions +) { +List<Record> records = new ArrayList<>(); + +Map<String, List<Integer>> partitionsByTopic = new HashMap<>(); +topicPartitions.forEach(tp -> partitionsByTopic +.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()) +.add(tp.partition()) +); + +Consumer delete = offsets -> { Review Comment: This confused me for a bit because the Consumer here is not a Kafka consumer but the Java `Consumer` functional interface. These lines create a method called delete that takes all the offsets and appends the tombstones they need. Offsets (the variable) is overloaded a few times, so it is a little confusing. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1001,8 +1002,26 @@ public void onPartitionsDeleted( List<TopicPartition> topicPartitions, BufferSupplier bufferSupplier -) { +) throws ExecutionException, InterruptedException { Review Comment: To clarify -- is onPartitionsDeleted only called when the topic behind the partition is deleted (not reassigned etc)? Or are there other cases? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -3050,6 +3060,53 @@ public void testOffsetDeletionsSensor() { verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2); } +@Test +public void testOnPartitionsDeleted() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + +// Commit offsets.
+context.commitOffset("grp-0", "foo", 1, 100, 1, context.time.milliseconds()); +context.commitOffset("grp-0", "foo", 2, 200, 1, context.time.milliseconds()); +context.commitOffset("grp-0", "foo", 3, 300, 1, context.time.milliseconds()); + +context.commitOffset("grp-1", "bar", 1, 100, 1, context.time.milliseconds()); +context.commitOffset("grp-1", "bar", 2, 200, 1, context.time.milliseconds()); +context.commitOffset("grp-1", "bar", 3, 300, 1, context.time.milliseconds()); + +context.commitOffset(100L, "grp-2", "foo", 1, 100, 1, context.time.milliseconds()); +
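The `Consumer` that confused the reviewer above is `java.util.function.Consumer`, a one-argument lambda type, not a Kafka consumer client. A standalone sketch of the same pattern (strings stand in for the coordinator's real record type; the names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DeleteHelperExample {

    // Builds the list of tombstone markers for the deleted partitions of one topic.
    public static List<String> collectTombstones() {
        List<String> records = new ArrayList<>();
        // `delete` is java.util.function.Consumer: a local helper lambda named
        // after its action, invoked once per group of deleted partitions.
        Consumer<int[]> delete = partitions -> {
            for (int partition : partitions) {
                records.add("tombstone(grp-0, foo-" + partition + ")");
            }
        };
        delete.accept(new int[] {1, 2, 3});
        return records;
    }

    public static void main(String[] args) {
        System.out.println(collectTombstones()); // three tombstones, one per partition
    }
}
```

Defining the helper as a `Consumer` local keeps the per-partition tombstone logic next to its single call site without adding a private method to the class.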
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
cmccabe commented on PR #15193: URL: https://github.com/apache/kafka/pull/15193#issuecomment-1915696324 Thanks to everyone who reviewed this. It really was pretty messy (and buggy) originally, and I think it's pretty clear now. I have to commit this now to unblock the 3.7 release. If you have further suggestions for improvements, please do submit them (either a PR, message, or whatever).
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
cmccabe merged PR #15193: URL: https://github.com/apache/kafka/pull/15193
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1470310097 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List<MetricName> registeredMetricNames = new ArrayList<>(); + +private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); +unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); +sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + +Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); +throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); +sensorsName.add(throttleCount.name()); + +Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); +pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); +sensorsName.add(pluginExportCount.name()); + +Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); +pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); +sensorsName.add(pluginErrorCount.name()); + +Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", +ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", +ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); +sensorsName.add(pluginExportTime.name()); +} + +public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { +record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); +} + +public void recordThrottleCount(Uuid clientInstanceId) { +record(THROTTLE, clientInstanceId); +} + +public void recordPluginExport(Uuid clientInstanceId, long timeMs) { +record(PLUGIN_EXPORT, clientInstanceId); +record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); Review Comment: Valid suggestion, thanks for that. I didn't see that. I have made the change.
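The guard in `maybeAddClientInstanceMetrics` above registers a whole group of per-client sensors exactly once, using the presence of one sensor as the marker for the group. A plain-Java stand-in for that check-then-register pattern (not the Kafka `Metrics` API; names and metric set are hypothetical):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SensorRegistryExample {

    // Stand-in for the metrics registry: sensor name -> recorded value.
    private final Map<String, Long> sensors = new ConcurrentHashMap<>();

    private static final Set<String> PER_INSTANCE_SENSORS =
        Set.of("Throttle", "PluginExport", "PluginError");

    // Registers every per-instance sensor exactly once. The sensors are always
    // registered together, so one sensor's presence marks the whole group.
    public void maybeAddClientInstanceMetrics(String clientInstanceId) {
        if (sensors.containsKey("Throttle-" + clientInstanceId)) {
            return; // all sensors for this instance already exist
        }
        for (String name : PER_INSTANCE_SENSORS) {
            sensors.put(name + "-" + clientInstanceId, 0L);
        }
    }

    public int sensorCount() {
        return sensors.size();
    }

    public static void main(String[] args) {
        SensorRegistryExample registry = new SensorRegistryExample();
        registry.maybeAddClientInstanceMetrics("client-1");
        registry.maybeAddClientInstanceMetrics("client-1"); // idempotent
        System.out.println(registry.sensorCount()); // prints 3
    }
}
```

Note the check-then-register sequence is not atomic on its own; it is safe only if calls for a given client instance are serialized, as a per-instance lock or single-threaded handling would provide.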
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
cmccabe commented on PR #15193: URL: https://github.com/apache/kafka/pull/15193#issuecomment-1915685264 @ppatierno: yes, deprovisioning the kraft controller quorum and electing a zk-based controller should happen before rolling to remove `zookeeper.metadata.migration.enable`, etc. Should be fixed now.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1470279663 ## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ## @@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno // Create new client metrics manager which simulates a new server as it will not have any // last request information but request should succeed as subscription id should match // the one with new client instance. - -ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); +kafkaMetrics = new Metrics(); Review Comment: Agree, done.
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1470278382 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst ClientMetricsInstanceMetadata instanceMetadata) { ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata); +// Maybe add client metrics, if metrics not already added. Metrics might be already added +// if the client instance was evicted from the cache because of size limit. Review Comment: The problem is that the eviction on size happens inside the common LRUCache class, where code from this manager cannot be instrumented. I do not see any custom override that can be provided to the LRUCache class. Having said that, there won't be any hanging additional instance metrics sensors when eviction happens on size, as time-based eviction will remove such instance metrics anyway. Also, if the respective instance reports metrics prior to time-based eviction, then the metrics numbers for that instance will be correctly reported as well. Hence, I do not think we should evict metrics sensors on size-based eviction.
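The comment above notes that a generic LRU cache gives the caller no callback at the moment of size-based eviction. A `LinkedHashMap`-based sketch (not Kafka's actual `LRUCache` class; names hypothetical) shows where such an eviction hook would live if one were wanted:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

public class LruWithEvictionHook<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;
    private final Consumer<K> onEvict;

    public LruWithEvictionHook(int maxEntries, Consumer<K> onEvict) {
        super(16, 0.75f, true); // access-order = true gives LRU behaviour
        this.maxEntries = maxEntries;
        this.onEvict = onEvict;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        boolean evict = size() > maxEntries;
        if (evict) {
            // This is the instrumentation point a plain LRU cache lacks:
            // e.g. remove the evicted client's metrics sensors here.
            onEvict.accept(eldest.getKey());
        }
        return evict;
    }

    public static void main(String[] args) {
        LruWithEvictionHook<String, Integer> cache =
            new LruWithEvictionHook<>(2, key -> System.out.println("evicted " + key));
        cache.put("a", 1);
        cache.put("b", 2);
        cache.put("c", 3); // exceeds capacity, evicts "a" and fires the hook
        System.out.println(cache.containsKey("a")); // prints false
    }
}
```

With no such hook available, leaving sensor cleanup to the time-based expiration path, as the comment argues, is a reasonable trade-off: the orphaned sensors are bounded in lifetime.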
Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]
cmccabe commented on PR #15238: URL: https://github.com/apache/kafka/pull/15238#issuecomment-1915649469 committed, thanks!
Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]
cmccabe closed pull request #15238: KAFKA-16171 Fix hybrid mode controller race URL: https://github.com/apache/kafka/pull/15238
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1470269365 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List<MetricName> registeredMetricNames = new ArrayList<>(); + +private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( Review Comment: I think you are right; I have removed the tag and moved the metric to be global alongside the instance count metric. Resolving the comment and will update the PR description with the change from the KIP.
Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]
mumrah commented on code in PR #15238: URL: https://github.com/apache/kafka/pull/15238#discussion_r1470254717 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -635,12 +635,24 @@ class BecomeZkControllerEvent extends MigrationEvent { public void run() throws Exception { if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) { applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership); -if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) { +if (migrationLeadershipState.zkControllerEpochZkVersion() == -2) { Review Comment: There's already sort of a constant in ZkMigrationLeadershipState.EMPTY
Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]
cmccabe commented on code in PR #15238: URL: https://github.com/apache/kafka/pull/15238#discussion_r1470231664 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -635,12 +635,24 @@ class BecomeZkControllerEvent extends MigrationEvent { public void run() throws Exception { if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) { applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership); -if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) { +if (migrationLeadershipState.zkControllerEpochZkVersion() == -2) { Review Comment: Can we have a constant for this like `ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION`?
Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]
cmccabe commented on code in PR #15238: URL: https://github.com/apache/kafka/pull/15238#discussion_r1470231305 ## metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java: ## @@ -139,14 +139,18 @@ public ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershi @Override public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) { -this.state = state; -return state; +if (state.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.EMPTY.zkControllerEpochZkVersion()) { Review Comment: Can we just compare against -2 (or at least have a constant for this like `ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION`?)
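Both review comments in this thread ask for a named constant in place of the bare `-2`. A minimal sketch of the suggested shape, where `UNKNOWN_ZK_VERSION` is the reviewer's proposed name rather than an existing field:

```java
public class ZkVersionSentinels {

    // Reviewer-proposed name; in the real code this would live on
    // ZkMigrationLeadershipState next to the EMPTY instance.
    public static final int UNKNOWN_ZK_VERSION = -2;

    // Comparing against a named constant documents intent at the call site;
    // a bare `== -2` says nothing about why -2 is special.
    public static boolean isUnknown(int zkControllerEpochZkVersion) {
        return zkControllerEpochZkVersion == UNKNOWN_ZK_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(isUnknown(-2)); // prints true
        System.out.println(isUnknown(5));  // prints false
    }
}
```

This also removes the risk of the sentinel silently drifting between the production class and test doubles such as `CapturingMigrationClient`, since both would reference the same field.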
Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]
mumrah commented on code in PR #15193: URL: https://github.com/apache/kafka/pull/15193#discussion_r1470215533 ## docs/ops.html: ## @@ -3992,6 +3980,136 @@ Finalizing the migration # Other configs ... + Reverting to ZooKeeper mode During the Migration + +While the cluster is still in migration mode, it is possible to revert to ZooKeeper mode. The process +to follow depends on how far the migration has progressed. In order to find out how to revert, +select the final migration step that you have completed in this table. + + +Note that the directions given here assume that each step was fully completed, and they were +done in order. So, for example, we assume that if "Enter Migration Mode on the Brokers" was +completed, "Provisioning the KRaft controller quorum" was also fully completed previously. + + +If you did not fully complete any step, back out whatever you have done and then follow revert +directions for the last fully completed step. + + + + + +Final Migration Section Completed +Directions for Reverting +Notes + + +Preparing for migration + + The preparation section does not involve leaving ZooKeeper mode. So there is nothing to do in the + case of a revert. + + + + + +Provisioning the KRaft controller quorum + + + + Deprovision the KRaft controller quorum. + + + Then you are done. + + + + + + + +Enter Migration Mode on the brokers + + + + Perform a roll of the broker cluster. In this roll, remove Review Comment: Instead of "roll", how about "rolling restart"
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
dajac commented on PR #15271: URL: https://github.com/apache/kafka/pull/15271#issuecomment-1915528645 Sure. Will review it tomorrow.
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dongnuo123 commented on code in PR #15268: URL: https://github.com/apache/kafka/pull/15268#discussion_r1470177662 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } +private JoinGroupRequestProtocol throwIfProtocolUnmatched( +ConsumerGroupMember member, +JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols +) { +for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) { +final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); +ConsumerProtocol.deserializeVersion(buffer); +final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId(); + +// If the generation id is provided, it must match the member epoch. +if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) { +// TODO: need a list of all available server assignors +if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name()) +|| RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) { +return protocol; +} +} +} +throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " + +"protocol supported by the server assignors with the epoch of the member known by the group coordinator (" + +member.memberEpoch() + ")."); +} + +private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> transitionToConsumerGroupHeartbeatTopicPartitions( +List<TopicPartition> topicPartitions +) { +Map<String, List<Integer>> topicMap = new HashMap<>(); +topicPartitions.forEach(tp -> +topicMap.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()).add(tp.partition()) +); +return topicMap.entrySet().stream().map(item -> { +TopicImage topicImage = metadataImage.topics().getTopic(item.getKey()); +if (topicImage == null) { +throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + "."); +} +return new ConsumerGroupHeartbeatRequestData.TopicPartitions() +.setTopicId(topicImage.id())
+.setPartitions(item.getValue()); +}).collect(Collectors.toList()); +} + +public CoordinatorResult upgradeGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture<JoinGroupResponseData> responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List<Record> records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs || +sessionTimeoutMs > classicGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +return EMPTY_RESULT; +} + +// Get or create the consumer group. +final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +throwIfConsumerGroupIsFull(group, memberId); + +// Get or create the member. +if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +// new dynamic member. +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}.
" + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +member = group.getOrMaybeCreateMember(memberId, true); +newMemberCreated = !group.members().containsKey(memberId); +log.info("[GroupId {}] Member {} joins the consumer group.",
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru merged PR #15000: URL: https://github.com/apache/kafka/pull/15000
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1915503178 Test failures are unrelated
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lucasbru commented on PR #15271: URL: https://github.com/apache/kafka/pull/15271#issuecomment-1915499373 @dajac Could you please take a look when you have time? Thanks!
Re: [PR] KAFKA-14725: Interrupt source task poll thread when cancelled [kafka]
C0urante commented on PR #14316: URL: https://github.com/apache/kafka/pull/14316#issuecomment-1915480693 Hmm... looks like this is causing a spike in CI failures. Even though all of the tests are known to be flaky, I'm going to try to look into the tests more closely before merging.
Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]
C0urante commented on PR #15249: URL: https://github.com/apache/kafka/pull/15249#issuecomment-1915476191 [The next CI run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15249/7/tests/) also completed with no failures in the `ConnectorRestartApiIntegrationTest` suite; kicking off a third.
Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1470117099 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig, val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() -entriesPerPartition.foreach { case (topicPartition, records) => +entriesPerPartition.forKeyValue { (topicPartition, records) => // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) - if (!transactionalBatches.isEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) + if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) } if (transactionalProducerInfo.size > 1) { throw new InvalidPidMappingException("Transactional records contained more than one producer ID") } -def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors], - newRequestLocal: RequestLocal, - verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { +def postVerificationCallback(newRequestLocal: RequestLocal, + results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = { Review Comment: Ah right -- makes sense
[PR] Update jose4j to 0.9.4 to address CVE-2023-51775 [kafka]
mike-lloyd03 opened a new pull request, #15284: URL: https://github.com/apache/kafka/pull/15284 `org.bitbucket.b_c:jose4j` 0.9.3 is susceptible to Denial of Service per [CVE-2023-51775](https://www.cve.org/CVERecord?id=CVE-2023-51775). This PR updates `kafka` to use 0.9.4. Thank you
Re: [PR] KAFKA-14505; [6/N] Avoid rescheduling callback in request thread [kafka]
dajac commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1470109481 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig, val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() -entriesPerPartition.foreach { case (topicPartition, records) => +entriesPerPartition.forKeyValue { (topicPartition, records) => // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) - if (!transactionalBatches.isEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) + if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) } if (transactionalProducerInfo.size > 1) { throw new InvalidPidMappingException("Transactional records contained more than one producer ID") } -def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors], - newRequestLocal: RequestLocal, - verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { +def postVerificationCallback(newRequestLocal: RequestLocal, + results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = { Review Comment: The function that wraps the callback only support wrapping methods with one argument. I could not find a better way to handle this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
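dajac's point above, that the wrapping utility only accepts single-argument callbacks, is why the pre-append errors and verification guards end up bundled into one value. A minimal Java sketch of that constraint follows; all names here are invented for illustration (the real code is Scala in ReplicaManager), and the composite record stands in for the Scala tuple:

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified illustration of the constraint dajac describes:
// a wrapper utility that can only decorate single-argument callbacks, which
// forces multiple results (errors + verification guards) into one value.
public class SingleArgWrapperSketch {

    // Composite result standing in for the Scala tuple
    // (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard]).
    record VerificationResult(Map<String, String> errors, Map<String, String> guards) { }

    // The wrapper can only take a one-argument callback, e.g. to hop back
    // onto a request handler thread before invoking it.
    static <T> Consumer<T> wrap(Consumer<T> callback) {
        return result -> {
            // The real code would re-schedule onto a request handler thread here.
            callback.accept(result);
        };
    }

    // Records the last observed error for the demo below.
    static String lastError;

    public static void main(String[] args) {
        // Bundle two maps into one argument so they fit the single-arg wrapper.
        Consumer<VerificationResult> cb =
            r -> lastError = r.errors().getOrDefault("tp-0", "NONE");
        wrap(cb).accept(new VerificationResult(
            Map.of("tp-0", "INVALID_PID_MAPPING"), Map.of()));
    }
}
```

This also explains jolshan's double-parenthesis question elsewhere in the thread: in Scala, `callback((a, b))` passes one tuple argument rather than two arguments.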
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1470110954 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1062,10 +1062,10 @@ private static boolean isGroupIdNotEmpty(String groupId) { * @param operationName The name of the operation. * @param operationInputThe operation's input for logging purposes. * @param exception The exception to handle. - * @param handler A function which takes an Errors and a String and returns the expected + * @param handler A function which takes an Errors and a String and builds the expected * output. The String can be null. Note that the function could further * transform the error depending on the context. - * @return The response. + * @return The output build by the handler. Review Comment: nit: built? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [6/N] Avoid rescheduling callback in request thread [kafka]
dajac commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1470109774 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala: ## @@ -85,6 +85,10 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics) groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions), false) + +// Transactional appends attempt to schedule to the request handler thread using +// a non request handler thread. Set this to avoid error. +KafkaRequestHandler.setBypassThreadCheck(true) Review Comment: That’s right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
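The bypass discussed above can be sketched as follows. This is a hypothetical simplification of the kind of check that `KafkaRequestHandler.setBypassThreadCheck(true)` relaxes in tests; the thread-name prefix and method names are assumptions, not the actual implementation:

```java
// Sketch of a thread-affinity guard with a test-only bypass: work may only be
// scheduled from a request handler thread unless the bypass flag is set, which
// is why the concurrency test above enables it during setup.
public class ThreadCheckSketch {
    private static volatile boolean bypassThreadCheck = false;

    public static void setBypassThreadCheck(boolean bypass) {
        bypassThreadCheck = bypass;
    }

    static void scheduleOnRequestThread(Runnable task) {
        // Assumed naming convention for request handler threads.
        boolean onRequestThread = Thread.currentThread().getName()
            .startsWith("data-plane-kafka-request-handler");
        if (!onRequestThread && !bypassThreadCheck) {
            throw new IllegalStateException(
                "Must be called from a request handler thread");
        }
        task.run(); // the real code would enqueue onto the handler pool
    }
}
```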
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1470109704 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String groupId) { } /** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. + * This is the handler commonly used by all the operations that requires to convert errors to + * coordinator errors. The handler also handles and log unexpected errors. + * + * @param requestName The name of the request. + * @param request The request itself for logging purposes. + * @param exception The exception to handle. + * @param responseBuilder A function which takes an Errors and a String and returns + * the response. The String can be null. + * @return The response. + * @param The type of the request. Review Comment: Ok -- i think if we include those comments in the description, that should be enough -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
dajac commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1470107871 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String groupId) { } /** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. + * This is the handler commonly used by all the operations that requires to convert errors to + * coordinator errors. The handler also handles and log unexpected errors. + * + * @param requestName The name of the request. + * @param request The request itself for logging purposes. + * @param exception The exception to handle. + * @param responseBuilder A function which takes an Errors and a String and returns + * the response. The String can be null. + * @return The response. + * @param The type of the request. Review Comment: That’s right. The input must be toString’able. The output type is the type returned by the handler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [6/N] Avoid rescheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1470098563 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -982,24 +996,21 @@ class ReplicaManager(val config: KafkaConfig, producerId: Long, producerEpoch: Short, baseSequence: Int, -requestLocal: RequestLocal, -callback: (Errors, RequestLocal, VerificationGuard) => Unit +callback: ((Errors, VerificationGuard)) => Unit ): Unit = { -def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors], -newRequestLocal: RequestLocal, -verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { - callback( +def generalizedCallback(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = { + val (preAppendErrors, verificationGuards) = results + callback(( Review Comment: nit: are the double `((` needed? I see it elsewhere so maybe I'm missing an aspect of the language. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [6/N] Avoid rescheduling callback in request thread [kafka]
jolshan commented on code in PR #15176: URL: https://github.com/apache/kafka/pull/15176#discussion_r1470096146 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -935,8 +935,12 @@ private[group] class GroupCoordinator( producerId, producerEpoch, RecordBatch.NO_SEQUENCE, - requestLocal, - postVerificationCallback + // Wrap the callback to be handled on an arbitrary request handler + // thread when transaction verification is complete. Review Comment: Should we leave a comment that the requestLocal passed in is only for the case where we execute immediately? ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -982,24 +996,21 @@ class ReplicaManager(val config: KafkaConfig, producerId: Long, producerEpoch: Short, baseSequence: Int, -requestLocal: RequestLocal, -callback: (Errors, RequestLocal, VerificationGuard) => Unit +callback: ((Errors, VerificationGuard)) => Unit ): Unit = { -def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors], -newRequestLocal: RequestLocal, -verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { - callback( +def generalizedCallback(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = { + val (preAppendErrors, verificationGuards) = results + callback(( Review Comment: nit: are the double `((` needed? I see it elsewhere so maybe I'm missing an aspect of the language. ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig, val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() -entriesPerPartition.foreach { case (topicPartition, records) => +entriesPerPartition.forKeyValue { (topicPartition, records) => // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. 
val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) - if (!transactionalBatches.isEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) + if (transactionalBatches.nonEmpty) topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence) } if (transactionalProducerInfo.size > 1) { throw new InvalidPidMappingException("Transactional records contained more than one producer ID") } -def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors], - newRequestLocal: RequestLocal, - verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = { +def postVerificationCallback(newRequestLocal: RequestLocal, + results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = { Review Comment: I don't have a problem with this refactor, but just curious what makes a tuple argument better than individual ones. ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala: ## @@ -85,6 +85,10 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics) groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions), false) + +// Transactional appends attempt to schedule to the request handler thread using +// a non request handler thread. Set this to avoid error. +KafkaRequestHandler.setBypassThreadCheck(true) Review Comment: Is this needed because we do the wrapper outside replica manager now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1470081608 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -997,11 +985,13 @@ void markReconciliationCompleted() { * * */ -private void resolveMetadataForUnresolvedAssignment() { -assignmentReadyToReconcile.clear(); +private SortedSet resolveMetadataForTargetAssignment() { +final SortedSet assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); +final HashMap> unresolved = new HashMap<>(currentTargetAssignment); + // Try to resolve topic names from metadata cache or subscription cache, and move // assignments from the "unresolved" collection, to the "readyToReconcile" one. Review Comment: Dropping it sounds good to me, agree with your what vs why view on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1470078455 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String groupId) { } /** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. + * This is the handler commonly used by all the operations that requires to convert errors to + * coordinator errors. The handler also handles and log unexpected errors. + * + * @param requestName The name of the request. + * @param request The request itself for logging purposes. + * @param exception The exception to handle. + * @param responseBuilder A function which takes an Errors and a String and returns + * the response. The String can be null. + * @return The response. + * @param The type of the request. Review Comment: I'm wondering if the input is ever not a string (since it is just used for logging). I guess the out type is the type the handler returns? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
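The generic signature being discussed can be illustrated with a stripped-down sketch: `IN` is only used for logging (so any type with a usable `toString()` works, not just `String`), and `OUT` is whatever the supplied builder returns. This is an invented simplification, not the real `GroupCoordinatorService` code; the `Errors` enum is stubbed:

```java
import java.util.function.BiFunction;

// Hedged sketch of a generic exception-to-response handler: known exceptions
// map to coordinator errors, unexpected ones are logged with the operation
// name and input before being converted to an unknown-server error.
public class ErrorHandlerSketch {
    enum Errors { COORDINATOR_NOT_AVAILABLE, UNKNOWN_SERVER_ERROR }

    static <IN, OUT> OUT handleOperationException(
        String operationName,
        IN operationInput,                       // logged via toString()
        Throwable exception,
        BiFunction<Errors, String, OUT> responseBuilder
    ) {
        if (exception instanceof java.util.concurrent.TimeoutException) {
            // Known, expected error: no logging, just a mapped response.
            return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
        }
        // Unexpected error: log operation name and input, return a generic error.
        System.err.printf("Operation %s with %s hit an unexpected exception: %s%n",
            operationName, operationInput, exception.getMessage());
        return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, exception.getMessage());
    }
}
```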
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
dajac commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1470072535 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,7 +524,7 @@ public CompletableFuture listGroups( .combineFutures(futures, ArrayList::new, List::addAll) .thenApply(groups -> new ListGroupsResponseData().setGroups(groups)) .exceptionally(exception -> handleOperationException( -"ListGroups", +"list-groups", Review Comment: Indeed, list groups lists all the groups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
philipnee commented on PR #15216: URL: https://github.com/apache/kafka/pull/15216#issuecomment-1915374713 Hi @lucasbru - This PR is ready for review. I did a bit of refactoring in this PR, so not everything here is in scope. Let me know what you think! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1470066073 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -523,7 +524,7 @@ public CompletableFuture listGroups( .combineFutures(futures, ArrayList::new, List::addAll) .thenApply(groups -> new ListGroupsResponseData().setGroups(groups)) .exceptionally(exception -> handleOperationException( -"ListGroups", +"list-groups", Review Comment: this covers class and consumer groups? There are a few without the classic or consumer prefix so just wanted to confirm they all cover both if there is no prefix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst ClientMetricsInstanceMetadata instanceMetadata) { ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata); +// Maybe add client metrics, if metrics not already added. Metrics might be already added +// if the client instance was evicted from the cache because of size limit. Review Comment: Hmm, if we evict a client instance from LRU cache, should we remove the corresponding metrics? ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. 
+private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. +if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); +unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), +ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); +sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + +Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); +throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); +sensorsName.add(throttleCount.name()); + +Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); +pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); +sensorsName.add(pluginExportCount.name()); + +Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId); +pluginErrorCount.add(createMeter(metrics, 
new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); +sensorsName.add(pluginErrorCount.name()); + +Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", +ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", +ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new
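The idempotency check at the top of `maybeAddClientInstanceMetrics` in the diff above, probing one representative sensor name and skipping everything if it exists, can be sketched with a plain map standing in for the `org.apache.kafka.common.metrics.Metrics` registry. All names below are illustrative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the registration pattern: if one sensor of a client
// instance exists, all were registered together, so the whole registration
// is skipped. A counter tracks how many times registration actually ran.
public class SensorRegistrySketch {
    static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";

    final Map<String, Object> sensors = new ConcurrentHashMap<>();
    int registrations = 0;

    void maybeAddClientInstanceMetrics(String clientInstanceId) {
        // Probe one representative sensor name; if present, skip everything,
        // relying on the invariant that all per-instance sensors are
        // registered as a unit.
        if (sensors.containsKey(PLUGIN_EXPORT + "-" + clientInstanceId)) {
            return;
        }
        registrations++;
        sensors.put(PLUGIN_EXPORT + "-" + clientInstanceId, new Object());
        sensors.put("ClientMetricsThrottle-" + clientInstanceId, new Object());
    }
}
```

junrao's eviction question applies here too: if an instance is evicted from the cache without its sensors being removed, this check makes re-registration a no-op when the instance comes back.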
Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
philipnee commented on code in PR #15216: URL: https://github.com/apache/kafka/pull/15216#discussion_r1470063334 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -671,18 +718,8 @@ private ConsumerConfig config() { return new ConsumerConfig(prop); } -private HeartbeatRequestManager createHeartbeatRequestManager() { Review Comment: dead code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]
philipnee commented on code in PR #15216: URL: https://github.com/apache/kafka/pull/15216#discussion_r1470058217 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java: ## @@ -26,20 +26,20 @@ import java.util.concurrent.TimeUnit; -public class KafkaConsumerMetrics implements AutoCloseable { +import static org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager.MetricGroupSuffix.CONSUMER; + +public class KafkaConsumerMetrics extends AbstractConsumerMetricsManager implements AutoCloseable { private final MetricName lastPollMetricName; private final Sensor timeBetweenPollSensor; private final Sensor pollIdleSensor; private final Sensor committedSensor; private final Sensor commitSyncSensor; -private final Metrics metrics; private long lastPollMs; private long pollStartMs; private long timeSinceLastPollMs; public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { Review Comment: i'm not even sure why we have metricGrpPrefix from the beginning - it is not specified by the user and it seems to always be the default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1470001208 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -183,7 +183,7 @@ object HostedPartition { /** * This broker hosts the partition, but it is in an offline log directory. */ - final case class Offline(topicId: Option[Uuid]) extends HostedPartition + final case class Offline(partition: Option[Partition]) extends HostedPartition Review Comment: Actually ignore me. Partition wouldn't be available for `Offline` class all the time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r146343 ## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ## @@ -493,4 +519,123 @@ public void run() { } } } + +// Visible for testing +final class ClientMetricsStats { + +private static final String GROUP_NAME = "ClientMetrics"; + +// Visible for testing +static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; +static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; +static final String THROTTLE = "ClientMetricsThrottle"; +static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; +static final String PLUGIN_ERROR = "ClientMetricsPluginError"; +static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + +// Names of sensors that are registered through client instances. +private final Set sensorsName = ConcurrentHashMap.newKeySet(); +// List of metric names which are not specific to a client instance. Do not require thread +// safe structure as it will be populated only in constructor. +private final List registeredMetricNames = new ArrayList<>(); + +private final Set instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, +THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + +ClientMetricsStats() { +Measurable instanceCount = (config, now) -> clientInstanceCache.size(); +MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, +"The current number of client metric instances being managed by the broker"); +metrics.addMetric(instanceCountMetric, instanceCount); +registeredMetricNames.add(instanceCountMetric); +} + +public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { +// If one sensor of the metrics has been registered for the client instance, +// then all other sensors should have been registered; and vice versa. 
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { +return; +} + +Map tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + +Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( Review Comment: I don't think it's a good idea to tag with the client instance id. Imagine a situation in which an unbounded number of client instance IDs are erroneously being used. I don't think we want to build up knowledge of each of them individually, rather to count how many delinquent instances there are. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
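The alternative implied by this review, a single bounded aggregate counter instead of a metric tagged with every client instance id, could look like the following sketch. The class and method names are invented; the point is that metric cardinality stays constant no matter how many (possibly bogus) instance ids show up:

```java
import java.util.concurrent.atomic.LongAdder;

// Sketch of bounded aggregation: one counter for all delinquent client
// instances, rather than one tagged metric per client instance id.
public class BoundedMetricSketch {
    final LongAdder unknownSubscriptionRequests = new LongAdder();

    void recordUnknownSubscriptionRequest(String clientInstanceId) {
        // clientInstanceId may be logged for debugging, but is deliberately
        // not used as a metric tag, so cardinality does not grow with ids.
        unknownSubscriptionRequests.increment();
    }
}
```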
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1469993396 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -183,7 +183,7 @@ object HostedPartition { /** * This broker hosts the partition, but it is in an offline log directory. */ - final case class Offline(topicId: Option[Uuid]) extends HostedPartition + final case class Offline(partition: Option[Partition]) extends HostedPartition Review Comment: if you are using `Partition` then maybe remove the Option as `Partition.topicId` already return option -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16206) KRaftMigrationZkWriter tries to delete deleted topic configs twice
[ https://issues.apache.org/jira/browse/KAFKA-16206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16206: - Summary: KRaftMigrationZkWriter tries to delete deleted topic configs twice (was: ZkConfigMigrationClient tries to delete topic configs twice) > KRaftMigrationZkWriter tries to delete deleted topic configs twice > -- > > Key: KAFKA-16206 > URL: https://issues.apache.org/jira/browse/KAFKA-16206 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Reporter: David Arthur >Priority: Minor > > When deleting a topic, we see spurious ERROR logs from > kafka.zk.migration.ZkConfigMigrationClient: > > {code:java} > Did not delete ConfigResource(type=TOPIC, name='xxx') since the node did not > exist. {code} > This seems to happen because ZkTopicMigrationClient#deleteTopic is deleting > the topic, partitions, and config ZNodes in one shot. Subsequent calls from > KRaftMigrationZkWriter to delete the config encounter a NO_NODE since the > ZNode is already gone. -- This message was sent by Atlassian Jira (v8.20.10#820010)
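One plausible shape of a fix for the spurious ERROR above, sketched against a stub ZK store rather than Kafka's actual migration classes (all names here are illustrative): treat NO_NODE on a config deletion as an idempotent success, since the one-shot topic deletion may already have removed the config ZNode.

```java
import java.util.HashSet;
import java.util.Set;

// Stub ZNode store standing in for ZooKeeper; not Kafka's migration code.
public class ConfigDeleteSketch {

    enum Code { OK, NO_NODE }

    private final Set<String> znodes = new HashSet<>();

    public ConfigDeleteSketch(String... paths) {
        for (String path : paths) {
            znodes.add(path);
        }
    }

    // Stub for a raw ZooKeeper delete of a single ZNode.
    public Code deleteZNode(String path) {
        return znodes.remove(path) ? Code.OK : Code.NO_NODE;
    }

    // A second delete of the same config observes NO_NODE; since the node
    // was already removed by the earlier topic deletion, treat it as done
    // (log at DEBUG rather than ERROR) instead of surfacing a failure.
    public boolean deleteConfig(String path) {
        Code code = deleteZNode(path);
        if (code == Code.NO_NODE) {
            return true; // already deleted: idempotent success
        }
        return code == Code.OK;
    }
}
```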
[jira] [Updated] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers
[ https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16171: - Component/s: controller > Controller failover during ZK migration can prevent metadata updates to ZK > brokers > -- > > Key: KAFKA-16171 > URL: https://issues.apache.org/jira/browse/KAFKA-16171 > Project: Kafka > Issue Type: Bug > Components: controller, kraft, migration >Affects Versions: 3.6.0, 3.7.0, 3.6.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > > h2. Description > During the ZK migration, after KRaft becomes the active controller we enter a > state called hybrid mode. This means we have a mixture of ZK and KRaft > brokers. The KRaft controller updates the ZK brokers using the deprecated > controller RPCs (LeaderAndIsr, UpdateMetadata, etc). > > A race condition exists where the KRaft controller will get stuck in a retry > loop while initializing itself after a failover, which prevents it from > sending these RPCs to ZK brokers. > h2. Impact > Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK > brokers will not receive any metadata updates. The ZK brokers will be able to > send requests to the controller (such as AlterPartitions), but the metadata > updates which come as a result of those requests will never be seen. This > essentially looks like the controller is unavailable from the ZK brokers' > perspective. > h2. Detection and Mitigation > This bug can be seen by observing failed ZK writes from a recently elected > controller. > The tell-tale error message is: > {code:java} > Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This > indicates that another KRaft controller is making writes to ZooKeeper. {code} > with a stacktrace like: > {noformat} > java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. > Expected zkVersion = 507823. This indicates that another KRaft controller is > making writes to ZooKeeper. 
> at > kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613) > at > kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639) > at > kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664) > at > scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) > at > scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) > at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43) > at > kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664) > at > kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158) > at > kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790) > at > 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:1583) > at > org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat} > To mitigate this problem, a new KRaft controller should be elected. This can > be done by restarting the problematic active controller. To verify that the > new controller does not encounter the
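The check-op pattern named in the error message above can be modeled in a few lines. This is an illustrative sketch, not `KafkaZkClient`: every migration write is guarded by the expected version of the migration ZNode, so a controller holding a stale version fails fast instead of racing a newer controller's writes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a version-checked ZNode write; not Kafka's actual client.
public class MigrationZNodeSketch {

    private final AtomicInteger zkVersion = new AtomicInteger(0);

    // Returns the new version on success, or -1 when expectedVersion is
    // stale, i.e. another controller has already advanced the ZNode.
    public int checkAndWrite(int expectedVersion) {
        if (zkVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
            return expectedVersion + 1;
        }
        return -1;
    }
}
```

A stale controller repeating `checkAndWrite` with its old version keeps getting -1, which is the retry-loop symptom the issue describes; only re-reading the current version (or electing a new controller) unblocks it.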
[jira] [Updated] (KAFKA-16205) Reduce number of metadata requests during hybrid mode
[ https://issues.apache.org/jira/browse/KAFKA-16205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16205: - Component/s: controller > Reduce number of metadata requests during hybrid mode > - > > Key: KAFKA-16205 > URL: https://issues.apache.org/jira/browse/KAFKA-16205 > Project: Kafka > Issue Type: Improvement > Components: controller, kraft, migration >Affects Versions: 3.4.0, 3.5.0, 3.6.0, 3.7.0 >Reporter: David Arthur >Priority: Major > > When migrating a cluster with a high number of brokers and partitions, it is > possible for the controller channel manager queue to get backed up. This can > happen when many small RPCs are generated in response to several small > MetadataDeltas being handled by MigrationPropagator. > > In the ZK controller, various optimizations have been made over the years to > reduce the number of UMR and LISR sent during controlled shutdown or other > large metadata events. For the ZK to KRaft migration, we use the > MetadataLoader infrastructure to learn about and propagate metadata to ZK > brokers. > > We need to improve the batching in MigrationPropagator to avoid performance > issues during the migration of large clusters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
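The batching improvement the issue calls for might look roughly like the following sketch. The class and method names are illustrative, not MigrationPropagator's API: instead of enqueueing one RPC per small metadata delta, pending deltas are coalesced per destination broker and drained as one combined request each.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative batcher; deltas are modeled as plain strings.
public class DeltaBatcherSketch {

    private final Map<Integer, List<String>> pending = new HashMap<>();

    // Accumulate a small delta for a destination broker instead of
    // immediately enqueueing a dedicated RPC for it.
    public void enqueue(int brokerId, String delta) {
        pending.computeIfAbsent(brokerId, id -> new ArrayList<>()).add(delta);
    }

    // Drain everything accumulated so far: one batched payload per broker
    // rather than one request per delta.
    public Map<Integer, List<String>> drainBatches() {
        Map<Integer, List<String>> batches = new HashMap<>(pending);
        pending.clear();
        return batches;
    }
}
```

With this shape, N small deltas destined for the same broker cost one queued request instead of N, which is what keeps the controller channel manager queue from backing up.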
[jira] [Updated] (KAFKA-16205) Reduce number of metadata requests during hybrid mode
[ https://issues.apache.org/jira/browse/KAFKA-16205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16205: - Component/s: (was: controller) > Reduce number of metadata requests during hybrid mode > - > > Key: KAFKA-16205 > URL: https://issues.apache.org/jira/browse/KAFKA-16205 > Project: Kafka > Issue Type: Improvement > Components: kraft, migration >Affects Versions: 3.4.0, 3.5.0, 3.6.0, 3.7.0 >Reporter: David Arthur >Priority: Major > > When migrating a cluster with a high number of brokers and partitions, it is > possible for the controller channel manager queue to get backed up. This can > happen when many small RPCs are generated in response to several small > MetadataDeltas being handled by MigrationPropagator. > > In the ZK controller, various optimizations have been made over the years to > reduce the number of UMR and LISR sent during controlled shutdown or other > large metadata events. For the ZK to KRaft migration, we use the > MetadataLoader infrastructure to learn about and propagate metadata to ZK > brokers. > > We need to improve the batching in MigrationPropagator to avoid performance > issues during the migration of large clusters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16205) Reduce number of metadata requests during hybrid mode
[ https://issues.apache.org/jira/browse/KAFKA-16205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16205: - Component/s: migration > Reduce number of metadata requests during hybrid mode > - > > Key: KAFKA-16205 > URL: https://issues.apache.org/jira/browse/KAFKA-16205 > Project: Kafka > Issue Type: Improvement > Components: controller, kraft, migration >Affects Versions: 3.4.0, 3.5.0, 3.6.0, 3.7.0 >Reporter: David Arthur >Priority: Major > > When migrating a cluster with a high number of brokers and partitions, it is > possible for the controller channel manager queue to get backed up. This can > happen when many small RPCs are generated in response to several small > MetadataDeltas being handled by MigrationPropagator. > > In the ZK controller, various optimizations have been made over the years to > reduce the number of UMR and LISR sent during controlled shutdown or other > large metadata events. For the ZK to KRaft migration, we use the > MetadataLoader infrastructure to learn about and propagate metadata to ZK > brokers. > > We need to improve the batching in MigrationPropagator to avoid performance > issues during the migration of large clusters. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers
[ https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16171: - Component/s: migration (was: controller) > Controller failover during ZK migration can prevent metadata updates to ZK > brokers > -- > > Key: KAFKA-16171 > URL: https://issues.apache.org/jira/browse/KAFKA-16171 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.6.0, 3.7.0, 3.6.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > > h2. Description > During the ZK migration, after KRaft becomes the active controller we enter a > state called hybrid mode. This means we have a mixture of ZK and KRaft > brokers. The KRaft controller updates the ZK brokers using the deprecated > controller RPCs (LeaderAndIsr, UpdateMetadata, etc). > > A race condition exists where the KRaft controller will get stuck in a retry > loop while initializing itself after a failover, which prevents it from > sending these RPCs to ZK brokers. > h2. Impact > Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK > brokers will not receive any metadata updates. The ZK brokers will be able to > send requests to the controller (such as AlterPartitions), but the metadata > updates which come as a result of those requests will never be seen. This > essentially looks like the controller is unavailable from the ZK brokers' > perspective. > h2. Detection and Mitigation > This bug can be seen by observing failed ZK writes from a recently elected > controller. > The tell-tale error message is: > {code:java} > Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This > indicates that another KRaft controller is making writes to ZooKeeper. {code} > with a stacktrace like: > {noformat} > java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. > Expected zkVersion = 507823. This indicates that another KRaft controller is > making writes to ZooKeeper. 
> at > kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613) > at > kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639) > at > kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664) > at > scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) > at > scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) > at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43) > at > kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664) > at > kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158) > at > kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436) > at > org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115) > at > org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790) > at > 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:1583) > at > org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat} > To mitigate this problem, a new KRaft controller should be elected. This can > be done by restarting the problematic active controller. To verify that the > new