Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]

2024-01-29 Thread via GitHub


dajac commented on PR #15286:
URL: https://github.com/apache/kafka/pull/15286#issuecomment-1916253886

   Thanks @rreddy-22! Could you please describe the bug(s) in the description? 
Do we need to add new tests for them?





Re: [PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]

2024-01-29 Thread via GitHub


showuon commented on code in PR #15273:
URL: https://github.com/apache/kafka/pull/15273#discussion_r1470688447


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -290,7 +320,15 @@ public Optional serverConfigName(String configName) {
         .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
             TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
         .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-            TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+            TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+        .define(LOG_DIR_PROP, STRING, DEFAULT_LOG_DIR, HIGH, LOG_DIR_DOC)
+        .define(LOG_DIRS_PROP, STRING, null, HIGH, LOG_DIRS_DOC)
+        .define(METADATA_LOG_DIR_PROP, STRING, null, HIGH, METADATA_LOG_DIR_DOC)
+        .define(INTER_BROKER_PROTOCOL_VERSION_PROP, STRING, DEFAULT_INTER_BROKER_PROTOCOL_VERSION, new MetadataVersionValidator(), MEDIUM, INTER_BROKER_PROTOCOL_VERSION_DOC)
+        // This indicates whether unreleased APIs should be advertised by this node.
+        .defineInternal(UNSTABLE_API_VERSIONS_ENABLE_PROP, BOOLEAN, DEFAULT_UNSTABLE_API_VERSIONS_ENABLE, HIGH)
+        // This indicates whether unreleased MetadataVersions should be enabled on this node.
+        .defineInternal(UNSTABLE_METADATA_VERSIONS_ENABLE_PROP, BOOLEAN, DEFAULT_UNSTABLE_METADATA_VERSIONS_ENABLE, HIGH);

Review Comment:
   I didn't see where we remove these definitions from the original KafkaConfig?



##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+    private static Set parseProcessRoles(List processRoles, Map voterConnections, int nodeId) {

Review Comment:
   Where do we remove the original `parseProcessRoles` method?



##
core/src/test/scala/unit/kafka/log/LogConfigTest.scala:
##
@@ -94,6 +94,18 @@ class LogConfigTest {
       case TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
       case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
       case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-3")
+      case LogConfig.LOG_DIR_PROP => assert(true)
+      case LogConfig.LOG_DIRS_PROP => assert(true)

Review Comment:
   What are we testing here?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -500,6 +553,34 @@ public static Map configKeys() {
         return Collections.unmodifiableMap(CONFIG.configKeys());
     }

+    public List logDirs() {
+        String csvList = logDirs != null ? logDirs : logDir;
+        if (csvList == null || csvList.isEmpty()) {
+            return Collections.emptyList();
+        } else {
+            return Arrays.stream(csvList.split("\\s*,\\s*"))
+                .filter(v -> !v.equals(""))
+                .collect(Collectors.toList());
+        }
+    }
+
+    public String getMetadataLogDir() {
+        if (metadataLogDir != null) {
+            return metadataLogDir;
+        } else {
+            return logDirs().get(0);
+        }
+    }
+
+    public Optional interBrokerProtocolVersion() {
+        String originalIBP = (String) originals().get(LogConfig.INTER_BROKER_PROTOCOL_VERSION_PROP);
+        return originalIBP != null ? Optional.of(originalIBP) : Optional.empty();
+    }
+
+    public Boolean unstableMetadataVersionsEnabled() {
+        return unstableMetadataVersionsEnabled;
+    }

Review Comment:
   Why do we need these methods?



##
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##
@@ -206,6 +245,27 @@ private static Integer parseVoterId(String idString) {
 }
 }
 
+    private static Set parseProcessRoles(List processRoles, Map voterConnections, int nodeId) {
+        Set distinctRoles = new HashSet<>();
+        for (String role : processRoles) {
+            switch (role) {
+                case "broker":
+                    distinctRoles.add("BrokerRole");

Review Comment:
   Why don't we use the `ProcessRole` as before?






Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470669307


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
                                      delta: TopicsDelta,
                                      topicId: Uuid): Option[(Partition, Boolean)] = {
     getPartition(tp) match {
-      case HostedPartition.Offline =>
-        stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-          s"with topic id $topicId because it resides in an offline log " +
-          "directory.")
-        None
+      case HostedPartition.Offline(offlinePartition) =>
+        if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+          stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+            s"with topic id $topicId because it resides in an offline log " +
+            "directory.")
+          None
+        } else {
+          stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+            s"A topic with the same name but different id exists but it resides in an offline log " +
+            s"directory.")

Review Comment:
   Question: From this check `if (offlinePartition.flatMap(p => 
p.topicId).contains(topicId))`, we can make sure this partition is not in an 
offline dir, but how could we know whether the partition is in an online dir, a 
None dir, or even in another offline dir? Should we use topicIdPartition as the 
key for `allPartitions`?






Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on PR #15230:
URL: https://github.com/apache/kafka/pull/15230#issuecomment-1916170746

   committed





Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]

2024-01-29 Thread via GitHub


cmccabe closed pull request #15230: KAFKA-14616: Fix stray replica of recreated 
topics in KRaft mode
URL: https://github.com/apache/kafka/pull/15230





Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on PR #15230:
URL: https://github.com/apache/kafka/pull/15230#issuecomment-1916148593

   > Question: why can't we delete these disappeared(deleted) logs directly? I 
think this can only happen when topic deleting while the node is offline. If 
so, then deleting them should be fine?
   
   It probably would be fine, but I'm a bit nervous about it. What I'd really 
like is to delete them after a time delay of a few hours, but the code doesn't 
support that currently. We should add this functionality.
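
   As a rough illustration of the delayed-deletion idea described above, here is a 
minimal sketch; the scheduler wiring, the method name, and the `deleteStrayLog` 
callback are assumptions for illustration only, not existing LogManager APIs:

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   // Sketch only: keep a stray log on disk for a grace period before removing it.
   class DelayedStrayLogDeletionSketch {
       private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

       void scheduleStrayDeletion(Runnable deleteStrayLog, long delayHours) {
           // The actual deletion runs only after the configured delay has elapsed.
           scheduler.schedule(deleteStrayLog, delayHours, TimeUnit.HOURS);
       }
   }
   ```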





Re: [PR] KAFKA-14616: Fix stray replica of recreated topics in KRaft mode [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15230:
URL: https://github.com/apache/kafka/pull/15230#discussion_r1470637685


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -563,6 +565,18 @@ class LogManager(logDirs: Seq[File],
     startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
   }

+  def deleteStrayKRaftReplicas(
+    brokerId: Int,
+    image: TopicsImage
+  ): Unit = {
+    val strayPartitions = findStrayReplicas(brokerId, image, allLogs)
+    strayPartitions.foreach(topicPartition => {
+      val log = getLog(topicPartition).get
+      log.renameDir(UnifiedLog.logStrayDirName(topicPartition), shouldReinitialize = true)
+      asyncDelete(topicPartition, false, false, true)

Review Comment:
   OK. I will delete the call to `log.renameDir`.






[jira] [Commented] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-01-29 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812157#comment-17812157
 ] 

Luke Chen commented on KAFKA-16209:
---

Thanks [~goyarpit]!

> fetchSnapshot might return null if topic is created before v2.8
> ---
>
> Key: KAFKA-16209
> URL: https://issues.apache.org/jira/browse/KAFKA-16209
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.1
>Reporter: Luke Chen
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Remote log manager will fetch snapshot via ProducerStateManager 
> [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
>  but the snapshot map might get nothing if the topic has no snapshot created, 
> ex: topics before v2.8. Need to fix it to avoid NPE.
> old PR: https://github.com/apache/kafka/pull/14615/





[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-01-29 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812155#comment-17812155
 ] 

Kamal Chandraprakash commented on KAFKA-15776:
--

I've opened a KIP to add new `fetch.remote.max.wait.ms` dynamic config: 
[KIP-1018|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1018%3A+Introduce+max+remote+fetch+timeout+config+for+DelayedRemoteFetch+requests].
 Please post your feedback and suggestions on the mailing thread.

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one {{fetch.remote.max.wait.ms}} config (preferably 
> server config) to define the delay timeout for DelayedRemoteFetch requests 
> (or) take it from client similar to {{request.timeout.ms}}.





[jira] [Commented] (KAFKA-4759) Add support for subnet masks in SimpleACLAuthorizer

2024-01-29 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812144#comment-17812144
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-4759:
-

Thanks [~theszym]! I wasn't aware of the extensively discussed KIP backing this 
issue and PR.

I pretty much agree with Tom [1] on how to approach further discussion, though.

As the KIP has been dormant for a while, I'd suggest either checking whether the 
author (Sönke Liebau) intends to continue the original discussion, and/or 
creating a new KIP with a proposal that addresses the challenges left open on the 
original KIP (e.g. backward-compatibility issues).


PS. It may be a good time to bump this discussion given the ongoing KRaft 
migration and ZooKeeper removal.

[1] https://lists.apache.org/thread/jp974bd05fsdwrws7kbprykdw9g65dt1

> Add support for subnet masks in SimpleACLAuthorizer
> ---
>
> Key: KAFKA-4759
> URL: https://issues.apache.org/jira/browse/KAFKA-4759
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Shun Takebayashi
>Assignee: Shun Takebayashi
>Priority: Major
>
> SimpleACLAuthorizer currently accepts only single IP addresses.
> Supporting subnet masks with SimpleACLAuthorizer can make ACL configurations 
> simpler.





Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470557475


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   Yes, I agree with @OmniaGM  that we don't need to branch if/else here since 
we already do the handling in `createLogIfNotExists`. So we only need to pass 
in `partitionState.isNew` correctly here. Does that make sense?



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   Also, before this change, we would always treat the log as `new` if `directoryId == 
DirectoryId.UNASSIGNED`. But now it needs to be `directoryId == 
DirectoryId.UNASSIGNED && partitionState.isNew`. Will that cause other issues, 
@OmniaGM? 






[jira] [Commented] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-01-29 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812139#comment-17812139
 ] 

Arpit Goyal commented on KAFKA-16209:
-

[~showuon] I am picking it up. 

> fetchSnapshot might return null if topic is created before v2.8
> ---
>
> Key: KAFKA-16209
> URL: https://issues.apache.org/jira/browse/KAFKA-16209
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.1
>Reporter: Luke Chen
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Remote log manager will fetch snapshot via ProducerStateManager 
> [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
>  but the snapshot map might get nothing if the topic has no snapshot created, 
> ex: topics before v2.8. Need to fix it to avoid NPE.
> old PR: https://github.com/apache/kafka/pull/14615/





[jira] [Assigned] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-01-29 Thread Arpit Goyal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arpit Goyal reassigned KAFKA-16209:
---

Assignee: Arpit Goyal

> fetchSnapshot might return null if topic is created before v2.8
> ---
>
> Key: KAFKA-16209
> URL: https://issues.apache.org/jira/browse/KAFKA-16209
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.1
>Reporter: Luke Chen
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Remote log manager will fetch snapshot via ProducerStateManager 
> [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
>  but the snapshot map might get nothing if the topic has no snapshot created, 
> ex: topics before v2.8. Need to fix it to avoid NPE.
> old PR: https://github.com/apache/kafka/pull/14615/





Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-29 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1470555356


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,21 +453,38 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter The states of the groups we want to list.
-     *                     If empty all groups are returned with their state.
-     * @param committedOffset A specified committed offset corresponding to this shard
+     * @param statesFilter      The states of the groups we want to list.
+     *                          If empty, all groups are returned with their state.
+     * @param typesFilter       The types of the groups we want to list.
+     *                          If empty, all groups are returned with their type.
+     * @param committedOffset   A specified committed offset corresponding to this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List listGroups(
+        Set statesFilter,
+        Set typesFilter,
+        long committedOffset
+    ) {
+        // Converts each string to a value in the GroupType enum while being case-insensitive.
+        Set enumTypesFilter = typesFilter.stream()
+            .map(Group.GroupType::parse)
+            .collect(Collectors.toSet());

Review Comment:
   I initially threw the IllegalArgumentException and then caught and ignored it 
in the listGroups method, but that caused the opposite of what we wanted: all the 
groups were returned, since the type filter ended up empty. The only way to get 
the expected behavior is to add an UNKNOWN value to the group type enum, which 
gives us the same result as when an unknown state filter is passed. I also added 
tests to make sure. Let me know if this doesn't work or if there was something 
else you had in mind.
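
   For illustration only, here is a minimal sketch of the case-insensitive 
parse-with-UNKNOWN fallback described above; the enum constants and helper names 
are stand-ins, not the exact `Group.GroupType` definition:

   ```java
   import java.util.Locale;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Sketch: an unrecognized type string maps to UNKNOWN, so an unknown filter value
   // matches no group instead of accidentally matching all groups via an empty filter.
   class GroupTypeFilterSketch {
       enum GroupType {
           CONSUMER, CLASSIC, UNKNOWN;

           static GroupType parse(String name) {
               try {
                   return valueOf(name.toUpperCase(Locale.ROOT));
               } catch (IllegalArgumentException e) {
                   return UNKNOWN;
               }
           }
       }

       static Set<GroupType> toEnumFilter(Set<String> typesFilter) {
           return typesFilter.stream().map(GroupType::parse).collect(Collectors.toSet());
       }
   }
   ```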






Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-01-29 Thread via GitHub


JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1916021518

   > h level comments to start with. Are you on the Apache slack? We could also 
discuss there offline if you want.
   
   @dajac 
   Thanks for your reply!
   I will remove this part of the code.





Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-01-29 Thread via GitHub


JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1916020627

   > As a first step, it would be great if we could keep this pull request 
focused on the RPC and its implementation on the server side. I would extract 
the command line tool part into its own pull request. Would it be possible?
   
   Thanks for your reply!
   I will remove this part of the code.





Re: [PR] KAFKA-15942: Implement ConsumerInterceptor [kafka]

2024-01-29 Thread via GitHub


Joker-5 commented on PR #14963:
URL: https://github.com/apache/kafka/pull/14963#issuecomment-1916017056

   Hey @lucasbru, I saw [this pr](https://github.com/apache/kafka/pull/15000) 
had been merged, but it seems there's no `Co-authored-by` tag in the final 
commit. Could something have gone wrong there?
   I would be extremely grateful if you could tell me about it when you have time.
   Sorry to trouble you again.





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-29 Thread via GitHub


dengziming merged PR #14846:
URL: https://github.com/apache/kafka/pull/14846





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-01-29 Thread via GitHub


github-actions[bot] commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-1916012508

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: Fix NPE during fetchSnapshot [kafka]

2024-01-29 Thread via GitHub


showuon commented on PR #14615:
URL: https://github.com/apache/kafka/pull/14615#issuecomment-1916003844

   [KAFKA-16209](https://issues.apache.org/jira/browse/KAFKA-16209) is created 
for this issue. Closing this stale PR.





Re: [PR] MINOR: Fix NPE during fetchSnapshot [kafka]

2024-01-29 Thread via GitHub


showuon closed pull request #14615: MINOR: Fix NPE during fetchSnapshot
URL: https://github.com/apache/kafka/pull/14615





[jira] [Created] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-01-29 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16209:
-

 Summary: fetchSnapshot might return null if topic is created 
before v2.8
 Key: KAFKA-16209
 URL: https://issues.apache.org/jira/browse/KAFKA-16209
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.1
Reporter: Luke Chen


Remote log manager will fetch snapshot via ProducerStateManager 
[here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
 but the snapshot map might get nothing if the topic has no snapshot created, 
ex: topics before v2.8. Need to fix it to avoid NPE.

old PR: https://github.com/apache/kafka/pull/14615/
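
A minimal sketch of the kind of null-safe lookup the fix needs is shown below; the 
map and method names are assumptions loosely based on the linked 
ProducerStateManager code, not the actual implementation:

{code:java}
import java.io.File;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch only: a topic created before v2.8 may have no snapshot entry for the offset,
// so the lookup must tolerate a missing entry instead of triggering an NPE.
class SnapshotLookupSketch {
    private final Map<Long, File> snapshots = new ConcurrentSkipListMap<>();

    Optional<File> fetchSnapshot(long offset) {
        // Optional.ofNullable (rather than Optional.of) keeps an absent entry from throwing.
        return Optional.ofNullable(snapshots.get(offset));
    }
}
{code}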





[jira] [Updated] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-01-29 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16209:
--
Labels: newbie newbie++  (was: )

> fetchSnapshot might return null if topic is created before v2.8
> ---
>
> Key: KAFKA-16209
> URL: https://issues.apache.org/jira/browse/KAFKA-16209
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.1
>Reporter: Luke Chen
>Priority: Major
>  Labels: newbie, newbie++
>
> Remote log manager will fetch snapshot via ProducerStateManager 
> [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
>  but the snapshot map might get nothing if the topic has no snapshot created, 
> ex: topics before v2.8. Need to fix it to avoid NPE.
> old PR: https://github.com/apache/kafka/pull/14615/





[PR] MINOR kip 848 uniform assignor bugs [kafka]

2024-01-29 Thread via GitHub


rreddy-22 opened a new pull request, #15286:
URL: https://github.com/apache/kafka/pull/15286

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-01-29 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812112#comment-17812112
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-15776:
--

Totally agree – I was just trying to highlight the implications of not 
interrupting the threads; but I'm also in favor of improving the configurations 
for dealing with remote fetches.

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. {{fetch.max.wait.ms}} purpose is to wait for the 
> given amount of time when there is no data available to serve the FETCH 
> request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user on how to configure optimal value for each purpose. Moreover, the config 
> is of *LOW* importance and most of the users won't configure it and use the 
> default value of 500 ms.
> Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to 
> higher number of expired delayed remote fetch requests when the remote 
> storage have any degradation.
> We should introduce one {{fetch.remote.max.wait.ms}} config (preferably 
> server config) to define the delay timeout for DelayedRemoteFetch requests 
> (or) take it from client similar to {{request.timeout.ms}}.





[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15839:
--
Labels: kip-848 kip-848-client-support  (was: kip-848 
kip-848-client-support kip-848-e2e kip-848-preview)

> Topic ID integration in consumer subscription state
> ---
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> With the new consumer group protocol, assignments received by the consumer 
> contain topic IDs instead of topic names. Topic Ids are used in the 
> reconciliation path, integrated using TopicIdPartition. When reconciling, 
> topic names are resolved via a metadata update, but they are also kept in a 
> local #MembershipManager cache. This local cache serves the purpose of 
> keeping assigned topicId-names (that might have been deleted from metadata, 
> ex. topic deleted). 
> That's just an initial step towards spreading topic IDs internally in the 
> consumer code. Next step to address with this task would be to include topic 
> IDs in the subscription state, so that assigned topicId-names can be accessed 
> from other components without the need of resolving names multiple times.
> Note that this task aims only at spreading topic IDs internally in the 
> consumer, no changes to expose them at the API level. 





Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-29 Thread via GitHub


kirktrue commented on PR #15186:
URL: https://github.com/apache/kafka/pull/15186#issuecomment-1915837088

   Thanks @dajac!





[jira] [Updated] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16010:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
 (was: consumer-threading-refactor kip-848)

> Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
> --
>
> Key: KAFKA-16010
> URL: https://issues.apache.org/jira/browse/KAFKA-16010
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Did not get valid assignment for 
> partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, 
> topic1-0, topic1-3] after one consumer left
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286)
>   at 
> kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883)
>   at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281)
> {code}
> The logs include these lines:
>  
> {code}
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.





[jira] [Updated] (KAFKA-16204) Stray file core/00000000000000000001.snapshot created when running core tests

2024-01-29 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16204:
--
Labels: newbie newbie++  (was: )

> Stray file core/00000000000000000001.snapshot created when running core tests
> -
>
> Key: KAFKA-16204
> URL: https://issues.apache.org/jira/browse/KAFKA-16204
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, unit tests
>Reporter: Mickael Maison
>Priority: Major
>  Labels: newbie, newbie++
>
> When running the core tests I often get a file called 
> core/00000000000000000001.snapshot created in my kafka folder. It looks like 
> one of the tests does not clean up its resources properly.





[jira] [Updated] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16009:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
 (was: consumer-threading-refactor kip-848)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  





[jira] [Updated] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16008:
--
Labels: consumer-threading-refactor consumer-timeout  (was: 
consumer-threading-refactor kip-848)

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  





[jira] [Updated] (KAFKA-15475) Request might retry forever even if the user API timeout expires

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Summary: Request might retry forever even if the user API timeout expires  
(was: Timeout request might retry forever even if the user API times out in 
PrototypeAsyncConsumer)

> Request might retry forever even if the user API timeout expires
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with a 
> TimeoutException, which is retriable. In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager to remove the in-flight requests when it 
> expires.
>  # Pass the future to the application layer and continue to retry.





[jira] [Updated] (KAFKA-15475) Request might retry forever even if the user API timeout expires

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15475:
--
Labels: consumer-threading-refactor consumer-timeout  (was: 
consumer-threading-refactor kip-848-preview)

> Request might retry forever even if the user API timeout expires
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with a 
> TimeoutException, which is retriable. In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this:
>  # Pass a timer to the manager to remove the in-flight requests when it 
> expires.
>  # Pass the future to the application layer and continue to retry.





[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
 (was: consumer-threading-refactor kip-848-client-support)

> Enforce that events and requests respect user-provided timeout
> --
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.





[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Labels: consumer-threading-refactor consumer-timeout kip-848-client-support 
 (was: consumer-threading-refactor kip-848-client-support kip-848-preview)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, consumer-timeout, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(
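
One direction, sketched here purely as an illustration (this is not the project's 
chosen fix), is to let the asynchronous path mirror the legacy ordering by 
consulting the result before treating the deadline as expired:

{code:java}
import java.util.concurrent.CompletableFuture;

// Sketch only: check whether the pending operation already completed before
// concluding that the user-supplied timeout has expired.
class ResultBeforeTimerSketch {
    static int getCount(CompletableFuture<Integer> pending, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            if (pending.isDone())            // result is consulted before the timer
                return pending.getNow(-1);
            Thread.sleep(1);                 // stand-in for blocking on network I/O
        } while (System.currentTimeMillis() < deadline);
        return pending.isDone() ? pending.getNow(-1) : -1;
    }
}
{code}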





[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeout

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Summary: Enforce that events and requests respect user-provided timeout  
(was: Enforce that events and requests respect user-provided timeouts)

> Enforce that events and requests respect user-provided timeout
> --
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.





[jira] [Created] (KAFKA-16208) Design new Consumer timeout policy

2024-01-29 Thread Kirk True (Jira)
Kirk True created KAFKA-16208:
-

 Summary: Design new Consumer timeout policy
 Key: KAFKA-16208
 URL: https://issues.apache.org/jira/browse/KAFKA-16208
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer, documentation
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


This task is to design and document the timeout policy for the new Consumer 
implementation.

The documentation lives here: 
https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts





[jira] [Updated] (KAFKA-16199) Prune the event queue if event timeout expired before starting

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16199:
--
Labels: consumer-threading-refactor consumer-timeout  (was: 
consumer-threading-refactor)

> Prune the event queue if event timeout expired before starting
> --
>
> Key: KAFKA-16199
> URL: https://issues.apache.org/jira/browse/KAFKA-16199
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16199) Prune the event queue if event timeout expired before starting

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16199:
--
Summary: Prune the event queue if event timeout expired before starting  
(was: Prune the event queue if events have expired before starting)

> Prune the event queue if event timeout expired before starting
> --
>
> Key: KAFKA-16199
> URL: https://issues.apache.org/jira/browse/KAFKA-16199
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16200:
--
Labels: consumer-threading-refactor consumer-timeout  (was: 
consumer-threading-refactor)

> Ensure RequestManager handling of expired timeouts are consistent
> -
>
> Key: KAFKA-16200
> URL: https://issues.apache.org/jira/browse/KAFKA-16200
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, consumer-timeout
> Fix For: 3.8.0
>
>






[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents

2024-01-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16100:
--
Labels: consumer-threading-refactor consumer-timeout  (was: 
consumer-threading-refactor)

> Add timeout to all the CompletableApplicationEvents
> ---
>
> Key: KAFKA-16100
> URL: https://issues.apache.org/jira/browse/KAFKA-16100
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, consumer-timeout
> Fix For: 3.8.0
>
>   Original Estimate: 40h
>  Remaining Estimate: 40h
>
> The handling of timeouts and responses for the various kinds of 
> ApplicationEvents in the new consumer is not consistent. A small amount of 
> refactoring would make the code more maintainable and give consistent 
> behaviour for the different requests.





Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15270:
URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390923


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -166,6 +169,11 @@ class BrokerMetadataPublisher(
       Option(delta.featuresDelta()).foreach { featuresDelta =>
         featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
           info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
+          if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
+            info("Resending BrokerRegistration with existing incarnation-id to inform the " +
+              "controller about log directories in the broker following metadata update")

Review Comment:
   Please state the old and new metadata versions in the log message.






Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on PR #15270:
URL: https://github.com/apache/kafka/pull/15270#issuecomment-1915796839

   Thanks @gaurav-narula . The overall approach looks good to me. 
   
   There is a consistent test failure here:
   ```
   Gradle Test Run :core:test > Gradle Test Executor 2 > 
BrokerMetadataPublisherTest > testFindStrayReplicas() FAILED
   org.mockito.exceptions.misusing.UnfinishedVerificationException:
   Missing method call for verify(mock) here:
   -> at 
kafka.server.metadata.BrokerMetadataPublisherTest.testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(BrokerMetadataPublisherTest.scala:402)
   
   Example of correct verification:
   verify(mock).doSomething()
   
   Also, this error might show up because you verify either of: 
final/private/equals()/hashCode() methods.
   Those methods *cannot* be stubbed/verified.
   Mocking methods declared on non-public parent classes is not supported.
   at 
app//kafka.server.metadata.BrokerMetadataPublisherTest.mockLog(BrokerMetadataPublisherTest.scala:153)
   at 
app//kafka.server.metadata.BrokerMetadataPublisherTest.testFindStrayReplicas(BrokerMetadataPublisherTest.scala:100)```
   
   I expect the test needs to be updated to fix the mock.
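   
   A minimal, self-contained sketch of the usual fix for this kind of Mockito failure 
   (illustrative only; the mocked trait below is a hypothetical stand-in, not the actual 
   type mocked in BrokerMetadataPublisherTest):
   
   ```scala
   import org.mockito.Mockito.{mock, verify}
   
   // Hypothetical stand-in for the class being mocked in the test.
   trait Flusher { def flush(): Unit }
   
   object VerifyExample extends App {
     val flusher = mock(classOf[Flusher])
     flusher.flush()
   
     // Incomplete form: a bare `verify(flusher)` is what triggers
     // UnfinishedVerificationException on the next mock interaction.
     // Complete form, naming the interaction being verified:
     verify(flusher).flush()
   }
   ```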


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15270:
URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390923


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -166,6 +169,11 @@ class BrokerMetadataPublisher(
   Option(delta.featuresDelta()).foreach { featuresDelta =>
 featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
   info(s"Updating metadata.version to 
${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
+  if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) 
&& metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
+info("Resending BrokerRegistration with existing incarnation-id to 
inform the " +
+  "controller about log directories in the broker following 
metadata update")

Review Comment:
   Please list the old and new metadata versions here, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15270:
URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390289


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -137,6 +139,7 @@ class BrokerMetadataPublisher(
 manifest: LoaderManifest
   ): Unit = {
 val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
+val currentMetadataVersion = metadataCache.metadataVersion()

Review Comment:
   There's no need to access metadataCache here. That would incur a performance 
penalty because you're dealing with volatiles. Instead, you can just look at 
`delta.image().features().metadataVersion()` (or something like that, I didn't 
check field names)
   
   `delta.image` is the previous metadata image (that the delta is built on top 
of)
   
   I would also recommend not doing this until you know you need the old offset 
(i.e. if featuresDelta != null)
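   
   For illustration, a rough sketch of the shape being suggested, in the context of the 
   publish() code quoted above (the accessor names are assumptions, as noted above, and 
   this is not the actual patch):
   
   ```scala
   Option(delta.featuresDelta()).foreach { featuresDelta =>
     featuresDelta.metadataVersionChange().ifPresent { newVersion =>
       // Previous image the delta was built on top of; avoids the volatile read
       // that metadataCache.metadataVersion() would incur.
       val oldVersion = delta.image().features().metadataVersion()
       info(s"Updating metadata.version to ${newVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
       if (oldVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) &&
           newVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
         info(s"Resending BrokerRegistration with existing incarnation-id to inform the " +
           s"controller about log directories: metadata.version moved from $oldVersion to $newVersion")
       }
     }
   }
   ```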



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15270:
URL: https://github.com/apache/kafka/pull/15270#discussion_r1470390289


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -137,6 +139,7 @@ class BrokerMetadataPublisher(
 manifest: LoaderManifest
   ): Unit = {
 val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
+val currentMetadataVersion = metadataCache.metadataVersion()

Review Comment:
   There's no need to access metadataCache here. That would incur a performance 
penalty because you're dealing with volatiles. Instead, you can just look at 
`delta.image().features().metadataVersion()` (or something like that, I didn't 
check field names)
   
   `delta.image` is the previous metadata image (that the delta is built on top 
of)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-01-29 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #14327:
URL: https://github.com/apache/kafka/pull/14327#discussion_r1470370840


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1573,6 +1589,33 @@ private void updateGroupsByTopics(
 }
 });
 }
+if (!oldSubscribedTopicRegex.isEmpty()) {
+oldSubscribedTopicRegex.forEach(regex -> {
+groupsByRegex.computeIfPresent(regex, (__, groupIds) -> {
+groupIds.remove(groupId);
+return groupIds.isEmpty() ? null : groupIds;
+});
+Pattern pattern = Pattern.compile(regex);
+for (String topicName : 
metadataImage.topics().topicsByName().keySet()) {
+if (pattern.matcher(topicName).matches()) {
+unsubscribeGroupFromTopic(groupId, topicName);
+}
+}
+});
+}
+if (!newSubscribedTopicRegex.isEmpty()) {
+newSubscribedTopicRegex.forEach(regex -> {
+groupsByRegex
+.computeIfAbsent(regex, __ -> new 
TimelineHashSet<>(snapshotRegistry, 1))
+.add(groupId);
+Pattern pattern = Pattern.compile(regex);
+for (String topicName : 
metadataImage.topics().topicsByName().keySet()) {
+if (pattern.matcher(topicName).matches()) {
+subscribeGroupToTopic(groupId, topicName);
+}
+}

Review Comment:
   @dajac thanks for the explanation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14094) KIP-853: KRaft controller membership changes

2024-01-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14094:
---
Summary: KIP-853: KRaft controller membership changes  (was: KIP-853: KRaft 
controller memebership changes)

> KIP-853: KRaft controller membership changes
> 
>
> Key: KAFKA-14094
> URL: https://issues.apache.org/jira/browse/KAFKA-14094
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: 4.0-blocker
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16207) Implement KRaft internal listener for KRaftVersionRecord and VotersRecord

2024-01-29 Thread Jira
José Armando García Sancio created KAFKA-16207:
--

 Summary: Implement KRaft internal listener for KRaftVersionRecord 
and VotersRecord
 Key: KAFKA-16207
 URL: https://issues.apache.org/jira/browse/KAFKA-16207
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1915703872

   > @apoorvmittal10 : Thanks for the updated PR. A few more comments.
   
   Thanks for the review and suggestions. I have addressed the comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16168; Implement GroupCoordinator.onPartitionsDeleted [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15237:
URL: https://github.com/apache/kafka/pull/15237#discussion_r1470235881


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -2056,4 +2059,70 @@ public void 
testCompleteTransactionWithUnexpectedPartition() {
 
 assertFutureThrows(future, IllegalStateException.class);
 }
+
+@Test
+public void testOnPartitionsDeleted() {
+int partitionCount = 3;
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime,
+new GroupCoordinatorMetrics()
+);
+
+service.startup(() -> partitionCount);
+
+when(runtime.partitions()).thenReturn(
+IntStream
+.range(0, partitionCount)
+.mapToObj(i -> new TopicPartition("__consumer_offsets", i))
+.collect(Collectors.toSet())
+);
+
+List> futures = IntStream
+.range(0, partitionCount)
+.mapToObj(__ -> new CompletableFuture())
+.collect(Collectors.toList());
+
+IntStream.range(0, partitionCount).forEach(i -> {
+CompletableFuture future = futures.get(i);
+when(runtime.scheduleWriteOperation(
+ArgumentMatchers.eq("on-partition-deleted"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 
i)),
+ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ArgumentMatchers.any()
+)).thenAnswer(__ -> future);
+});
+
+IntStream.range(0, partitionCount - 1).forEach(i -> {
+futures.get(i).complete(null);
+});
+
+futures.get(partitionCount - 
1).completeExceptionally(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception());
+
+// The exception is logged and swallowed.
+assertDoesNotThrow(() ->

Review Comment:
   Should we have any other validations that the partition is deleted or not?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -903,6 +906,45 @@ public boolean cleanupExpiredOffsets(String groupId, 
List records) {
 return allOffsetsExpired.get() && 
!openTransactionsByGroup.containsKey(groupId);
 }
 
+/**
+ * Remove offsets of the partitions that have been deleted.
+ *
+ * @param topicPartitions   The partitions that have been deleted.
+ * @return The list of tombstones (offset commit) to append.
+ */
+public List onPartitionsDeleted(
+List topicPartitions
+) {
+List records = new ArrayList<>();
+
+Map> partitionsByTopic = new HashMap<>();
+topicPartitions.forEach(tp -> partitionsByTopic
+.computeIfAbsent(tp.topic(), __ -> new ArrayList<>())
+.add(tp.partition())
+);
+
+Consumer delete = offsets -> {

Review Comment:
   This confused me for a bit because the Consumer here is not a Kafka consumer 
but the Java `Consumer` functional interface. 
   
   These lines create a function called `delete` that takes all the offsets and 
appends the tombstones they need. The `offsets` variable is overloaded a few 
times, so it is a little confusing.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1001,8 +1002,26 @@ public void onTransactionCompleted(
 public void onPartitionsDeleted(
 List topicPartitions,
 BufferSupplier bufferSupplier
-) {
+) throws ExecutionException, InterruptedException {

Review Comment:
   To clarify -- is onPartitionsDeleted only called when the topic behind the 
partition is deleted (not reassigned etc)? Or are there other cases?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -3050,6 +3060,53 @@ public void testOffsetDeletionsSensor() {
 verify(context.metrics).record(OFFSET_DELETIONS_SENSOR_NAME, 2);
 }
 
+@Test
+public void testOnPartitionsDeleted() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+// Commit offsets.
+context.commitOffset("grp-0", "foo", 1, 100, 1, 
context.time.milliseconds());
+context.commitOffset("grp-0", "foo", 2, 200, 1, 
context.time.milliseconds());
+context.commitOffset("grp-0", "foo", 3, 300, 1, 
context.time.milliseconds());
+
+context.commitOffset("grp-1", "bar", 1, 100, 1, 
context.time.milliseconds());
+context.commitOffset("grp-1", "bar", 2, 200, 1, 
context.time.milliseconds());
+context.commitOffset("grp-1", "bar", 3, 300, 1, 
context.time.milliseconds());
+
+context.commitOffset(100L, "grp-2", "foo", 1, 100, 1, 
context.time.milliseconds());
+

Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on PR #15193:
URL: https://github.com/apache/kafka/pull/15193#issuecomment-1915696324

   Thanks to everyone who reviewed this. It really was pretty messy (and buggy) 
originally, and I think it's pretty clear now. I have to commit this now to 
unblock the 3.7 release. If you have further suggestions for improvements, 
please do submit them (either a PR, message, or whatever).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-29 Thread via GitHub


cmccabe merged PR #15193:
URL: https://github.com/apache/kafka/pull/15193


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470310097


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List registeredMetricNames = new 
ArrayList<>();
+
+private final Set instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
   Valid suggestion, thanks for that. I didn't see that. I have made the change 

Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on PR #15193:
URL: https://github.com/apache/kafka/pull/15193#issuecomment-1915685264

   @ppatierno: Yes, deprovisioning the KRaft controller quorum and electing a 
ZK-based controller should happen before rolling to remove 
`zookeeper.metadata.migration.enable`, etc. Should be fixed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470279663


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -299,8 +332,8 @@ public void 
testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
 // Create new client metrics manager which simulates a new server as 
it will not have any
 // last request information but request should succeed as subscription 
id should match
 // the one with new client instance.
-
-ClientMetricsManager newClientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+kafkaMetrics = new Metrics();

Review Comment:
   Agree, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470278382


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -288,6 +307,9 @@ private ClientMetricsInstance 
createClientInstanceAndUpdateCache(Uuid clientInst
 ClientMetricsInstanceMetadata instanceMetadata) {
 
 ClientMetricsInstance clientInstance = 
createClientInstance(clientInstanceId, instanceMetadata);
+// Maybe add client metrics, if metrics not already added. Metrics 
might be already added
+// if the client instance was evicted from the cache because of size 
limit.

Review Comment:
   The problem is that size-based eviction happens inside the common LRUCache 
class, where the code from this manager cannot be instrumented. I do not see any 
custom override that can be provided to the LRUCache class. 
   
   Having said that, there won't be any dangling instance metrics sensors when 
eviction happens on size, since time-based eviction will remove such instance 
metrics anyway. Also, if the respective instance reports metrics before the 
time-based eviction, then the metrics numbers for that instance will still be 
reported correctly. Hence, I do think we should not evict metrics sensors on 
size-based eviction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on PR #15238:
URL: https://github.com/apache/kafka/pull/15238#issuecomment-1915649469

   committed, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]

2024-01-29 Thread via GitHub


cmccabe closed pull request #15238: KAFKA-16171 Fix hybrid mode controller race
URL: https://github.com/apache/kafka/pull/15238


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470269365


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List registeredMetricNames = new 
ArrayList<>();
+
+private final Set instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   I think you are right. I have removed the tag and made the metric global, 
alongside the instance count metric. Resolving the comment; I will update the PR 
description with the change from the KIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]

2024-01-29 Thread via GitHub


mumrah commented on code in PR #15238:
URL: https://github.com/apache/kafka/pull/15238#discussion_r1470254717


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -635,12 +635,24 @@ class BecomeZkControllerEvent extends MigrationEvent {
 public void run() throws Exception {
 if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, 
this)) {
 applyMigrationOperation("Claiming ZK controller leadership", 
zkMigrationClient::claimControllerLeadership);
-if (migrationLeadershipState.zkControllerEpochZkVersion() == 
-1) {
+if (migrationLeadershipState.zkControllerEpochZkVersion() == 
-2) {

Review Comment:
   There's already sort of a constant for this in ZkMigrationLeadershipState.EMPTY



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15238:
URL: https://github.com/apache/kafka/pull/15238#discussion_r1470231664


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -635,12 +635,24 @@ class BecomeZkControllerEvent extends MigrationEvent {
 public void run() throws Exception {
 if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, 
this)) {
 applyMigrationOperation("Claiming ZK controller leadership", 
zkMigrationClient::claimControllerLeadership);
-if (migrationLeadershipState.zkControllerEpochZkVersion() == 
-1) {
+if (migrationLeadershipState.zkControllerEpochZkVersion() == 
-2) {

Review Comment:
   Can we have a constant for this like 
`ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16171 Fix hybrid mode controller race [kafka]

2024-01-29 Thread via GitHub


cmccabe commented on code in PR #15238:
URL: https://github.com/apache/kafka/pull/15238#discussion_r1470231305


##
metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java:
##
@@ -139,14 +139,18 @@ public ZkMigrationLeadershipState 
setMigrationRecoveryState(ZkMigrationLeadershi
 
 @Override
 public ZkMigrationLeadershipState 
claimControllerLeadership(ZkMigrationLeadershipState state) {
-this.state = state;
-return state;
+if (state.zkControllerEpochZkVersion() == 
ZkMigrationLeadershipState.EMPTY.zkControllerEpochZkVersion()) {

Review Comment:
   Can we just compare against -2 (or at least have a constant for this like 
`ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION`?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16101: KRaft migration documentation is incorrect [kafka]

2024-01-29 Thread via GitHub


mumrah commented on code in PR #15193:
URL: https://github.com/apache/kafka/pull/15193#discussion_r1470215533


##
docs/ops.html:
##
@@ -3992,6 +3980,136 @@ Finalizing the migration
 
 # Other configs ...
 
+  Reverting to ZooKeeper mode During the Migration
+  
+While the cluster is still in migration mode, it is possible to revert to 
ZooKeeper mode.  The process
+to follow depends on how far the migration has progressed. In order to 
find out how to revert,
+select the final migration step that you have completed in 
this table.
+  
+  
+Note that the directions given here assume that each step was fully 
completed, and they were
+done in order. So, for example, we assume that if "Enter Migration Mode on 
the Brokers" was
+completed, "Provisioning the KRaft controller quorum" was also fully 
completed previously.
+  
+  
+If you did not fully complete any step, back out whatever you have done 
and then follow revert
+directions for the last fully completed step.
+  
+
+  
+  
+  
+Final Migration Section Completed
+Directions for Reverting
+Notes
+  
+  
+Preparing for migration
+
+  The preparation section does not involve leaving ZooKeeper mode. So 
there is nothing to do in the
+  case of a revert.
+
+
+
+  
+  
+Provisioning the KRaft controller quorum
+
+  
+
+  Deprovision the KRaft controller quorum.
+
+
+  Then you are done.
+
+  
+
+
+
+  
+  
+Enter Migration Mode on the brokers
+
+  
+
+  Perform a roll of the broker cluster. In this roll, remove

Review Comment:
   Instead of "roll", how about "rolling restart"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-29 Thread via GitHub


dajac commented on PR #15271:
URL: https://github.com/apache/kafka/pull/15271#issuecomment-1915528645

   Sure. Will review it tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-01-29 Thread via GitHub


dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1470177662


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, 
List records) {
 }
 }
 
+private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+ConsumerGroupMember member,
+JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+) {
+for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : 
protocols) {
+final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+ConsumerProtocol.deserializeVersion(buffer);
+final Optional generationId = 
ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+// If the generation id is provided, it must match the member 
epoch.
+if (!generationId.isPresent() || generationId.get() == 
member.memberEpoch()) {
+// TODO: need a list of all available server assignors
+if 
(UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+|| 
RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+return protocol;
+}
+}
+}
+throw new FencedMemberEpochException("The JoinGroup request doesn't 
have a matched generation id from a " +
+"protocol supported by the server assignors with the epoch of the 
member known by the group coordinator (" +
+member.memberEpoch() + ").");
+}
+
+private List 
transitionToConsumerGroupHeartbeatTopicPartitions(
+List topicPartitions
+) {
+Map> topicMap = new HashMap<>();
+topicPartitions.forEach(tp ->
+topicMap.computeIfAbsent(tp.topic(), __ -> new 
ArrayList<>()).add(tp.partition())
+);
+return topicMap.entrySet().stream().map(item -> {
+TopicImage topicImage = 
metadataImage.topics().getTopic(item.getKey());
+if (topicImage == null) {
+throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic 
id of topic " + item.getKey() + ".");
+}
+return new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+.setTopicId(topicImage.id())
+.setPartitions(item.getValue());
+}).collect(Collectors.toList());
+}
+
+public CoordinatorResult upgradeGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+final int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+return EMPTY_RESULT;
+}
+
+// Get or create the consumer group.
+final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+throwIfConsumerGroupIsFull(group, memberId);
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// new dynamic member.
+if (isUnknownMember && 
JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+// If member id required, send back a response to call for 
another join group request with allocated member id.
+log.info("Dynamic member with unknown member id joins group 
{}. " +
+"Created a new member id {} and requesting the member 
to rejoin with this id.",
+group.groupId(), memberId);
+
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+);
+return EMPTY_RESULT;
+} else {
+member = group.getOrMaybeCreateMember(memberId, true);
+newMemberCreated = !group.members().containsKey(memberId);
+log.info("[GroupId {}] Member {} joins the consumer group.", 

Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-29 Thread via GitHub


lucasbru merged PR #15000:
URL: https://github.com/apache/kafka/pull/15000


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-29 Thread via GitHub


lucasbru commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1915503178

   Test failures are unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-29 Thread via GitHub


lucasbru commented on PR #15271:
URL: https://github.com/apache/kafka/pull/15271#issuecomment-1915499373

   @dajac Could you please take a look when you have time? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14725: Interrupt source task poll thread when cancelled [kafka]

2024-01-29 Thread via GitHub


C0urante commented on PR #14316:
URL: https://github.com/apache/kafka/pull/14316#issuecomment-1915480693

   Hmm... looks like this is causing a spike in CI failures. Even though all of 
the tests are known to be flaky, I'm going to try to look into the tests more 
closely before merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]

2024-01-29 Thread via GitHub


C0urante commented on PR #15249:
URL: https://github.com/apache/kafka/pull/15249#issuecomment-1915476191

   [The next CI 
run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15249/7/tests/)
 also completed with no failures in the `ConnectorRestartApiIntegrationTest` 
suite; kicking off a third.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1470117099


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig,
 
 val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
 val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
-entriesPerPartition.foreach { case (topicPartition, records) =>
+entriesPerPartition.forKeyValue { (topicPartition, records) =>
   // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
   val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
   transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
-  if (!transactionalBatches.isEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
+  if (transactionalBatches.nonEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
 }
 if (transactionalProducerInfo.size > 1) {
   throw new InvalidPidMappingException("Transactional records contained 
more than one producer ID")
 }
 
-def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
- newRequestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+def postVerificationCallback(newRequestLocal: RequestLocal,
+ results: (Map[TopicPartition, Errors], 
Map[TopicPartition, VerificationGuard])): Unit = {

Review Comment:
   Ah right -- makes sense  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update jose4j to 0.9.4 to address CVE-2023-51775 [kafka]

2024-01-29 Thread via GitHub


mike-lloyd03 opened a new pull request, #15284:
URL: https://github.com/apache/kafka/pull/15284

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   `org.bitbucket.b_c:jose4j` 0.9.3 is susceptible to Denial of Service per 
[CVE-2023-51775](https://www.cve.org/CVERecord?id=CVE-2023-51775).
   
   This PR updates `kafka` to use 0.9.4.
   
   Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-29 Thread via GitHub


dajac commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1470109481


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig,
 
 val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
 val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
-entriesPerPartition.foreach { case (topicPartition, records) =>
+entriesPerPartition.forKeyValue { (topicPartition, records) =>
   // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
   val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
   transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
-  if (!transactionalBatches.isEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
+  if (transactionalBatches.nonEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
 }
 if (transactionalProducerInfo.size > 1) {
   throw new InvalidPidMappingException("Transactional records contained 
more than one producer ID")
 }
 
-def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
- newRequestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+def postVerificationCallback(newRequestLocal: RequestLocal,
+ results: (Map[TopicPartition, Errors], 
Map[TopicPartition, VerificationGuard])): Unit = {

Review Comment:
   The function that wraps the callback only supports wrapping methods with one 
argument. I could not find a better way to handle this.
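   
   For readers following along, a self-contained sketch of the constraint described here 
   (the names are illustrative and not the actual KafkaRequestHandler API):
   
   ```scala
   object TupleCallbackExample extends App {
     // A wrapper that can only re-schedule single-argument callbacks...
     def wrap[T](fn: T => Unit): T => Unit =
       arg => fn(arg) // imagine this hands fn(arg) off to a request handler thread
   
     // ...forces multiple results to be packed into one tuple argument.
     def postVerification(results: (Map[String, String], Map[String, Int])): Unit = {
       val (errors, guards) = results
       println(s"errors=$errors guards=$guards")
     }
   
     val callback = wrap(postVerification _)
     callback((Map("tp-0" -> "NONE"), Map("tp-0" -> 1)))
   }
   ```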



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470110954


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1062,10 +1062,10 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
  * @param operationName The name of the operation.
  * @param operationInputThe operation's input for logging purposes.
  * @param exception The exception to handle.
- * @param handler   A function which takes an Errors and a String 
and returns the expected
+ * @param handler   A function which takes an Errors and a String 
and builds the expected
  *  output. The String can be null. Note that the 
function could further
  *  transform the error depending on the context.
- * @return The response.
+ * @return The output build by the handler.

Review Comment:
   nit: built?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-29 Thread via GitHub


dajac commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1470109774


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##
@@ -85,6 +85,10 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 groupCoordinator = GroupCoordinator(config, replicaManager, 
heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)
 groupCoordinator.startup(() => 
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
   false)
+
+// Transactional appends attempt to schedule to the request handler thread 
using
+// a non request handler thread. Set this to avoid error.
+KafkaRequestHandler.setBypassThreadCheck(true)

Review Comment:
   That’s right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470109704


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 }
 
 /**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
+ * This is the handler commonly used by all the operations that requires 
to convert errors to
+ * coordinator errors. The handler also handles and log unexpected errors.
+ *
+ * @param requestName   The name of the request.
+ * @param request   The request itself for logging purposes.
+ * @param exception The exception to handle.
+ * @param responseBuilder   A function which takes an Errors and a String 
and returns
+ *  the response. The String can be null.
+ * @return The response.
+ * @param  The type of the request.

Review Comment:
   Ok -- i think if we include those comments in the description, that should 
be enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-29 Thread via GitHub


dajac commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1470109481


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig,
 
 val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
 val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
-entriesPerPartition.foreach { case (topicPartition, records) =>
+entriesPerPartition.forKeyValue { (topicPartition, records) =>
   // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
   val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
   transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
-  if (!transactionalBatches.isEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
+  if (transactionalBatches.nonEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
 }
 if (transactionalProducerInfo.size > 1) {
   throw new InvalidPidMappingException("Transactional records contained 
more than one producer ID")
 }
 
-def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
- newRequestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+def postVerificationCallback(newRequestLocal: RequestLocal,
+ results: (Map[TopicPartition, Errors], 
Map[TopicPartition, VerificationGuard])): Unit = {

Review Comment:
   The function that wraps the callback only supports wrapping methods with one 
argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


dajac commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470107871


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 }
 
 /**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
+ * This is the handler commonly used by all the operations that requires 
to convert errors to
+ * coordinator errors. The handler also handles and log unexpected errors.
+ *
+ * @param requestName   The name of the request.
+ * @param request   The request itself for logging purposes.
+ * @param exception The exception to handle.
+ * @param responseBuilder   A function which takes an Errors and a String 
and returns
+ *  the response. The String can be null.
+ * @return The response.
+ * @param  The type of the request.

Review Comment:
   That’s right. The input must be toString’able. The output type is the type 
returned by the handler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1470098563


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -982,24 +996,21 @@ class ReplicaManager(val config: KafkaConfig,
 producerId: Long,
 producerEpoch: Short,
 baseSequence: Int,
-requestLocal: RequestLocal,
-callback: (Errors, RequestLocal, VerificationGuard) => Unit
+callback: ((Errors, VerificationGuard)) => Unit
   ): Unit = {
-def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-newRequestLocal: RequestLocal,
-verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
-  callback(
+def generalizedCallback(results: (Map[TopicPartition, Errors], 
Map[TopicPartition, VerificationGuard])): Unit = {
+  val (preAppendErrors, verificationGuards) = results
+  callback((

Review Comment:
   nit: are the double `((` needed? I see it elsewhere so maybe I'm missing an 
aspect of the language.
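   
   A quick, self-contained illustration of the Scala detail in question (illustrative 
   only, not the actual ReplicaManager types):
   
   ```scala
   object DoubleParensExample extends App {
     // A function value whose single parameter is a tuple:
     val callback: ((String, Int)) => Unit = { case (error, guard) =>
       println(s"$error / $guard")
     }
   
     callback(("NONE", 1))   // double parens: the one argument is the tuple itself
     callback("NONE" -> 1)   // equivalent, via the arrow tuple syntax
     // callback("NONE", 1)  // relies on auto-tupling, which newer compilers warn about
   }
   ```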



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [6/N] Avoid recheduling callback in request thread [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15176:
URL: https://github.com/apache/kafka/pull/15176#discussion_r1470096146


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -935,8 +935,12 @@ private[group] class GroupCoordinator(
   producerId,
   producerEpoch,
   RecordBatch.NO_SEQUENCE,
-  requestLocal,
-  postVerificationCallback
+  // Wrap the callback to be handled on an arbitrary request handler
+  // thread when transaction verification is complete.

Review Comment:
   Should we leave a comment that the requestLocal passed in is only for the 
case where we execute immediately?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -982,24 +996,21 @@ class ReplicaManager(val config: KafkaConfig,
 producerId: Long,
 producerEpoch: Short,
 baseSequence: Int,
-requestLocal: RequestLocal,
-callback: (Errors, RequestLocal, VerificationGuard) => Unit
+callback: ((Errors, VerificationGuard)) => Unit
   ): Unit = {
-def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
-newRequestLocal: RequestLocal,
-verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
-  callback(
+def generalizedCallback(results: (Map[TopicPartition, Errors], 
Map[TopicPartition, VerificationGuard])): Unit = {
+  val (preAppendErrors, verificationGuards) = results
+  callback((

Review Comment:
   nit: are the double `((` needed? I see it elsewhere so maybe I'm missing an 
aspect of the language.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -813,19 +816,19 @@ class ReplicaManager(val config: KafkaConfig,
 
 val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
 val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
-entriesPerPartition.foreach { case (topicPartition, records) =>
+entriesPerPartition.forKeyValue { (topicPartition, records) =>
   // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
   val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
   transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
-  if (!transactionalBatches.isEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
+  if (transactionalBatches.nonEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
 }
 if (transactionalProducerInfo.size > 1) {
   throw new InvalidPidMappingException("Transactional records contained 
more than one producer ID")
 }
 
-def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
- newRequestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+def postVerificationCallback(newRequestLocal: RequestLocal,
+ results: (Map[TopicPartition, Errors], 
Map[TopicPartition, VerificationGuard])): Unit = {

Review Comment:
   I don't have a problem with this refactor, but just curious what makes a 
tuple argument better than individual ones.



##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##
@@ -85,6 +85,10 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 groupCoordinator = GroupCoordinator(config, replicaManager, 
heartbeatPurgatory, rebalancePurgatory, timer.time, metrics)
 groupCoordinator.startup(() => 
zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
   false)
+
+// Transactional appends attempt to schedule to the request handler thread 
using
+// a non request handler thread. Set this to avoid error.
+KafkaRequestHandler.setBypassThreadCheck(true)

Review Comment:
   Is this needed because we do the wrapping outside ReplicaManager now?
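
As context for the thread-check question above, here is a minimal sketch of the "wrap the callback so it completes on a request handler thread" pattern, with an inline path when the caller is already on such a thread. The executor, the thread-name check, and `wrapOnRequestHandler` are assumptions made for illustration; they are not `KafkaRequestHandler`'s actual API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: the executor, the thread-name check, and wrapOnRequestHandler
// are assumptions for this sketch, not KafkaRequestHandler's real API.
public final class CallbackHandoffSketch {

    private static final ExecutorService REQUEST_HANDLERS =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "request-handler"));

    // Run the callback inline when already on a request handler thread (the
    // "execute immediately" path that can reuse the caller's request-local state),
    // otherwise hand it off to a request handler thread.
    static <T> Consumer<T> wrapOnRequestHandler(Consumer<T> callback) {
        return result -> {
            if (Thread.currentThread().getName().startsWith("request-handler")) {
                callback.accept(result);
            } else {
                REQUEST_HANDLERS.submit(() -> callback.accept(result));
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Consumer<String> wrapped = wrapOnRequestHandler(r ->
                System.out.println(Thread.currentThread().getName() + " handled " + r));
        wrapped.accept("verification-complete");  // called from main, so it is re-scheduled
        REQUEST_HANDLERS.shutdown();
        REQUEST_HANDLERS.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```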



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]

2024-01-29 Thread via GitHub


lianetm commented on code in PR #15271:
URL: https://github.com/apache/kafka/pull/15271#discussion_r1470081608


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -997,11 +985,13 @@ void markReconciliationCompleted() {
  * 
  * 
  */
-private void resolveMetadataForUnresolvedAssignment() {
-assignmentReadyToReconcile.clear();
+private SortedSet resolveMetadataForTargetAssignment() {
+final SortedSet assignmentReadyToReconcile = new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
+final HashMap> unresolved = new 
HashMap<>(currentTargetAssignment);
+
 // Try to resolve topic names from metadata cache or subscription 
cache, and move
 // assignments from the "unresolved" collection, to the 
"readyToReconcile" one.

Review Comment:
   Dropping it sounds good to me; I agree with your what-vs-why view on this.
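
For readers skimming the diff above, a simplified sketch of the resolve-and-split pattern it describes follows: topic ids whose names are known move to the ready-to-reconcile set, and the rest stay unresolved until a later metadata update. Types and names here are simplified assumptions (String topic ids, integer partitions), not `MembershipManagerImpl`'s code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified sketch of "resolve topic names, move resolved assignments out of the
// unresolved collection". Not MembershipManagerImpl's actual implementation.
public final class ResolveAssignmentSketch {

    // Stand-in for the metadata / subscription cache lookup.
    static String resolveName(String topicId, Map<String, String> metadataCache) {
        return metadataCache.get(topicId);
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> targetAssignment = new HashMap<>();
        targetAssignment.put("topicId-1", new TreeSet<>(Set.of(0, 1)));
        targetAssignment.put("topicId-2", new TreeSet<>(Set.of(2)));

        Map<String, String> metadataCache = Map.of("topicId-1", "orders"); // topicId-2 still unknown

        Set<String> readyToReconcile = new TreeSet<>();
        Map<String, Set<Integer>> unresolved = new HashMap<>(targetAssignment);

        // Move every assignment whose topic name is known into the reconcilable set;
        // whatever is left stays in the unresolved collection.
        unresolved.entrySet().removeIf(entry -> {
            String name = resolveName(entry.getKey(), metadataCache);
            if (name == null) return false;
            entry.getValue().forEach(p -> readyToReconcile.add(name + "-" + p));
            return true;
        });

        System.out.println("ready: " + readyToReconcile);         // [orders-0, orders-1]
        System.out.println("unresolved: " + unresolved.keySet()); // [topicId-2]
    }
}
```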



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470078455


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String 
groupId) {
 }
 
 /**
- * Handles the exception in the scheduleWriteOperation.
- * @return The Errors instance associated with the given exception.
+ * This is the handler commonly used by all the operations that requires 
to convert errors to
+ * coordinator errors. The handler also handles and log unexpected errors.
+ *
+ * @param requestName   The name of the request.
+ * @param request   The request itself for logging purposes.
+ * @param exception The exception to handle.
+ * @param responseBuilder   A function which takes an Errors and a String 
and returns
+ *  the response. The String can be null.
+ * @return The response.
+ * @param  The type of the request.

Review Comment:
   I'm wondering if the input is ever not a string (since it is just used for 
logging). I guess the out type is the type the handler returns?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


dajac commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470072535


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,7 +524,7 @@ public CompletableFuture listGroups(
 .combineFutures(futures, ArrayList::new, List::addAll)
 .thenApply(groups -> new 
ListGroupsResponseData().setGroups(groups))
 .exceptionally(exception -> handleOperationException(
-"ListGroups",
+"list-groups",

Review Comment:
   Indeed, list groups lists all the groups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-01-29 Thread via GitHub


philipnee commented on PR #15216:
URL: https://github.com/apache/kafka/pull/15216#issuecomment-1915374713

   Hi @lucasbru - This PR is ready for review. I did a bit of refactoring in this PR, so not everything here is in scope. Let me know what you think!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470066073


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,7 +524,7 @@ public CompletableFuture listGroups(
 .combineFutures(futures, ArrayList::new, List::addAll)
 .thenApply(groups -> new 
ListGroupsResponseData().setGroups(groups))
 .exceptionally(exception -> handleOperationException(
-"ListGroups",
+"list-groups",

Review Comment:
   this covers classic and consumer groups? There are a few without the classic or consumer prefix, so I just wanted to confirm they all cover both when there is no prefix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]

2024-01-29 Thread via GitHub


jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1470066073


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -523,7 +524,7 @@ public CompletableFuture listGroups(
 .combineFutures(futures, ArrayList::new, List::addAll)
 .thenApply(groups -> new 
ListGroupsResponseData().setGroups(groups))
 .exceptionally(exception -> handleOperationException(
-"ListGroups",
+"list-groups",

Review Comment:
   this covers classic and consumer groups?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -288,6 +307,9 @@ private ClientMetricsInstance 
createClientInstanceAndUpdateCache(Uuid clientInst
 ClientMetricsInstanceMetadata instanceMetadata) {
 
 ClientMetricsInstance clientInstance = 
createClientInstance(clientInstanceId, instanceMetadata);
+// Maybe add client metrics, if metrics not already added. Metrics 
might be already added
+// if the client instance was evicted from the cache because of size 
limit.

Review Comment:
   Hmm, if we evict a client instance from the LRU cache, should we remove the 
corresponding metrics?
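
One possible shape for that, purely as a sketch: an access-ordered LRU map whose eviction hook also unregisters the evicted instance's sensors. The cache type, size limit, and sensor bookkeeping below are assumptions, not `ClientMetricsManager`'s actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: the access-ordered LinkedHashMap, the size limit, and the
// sensor bookkeeping are assumptions, not ClientMetricsManager's implementation.
public final class EvictingInstanceCacheSketch {

    private final Set<String> registeredSensors = ConcurrentHashMap.newKeySet();

    // Access-ordered LRU cache whose eviction hook also unregisters the evicted
    // instance's sensors, so metrics do not outlive the cache entry.
    private final Map<String, Object> instanceCache = new LinkedHashMap<>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
            if (size() > 2) {                          // tiny limit, just for the example
                unregisterSensorsFor(eldest.getKey());
                return true;                            // evict the eldest entry
            }
            return false;
        }
    };

    void track(String clientInstanceId) {
        instanceCache.put(clientInstanceId, new Object());
        registeredSensors.add("ClientMetricsPluginExport-" + clientInstanceId);
    }

    private void unregisterSensorsFor(String clientInstanceId) {
        registeredSensors.remove("ClientMetricsPluginExport-" + clientInstanceId);
    }

    public static void main(String[] args) {
        EvictingInstanceCacheSketch sketch = new EvictingInstanceCacheSketch();
        sketch.track("a");
        sketch.track("b");
        sketch.track("c");                              // evicts "a" and drops its sensor name
        System.out.println(sketch.registeredSensors);   // only b's and c's sensors remain
    }
}
```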



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List registeredMetricNames = new 
ArrayList<>();
+
+private final Set instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new 

Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-01-29 Thread via GitHub


philipnee commented on code in PR #15216:
URL: https://github.com/apache/kafka/pull/15216#discussion_r1470063334


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -671,18 +718,8 @@ private ConsumerConfig config() {
 return new ConsumerConfig(prop);
 }
 
-private HeartbeatRequestManager createHeartbeatRequestManager() {

Review Comment:
   dead code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16115: Adding missing heartbeat metrics [kafka]

2024-01-29 Thread via GitHub


philipnee commented on code in PR #15216:
URL: https://github.com/apache/kafka/pull/15216#discussion_r1470058217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java:
##
@@ -26,20 +26,20 @@
 
 import java.util.concurrent.TimeUnit;
 
-public class KafkaConsumerMetrics implements AutoCloseable {
+import static 
org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager.MetricGroupSuffix.CONSUMER;
+
+public class KafkaConsumerMetrics extends AbstractConsumerMetricsManager 
implements AutoCloseable {
 private final MetricName lastPollMetricName;
 private final Sensor timeBetweenPollSensor;
 private final Sensor pollIdleSensor;
 private final Sensor committedSensor;
 private final Sensor commitSyncSensor;
-private final Metrics metrics;
 private long lastPollMs;
 private long pollStartMs;
 private long timeSinceLastPollMs;
 
 public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {

Review Comment:
   I'm not even sure why we have metricGrpPrefix in the first place - it is not 
specified by the user and it always seems to be the default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470001208


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final case class Offline(topicId: Option[Uuid]) extends HostedPartition
+  final case class Offline(partition: Option[Partition]) extends 
HostedPartition

Review Comment:
   Actually, ignore me. Partition wouldn't always be available for the `Offline` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r146343


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List registeredMetricNames = new 
ArrayList<>();
+
+private final Set instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   I don't think it's a good idea to tag with the client instance id. Imagine a 
situation in which an unbounded number of client instance IDs are erroneously 
being used. I don't think we want to build up knowledge of each of them 
individually, but rather to count how many delinquent instances there are.
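
To make the suggested alternative concrete, here is a small sketch using the Kafka `Metrics` API: one broker-level counter with no client-instance-id tag, so metric cardinality stays bounded no matter how many delinquent instance ids appear. The metric and group names are illustrative, not the PR's final names.

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.WindowedCount;

import java.util.Collections;

// Sketch of the suggested alternative: one broker-level counter instead of a sensor
// tagged per client instance id. Metric and group names are illustrative assumptions.
public final class UntaggedCounterSketch {
    public static void main(String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
            Sensor unknownSubscription = metrics.sensor("ClientMetricsUnknownSubscriptionRequest");
            MetricName count = metrics.metricName(
                    "unknown-subscription-request-count",
                    "ClientMetrics",
                    "Requests from client instances with no matching metrics subscription",
                    Collections.emptyMap());           // no client-instance-id tag: bounded cardinality
            unknownSubscription.add(count, new WindowedCount());

            // Every delinquent request bumps the same counter, whatever its instance id.
            unknownSubscription.record();
            unknownSubscription.record();

            System.out.println(metrics.metric(count).metricValue());
        }
    }
}
```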



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469993396


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final case class Offline(topicId: Option[Uuid]) extends HostedPartition
+  final case class Offline(partition: Option[Partition]) extends 
HostedPartition

Review Comment:
   If you are using `Partition`, then maybe remove the Option, since `Partition.topicId` already returns an Option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16206) KRaftMigrationZkWriter tries to delete deleted topic configs twice

2024-01-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16206:
-
Summary: KRaftMigrationZkWriter tries to delete deleted topic configs twice 
 (was: ZkConfigMigrationClient tries to delete topic configs twice)

> KRaftMigrationZkWriter tries to delete deleted topic configs twice
> --
>
> Key: KAFKA-16206
> URL: https://issues.apache.org/jira/browse/KAFKA-16206
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, migration
>Reporter: David Arthur
>Priority: Minor
>
> When deleting a topic, we see spurious ERROR logs from 
> kafka.zk.migration.ZkConfigMigrationClient:
>  
> {code:java}
> Did not delete ConfigResource(type=TOPIC, name='xxx') since the node did not 
> exist. {code}
> This seems to happen because ZkTopicMigrationClient#deleteTopic is deleting 
> the topic, partitions, and config ZNodes in one shot. Subsequent calls from 
> KRaftMigrationZkWriter to delete the config encounter a NO_NODE since the 
> ZNode is already gone.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers

2024-01-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16171:
-
Component/s: controller

> Controller failover during ZK migration can prevent metadata updates to ZK 
> brokers
> --
>
> Key: KAFKA-16171
> URL: https://issues.apache.org/jira/browse/KAFKA-16171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft, migration
>Affects Versions: 3.6.0, 3.7.0, 3.6.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
>
> h2. Description
> During the ZK migration, after KRaft becomes the active controller we enter a 
> state called hybrid mode. This means we have a mixture of ZK and KRaft 
> brokers. The KRaft controller updates the ZK brokers using the deprecated 
> controller RPCs (LeaderAndIsr, UpdateMetadata, etc). 
>  
> A race condition exists where the KRaft controller will get stuck in a retry 
> loop while initializing itself after a failover which prevents it from 
> sending these RPCs to ZK brokers.
> h2. Impact
> Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK 
> brokers will not receive any metadata updates. The ZK brokers will be able to 
> send requests to the controller (such as AlterPartitions), but the metadata 
> updates which come as a result of those requests will never be seen. This 
> essentially looks like the controller is unavailable from the ZK brokers 
> perspective.
> h2. Detection and Mitigation
> This bug can be seen by observing failed ZK writes from a recently elected 
> controller.
> The tell-tale error message is:
> {code:java}
> Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This 
> indicates that another KRaft controller is making writes to ZooKeeper. {code}
> with a stacktrace like:
> {noformat}
> java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. 
> Expected zkVersion = 507823. This indicates that another KRaft controller is 
> making writes to ZooKeeper.
>   at 
> kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
>   at 
> kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
>   at 
> kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
>   at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>   at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>   at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
>   at 
> kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
>   at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:1583)
>   at 
> org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat}
> To mitigate this problem, a new KRaft controller should be elected. This can 
> be done by restarting the problematic active controller. To verify that the 
> new controller does not encounter the 

[jira] [Updated] (KAFKA-16205) Reduce number of metadata requests during hybrid mode

2024-01-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16205:
-
Component/s: controller

> Reduce number of metadata requests during hybrid mode
> -
>
> Key: KAFKA-16205
> URL: https://issues.apache.org/jira/browse/KAFKA-16205
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, kraft, migration
>Affects Versions: 3.4.0, 3.5.0, 3.6.0, 3.7.0
>Reporter: David Arthur
>Priority: Major
>
> When migrating a cluster with a high number of brokers and partitions, it is 
> possible for the controller channel manager queue to get backed up. This can 
> happen when many small RPCs are generated in response to several small 
> MetadataDeltas being handled by MigrationPropagator.
>  
> In the ZK controller, various optimizations have been made over the years to 
> reduce the number of UMR and LISR sent during controlled shutdown or other 
> large metadata events. For the ZK to KRaft migration, we use the 
> MetadataLoader infrastructure to learn about and propagate metadata to ZK 
> brokers.
>  
> We need to improve the batching in MigrationPropagator to avoid performance 
> issues during the migration of large clusters.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16205) Reduce number of metadata requests during hybrid mode

2024-01-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16205:
-
Component/s: (was: controller)

> Reduce number of metadata requests during hybrid mode
> -
>
> Key: KAFKA-16205
> URL: https://issues.apache.org/jira/browse/KAFKA-16205
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, migration
>Affects Versions: 3.4.0, 3.5.0, 3.6.0, 3.7.0
>Reporter: David Arthur
>Priority: Major
>
> When migrating a cluster with a high number of brokers and partitions, it is 
> possible for the controller channel manager queue to get backed up. This can 
> happen when many small RPCs are generated in response to several small 
> MetadataDeltas being handled by MigrationPropagator.
>  
> In the ZK controller, various optimizations have been made over the years to 
> reduce the number of UMR and LISR sent during controlled shutdown or other 
> large metadata events. For the ZK to KRaft migration, we use the 
> MetadataLoader infrastructure to learn about and propagate metadata to ZK 
> brokers.
>  
> We need to improve the batching in MigrationPropagator to avoid performance 
> issues during the migration of large clusters.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16205) Reduce number of metadata requests during hybrid mode

2024-01-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16205:
-
Component/s: migration

> Reduce number of metadata requests during hybrid mode
> -
>
> Key: KAFKA-16205
> URL: https://issues.apache.org/jira/browse/KAFKA-16205
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, kraft, migration
>Affects Versions: 3.4.0, 3.5.0, 3.6.0, 3.7.0
>Reporter: David Arthur
>Priority: Major
>
> When migrating a cluster with a high number of brokers and partitions, it is 
> possible for the controller channel manager queue to get backed up. This can 
> happen when many small RPCs are generated in response to several small 
> MetadataDeltas being handled by MigrationPropagator.
>  
> In the ZK controller, various optimizations have been made over the years to 
> reduce the number of UMR and LISR sent during controlled shutdown or other 
> large metadata events. For the ZK to KRaft migration, we use the 
> MetadataLoader infrastructure to learn about and propagate metadata to ZK 
> brokers.
>  
> We need to improve the batching in MigrationPropagator to avoid performance 
> issues during the migration of large clusters.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16171) Controller failover during ZK migration can prevent metadata updates to ZK brokers

2024-01-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16171:
-
Component/s: migration
 (was: controller)

> Controller failover during ZK migration can prevent metadata updates to ZK 
> brokers
> --
>
> Key: KAFKA-16171
> URL: https://issues.apache.org/jira/browse/KAFKA-16171
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, migration
>Affects Versions: 3.6.0, 3.7.0, 3.6.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
>
> h2. Description
> During the ZK migration, after KRaft becomes the active controller we enter a 
> state called hybrid mode. This means we have a mixture of ZK and KRaft 
> brokers. The KRaft controller updates the ZK brokers using the deprecated 
> controller RPCs (LeaderAndIsr, UpdateMetadata, etc). 
>  
> A race condition exists where the KRaft controller will get stuck in a retry 
> loop while initializing itself after a failover which prevents it from 
> sending these RPCs to ZK brokers.
> h2. Impact
> Since the KRaft controller cannot send any RPCs to the ZK brokers, the ZK 
> brokers will not receive any metadata updates. The ZK brokers will be able to 
> send requests to the controller (such as AlterPartitions), but the metadata 
> updates which come as a result of those requests will never be seen. This 
> essentially looks like the controller is unavailable from the ZK brokers 
> perspective.
> h2. Detection and Mitigation
> This bug can be seen by observing failed ZK writes from a recently elected 
> controller.
> The tell-tale error message is:
> {code:java}
> Check op on KRaft Migration ZNode failed. Expected zkVersion = 507823. This 
> indicates that another KRaft controller is making writes to ZooKeeper. {code}
> with a stacktrace like:
> {noformat}
> java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. 
> Expected zkVersion = 507823. This indicates that another KRaft controller is 
> making writes to ZooKeeper.
>   at 
> kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2613)
>   at 
> kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2639)
>   at 
> kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2664)
>   at 
> scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
>   at 
> scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
>   at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
>   at 
> kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2664)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.$anonfun$createTopic$1(ZkTopicMigrationClient.scala:158)
>   at 
> kafka.zk.migration.ZkTopicMigrationClient.createTopic(ZkTopicMigrationClient.scala:141)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$27(KRaftMigrationZkWriter.java:441)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:262)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:64)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.lambda$run$0(KRaftMigrationDriver.java:791)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:880)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsSnapshot$28(KRaftMigrationZkWriter.java:438)
>   at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsSnapshot(KRaftMigrationZkWriter.java:436)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleSnapshot(KRaftMigrationZkWriter.java:115)
>   at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SyncKRaftMetadataEvent.run(KRaftMigrationDriver.java:790)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:1583)
>   at 
> org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66){noformat}
> To mitigate this problem, a new KRaft controller should be elected. This can 
> be done by restarting the problematic active controller. To verify that the 
> new 
