Re: [PR] KAFKA-15645 ReplicationQuotasTestRig rewritten in java [kafka]
nizhikov commented on PR #14588: URL: https://github.com/apache/kafka/pull/14588#issuecomment-1823869599 Hello @mimaison, are you able to review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]
cmccabe merged PR #14807: URL: https://github.com/apache/kafka/pull/14807
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 closed pull request #14632: KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) URL: https://github.com/apache/kafka/pull/14632
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) URL: https://github.com/apache/kafka/pull/14699
[jira] [Updated] (KAFKA-8575) Investigate removing EAGER protocol & cleaning up task suspension in Streams rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8575: --- Fix Version/s: 4.0.0 > Investigate removing EAGER protocol & cleaning up task suspension in Streams > rebalancing > - > > Key: KAFKA-8575 > URL: https://issues.apache.org/jira/browse/KAFKA-8575 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.0 >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Fix For: 4.0.0 > > > With KIP-429 the suspend/resume of tasks may have minimal gains while adding > a lot of complexity and potential bugs. We should consider removing/cleaning > it up and going a step further to remove the EAGER protocol from Streams > entirely. > Plan to remove this in 3.1/4.0, whichever comes after 3.0. This will make 3.0 > a bridge release for users upgrading from any version below 2.4, but they > will still be able to do so in the usual two rolling bounces. > > *The upgrade path from 2.3 and below, to any {to_version} higher than 3.1 > will be:* > 1. During the first rolling bounce, upgrade the jars to a version between 2.4 > - 3.1 and add the UPGRADE_FROM config for whichever version you are upgrading > from > 2. During the second rolling bounce, upgrade the jars to the desired > {to_version} and remove the UPGRADE_FROM config > > EAGER will be effectively deprecated in 3.0 but not removed until the next > version. -- This message was sent by Atlassian Jira (v8.20.10#820010)
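The two-rolling-bounce upgrade path above can be sketched as two configuration states. This is a minimal sketch, assuming the UPGRADE_FROM config is the Streams property `upgrade.from` (the key behind `StreamsConfig.UPGRADE_FROM_CONFIG`) and that the application is upgrading from 2.3; the class, method names, application id and bootstrap servers are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical sketch of the two-bounce upgrade from 2.3 (or lower).
class TwoBounceUpgrade {
    // Key behind StreamsConfig.UPGRADE_FROM_CONFIG, written as a plain string so the
    // sketch compiles without the kafka-streams jar.
    static final String UPGRADE_FROM = "upgrade.from";

    // Bounce 1: jars upgraded to a version between 2.4 and 3.1,
    // UPGRADE_FROM set to the version being upgraded from.
    static Properties firstBounce(Properties base, String fromVersion) {
        Properties p = new Properties();
        p.putAll(base);
        p.setProperty(UPGRADE_FROM, fromVersion);
        return p;
    }

    // Bounce 2: jars upgraded to the desired target version, UPGRADE_FROM removed.
    static Properties secondBounce(Properties firstBounceProps) {
        Properties p = new Properties();
        p.putAll(firstBounceProps);
        p.remove(UPGRADE_FROM);
        return p;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-streams-app");    // hypothetical
        base.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical
        Properties bounce1 = firstBounce(base, "2.3");
        Properties bounce2 = secondBounce(bounce1);
        System.out.println(bounce1.getProperty(UPGRADE_FROM));   // 2.3
        System.out.println(bounce2.containsKey(UPGRADE_FROM));   // false
    }
}
```

Each bounce builds a fresh `Properties` copy, so the base configuration is never mutated and the two states can be compared side by side.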
[jira] [Updated] (KAFKA-8088) Deprecate `WindowStoreIterator` interface
[ https://issues.apache.org/jira/browse/KAFKA-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8088: --- Fix Version/s: 4.0.0 > Deprecate `WindowStoreIterator` interface > - > > Key: KAFKA-8088 > URL: https://issues.apache.org/jira/browse/KAFKA-8088 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 4.0.0 > > > The `WindowStore` interface has multiple methods to fetch() data. However, > the return types are mixed up. Two methods return `WindowStoreIterator` while > all others return `KeyValueIterator`. > We should align the return types and replace `WindowStoreIterator` with > `KeyValueIterator`. For backward compatibility reasons we can only deprecate > the interface for now and remove it only later. > KIP-439: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-439%3A+Deprecate+Interface+WindowStoreIterator]
[jira] [Updated] (KAFKA-12281) Deprecate org.apache.kafka.streams.errors.BrokerNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-12281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12281: Labels: beginner needs-kip newbie (was: needs-kip) > Deprecate org.apache.kafka.streams.errors.BrokerNotFoundException > - > > Key: KAFKA-12281 > URL: https://issues.apache.org/jira/browse/KAFKA-12281 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: beginner, needs-kip, newbie > > It has been 3 years since commit 234ec8a removed the usage of BrokerNotFoundException. > Hence, it is time to deprecate BrokerNotFoundException.
[jira] [Assigned] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15848: - Assignee: Kirk True > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and > {{AsyncKafkaConsumer}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. Here's a bit of code that illustrates the difference between the two > approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.value(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{TimeoutException}}. > h3. Suggested fix > TBD :(
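To make the ordering difference concrete, here is a self-contained sketch that uses only `java.util.concurrent` rather than the real consumer internals. `legacyGetCount` and `asyncGetCount` are hypothetical stand-ins for the two `getCount` snippets quoted above. The first attempts the operation before consulting the deadline. The second waits on a future that a background thread has not completed yet, so a zero timeout fails immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class TimeoutSemantics {

    // Legacy-style loop: attempt the operation first, check the deadline afterwards,
    // so a zero timeout still gets exactly one attempt.
    static int legacyGetCount(Supplier<Integer> attempt, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            Integer result = attempt.get(); // stands in for sendSomeRequest + client.poll(future, timer)
            if (result != null)
                return result;
        } while (System.currentTimeMillis() < deadline);
        return -1;
    }

    // Async-style: the operation has only been enqueued; wait on the future,
    // bounded by the remaining time. With a zero timeout and an incomplete future,
    // get() throws TimeoutException straight away.
    static int asyncGetCount(CompletableFuture<Integer> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // The "network" answers on the first attempt.
        System.out.println(legacyGetCount(() -> 42, 0));
        // The event has been enqueued, but no background thread has completed it yet.
        System.out.println(asyncGetCount(new CompletableFuture<>(), 0));
    }
}
```

Running `main` prints `42` and then `-1`. The same zero timeout yields a result in the attempt-first loop and a timeout in the wait-on-future version.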
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Description: The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}}) have a fundamental difference related to their use and interpretation of the {{Timer}} that is supplied. h3. tl;dr {{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room. {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for success of its operations _before_ checking the timer: # Submit operation asynchronously # Wait for operation to complete using {{NetworkClient.poll()}} # Check for result ## If successful, return success ## If fatal failure, return failure # Check timer ## If timer expired, return failure {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: # Submit operation asynchronously # Wait for operation to complete using {{Future.get()}} ## If operation timed out, {{Future.get()}} will throw a timeout error # Check for result ## If successful, return success ## Otherwise, return failure h3. How to reproduce This causes subtle timing issues, but they can be easily reproduced via any of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. Here's a bit of code that illustrates the difference between the two approaches.
{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a manner similar to this: {code:java} public int getCount(Timer timer) { do { final RequestFuture<Integer> future = sendSomeRequest(partitions); client.poll(future, timer); if (future.isDone()) return future.value(); } while (timer.notExpired()); return -1; } {code} {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: {code:java} private int getCount(Timer timer) { try { CompletableFuture<Integer> future = new CompletableFuture<>(); applicationEventQueue.add(future); return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { return -1; } } {code} The call to {{add}} enqueues the network operation, but it then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded blocking call. Since this method is being called with a timeout of 0, it _immediately_ throws a {{TimeoutException}}. h3. Suggested fix TBD :( was: The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}}) have a fundamental difference related to their use and interpretation of the {{Timer}} that is supplied. h3. tl;dr {{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room. {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for success of its operations _before_ checking the timer: # Submit operation asynchronously # Wait for operation to complete using {{NetworkClient.poll()}} # Check for result ## If successful, return success ## If fatal failure, return failure # Check timer ## If timer expired, return failure {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: # Submit operation asynchronously # Wait for operation to complete using {{Future.get()}} ## If operation timed out, {{Future.get()}} will throw a timeout error # Check for result ## If successful, return success ## Otherwise, return failure h3.
How to reproduce This causes subtle timing issues, but they can be easily reproduced via any of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. Here's a bit of code that illustrates the difference between the two approaches. {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a manner similar to this: {code:java} public boolean doSomeWork(Timer timer) { do { final RequestFuture future = sendSomeRequest(partitions); client.poll(future, timer); if (future.isDone()) return true; } while (timer.notExpired()); return false; } {code} {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: {code:java} private boolean doSomeWork(Timer timer) { Set partitions = subscriptions.initializingPartitions(); try { CompleteableApplicationEvent event = . . . CompletableFuture future = event.future(); applicationEventHandler.add(event); future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); return true; } catch (TimeoutException e) { return false; } } {code} The call to {{add}} enqueues the network operation, but it then _immediately_ invokes {{Future.get()}} with the
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Description: The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}}) have a fundamental difference related to their use and interpretation of the {{Timer}} that is supplied. h3. tl;dr {{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room. {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for success of its operations _before_ checking the timer: # Submit operation asynchronously # Wait for operation to complete using {{NetworkClient.poll()}} # Check for result ## If successful, return success ## If fatal failure, return failure # Check timer ## If timer expired, return failure {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: # Submit operation asynchronously # Wait for operation to complete using {{Future.get()}} ## If operation timed out, {{Future.get()}} will throw a timeout error # Check for result ## If successful, return success ## Otherwise, return failure h3. How to reproduce This causes subtle timing issues, but they can be easily reproduced via any of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. Here's a bit of code that illustrates the difference between the two approaches.
{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a manner similar to this: {code:java} public boolean doSomeWork(Timer timer) { do { final RequestFuture future = sendSomeRequest(partitions); client.poll(future, timer); if (future.isDone()) return true; } while (timer.notExpired()); return false; } {code} {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: {code:java} private boolean doSomeWork(Timer timer) { Set partitions = subscriptions.initializingPartitions(); try { CompleteableApplicationEvent event = . . . CompletableFuture future = event.future(); applicationEventHandler.add(event); future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); return true; } catch (TimeoutException e) { return false; } } {code} The call to {{add}} enqueues the network operation, but it then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded blocking call. Since this method is being called with a timeout of 0, it _immediately_ throws a {{TimeoutException}}. h3. Suggested fix TBD :( was: The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations have a fundamental difference in their timing when it relates to their use of the {{Timer}} that is supplied. h3. tl;dr For time-bounded tasks, {{LegacyKafkaConsumer}} does the following: # Attempt the operation # If successful, return result # Check timer, return if expired # Go to top {{AsyncKafkaConsumer}} effectively does the inverse: # Check timer, return if expired # Attempt the operation # If successful, return result # Go to top {{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room. h3. How to reproduce This causes subtle timing issues, but they can be easily reproduced via the {{KafkaConsumerTest}} unit tests, e.g. {{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}.
This test invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length timeout. As part of the {{poll()}} logic, the consumer may need to refresh offsets. To accomplish this, {{LegacyKafkaConsumer}} uses the {{ConsumerCoordinator.fetchCommittedOffsets()}} method, which is structured like this: {code:java} public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) { do { final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions); . . . client.poll(future, timer); . . . return future.value(); } while (timer.notExpired()); return null; } {code} The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: {code:java} private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions(); try { OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions); Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer); . . . return true; } catch (TimeoutException e) { return false; } } {code} The call to {{addAndGet}} enqueues the operation on the queue for the network I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Summary: Consumer API timeout inconsistent between ConsumerDelegate implementations (was: Consumer API timeout inconsistent between LegacyKafkaConsumer and AsyncKafkaConsumer) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > > The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations have a > fundamental difference in their timing when it relates to their use of the > {{Timer}} that is supplied. > h3. tl;dr > For time-bounded tasks, {{LegacyKafkaConsumer}} does the following: > # Attempt the operation > # If successful, return result > # Check timer, return if expired > # Go to top > {{AsyncKafkaConsumer}} effectively does the inverse: > # Check timer, return if expired > # Attempt the operation > # If successful, return result > # Go to top > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via the > {{KafkaConsumerTest}} unit tests, e.g. > {{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test > invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length > timeout. > As part of the {{poll()}} logic, the consumer may need to refresh offsets. 
To > accomplish this, {{LegacyKafkaConsumer}} uses the > {{ConsumerCoordinator.fetchCommittedOffsets()}} method, which is structured > like this: > {code:java} > public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer > timer) { > do { > final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = > sendOffsetFetchRequest(partitions); > . . . > client.poll(future, timer); > . . . > return future.value(); > } while (timer.notExpired()); > return null; > } > {code} > The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { > Set<TopicPartition> initializingPartitions = > subscriptions.initializingPartitions(); > try { > OffsetFetchApplicationEvent event = new > OffsetFetchApplicationEvent(initializingPartitions); > Map<TopicPartition, OffsetAndMetadata> offsets = > applicationEventHandler.addAndGet(event, timer); > . . . > return true; > } catch (TimeoutException e) { > return false; > } > } > {code} > The call to {{addAndGet}} enqueues the operation on the queue for the network > I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout > to implement a time-bounded blocking call. {{Future.get()}} will be passed > the value of {{0}} (from the above call to {{poll(0)}}), and will _immediately_ > throw a {{TimeoutException}}. > h3. Suggested fix > TBD :(
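The zero timeout is not the whole story. `Future.get()` with a timeout of 0 still returns a result that is already available; it only fails when the work handed to another thread has not finished yet. The sketch below models that handoff. A single-thread executor stands in for the network I/O thread and a latch stands in for the broker response. `getOrNull` and the value `7` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ZeroTimeoutDemo {
    // Waits at most timeoutMs; returns null if the result is not available by then.
    static Integer getOrNull(Future<Integer> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A single thread standing in for the background network I/O thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        CountDownLatch responseArrived = new CountDownLatch(1); // simulated broker response
        try {
            Future<Integer> offsets = ioThread.submit(() -> {
                responseArrived.await();
                return 7;
            });
            System.out.println(getOrNull(offsets, 0));    // null: enqueued but not yet complete
            responseArrived.countDown();
            System.out.println(getOrNull(offsets, 5000)); // 7: waits until the response is processed
            System.out.println(getOrNull(offsets, 0));    // 7: a completed future returns even with 0
        } finally {
            ioThread.shutdownNow();
        }
    }
}
```

In other words, with {{poll(0)}} the enqueue-then-wait pattern can only succeed if the I/O thread has already finished the request before {{Future.get()}} is called.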
[PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
ocadaruma opened a new pull request, #14242: URL: https://github.com/apache/kafka/pull/14242 JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-15046 - While any blocking operation performed while holding the UnifiedLog.lock can lead to serious performance (even availability) issues, there are currently several paths that call fsync(2) inside the lock * While the lock is held, all subsequent produce requests against the partition may block * This easily causes all request handlers to become busy when disk performance is bad * Even worse, when a disk experiences a glitch lasting tens of seconds (not rare with spinning drives), the broker becomes unable to process any requests while remaining unfenced in the cluster (i.e. a "zombie"-like status) - This PR gets rid of 4 cases of essentially unnecessary fsync(2) calls performed under the lock: * (1) ProducerStateManager.takeSnapshot at UnifiedLog.roll - I moved the fsync(2) call to the scheduler thread as part of the existing "flush-log" job (before incrementing the recovery point) - Since it's still ensured that the snapshot is flushed before the recovery point is incremented, this change shouldn't cause any problem * (2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion - This method internally calls `Utils.atomicMoveWithFallback` with `needFlushParentDir = true`, which calls fsync. - I changed it to call `Utils.atomicMoveWithFallback` with `needFlushParentDir = false` (which is consistent with index file deletion; index file deletion also doesn't flush the parent dir) - This change shouldn't cause problems either. * (3) LeaderEpochFileCache.truncateFromStart when incrementing log-start-offset - This path is called from deleteRecords on request-handler threads. - Here, we don't actually need fsync(2) either.
- On unclean shutdown, a few leader epochs might remain in the file, but they will be [handled by LogLoader](https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/LogLoader.scala#L185) on start-up, so this is not a problem * (4) LeaderEpochFileCache.truncateFromEnd as part of log truncation - Likewise, we don't need fsync(2) here, since any epochs left untruncated after an unclean shutdown will be handled by the log loading procedure - Please refer to the JIRA ticket for further details and the performance experiment results To check that these changes don't cause a problem, the following consistency expectation table will be helpful:

| No | File | Consistency expectation | Description | Note |
|:---|:---|:---|:---|:---|
| 1 | ProducerStateSnapshot | Snapshot files on the disk before the recovery point should be consistent with the log segments | - On restart after an unclean shutdown, Kafka will skip the snapshot recovery procedure before the recovery point.<br>- If the snapshot content before the recovery point is inconsistent with the log, it will cause problems such as idempotency violations due to missing producer state. | Hence, inconsistency after the recovery point is acceptable because it will be restored to a consistent state by the log loading procedure |
| 2 | ProducerStateSnapshot | Deleted snapshot files on the disk should be eventually consistent with log segments | - On log segment deletion for any reason (e.g. retention, topic deletion), the corresponding snapshot files will be deleted.<br>- Even when the broker crashes by power
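Change (1) above moves the fsync out of the critical section while still flushing before the recovery point advances. It can be sketched as follows. This is a simplified illustration, not the PR's code: `ReentrantLock` stands in for `UnifiedLog.lock`, `flushJob` stands in for the scheduler's "flush-log" job, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: only the fsync is moved out of the critical section.
class DeferredFsyncSketch {
    private final ReentrantLock lock = new ReentrantLock(); // stands in for UnifiedLog.lock
    private final ConcurrentLinkedQueue<Path> unflushed = new ConcurrentLinkedQueue<>();
    private volatile long recoveryPoint = 0L;

    // Runs under the lock (e.g. during a segment roll): write the snapshot, but do not
    // fsync it here; just remember that it still needs a flush.
    void writeSnapshotUnderLock(Path snapshot, String content) {
        lock.lock();
        try {
            Files.write(snapshot, content.getBytes(StandardCharsets.UTF_8));
            unflushed.add(snapshot);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            lock.unlock();
        }
    }

    // Runs on a scheduler thread, outside the lock: fsync every pending snapshot
    // first, and only then advance the recovery point.
    void flushJob(long newRecoveryPoint) {
        Path p;
        while ((p = unflushed.poll()) != null) {
            try (FileChannel ch = FileChannel.open(p, StandardOpenOption.WRITE)) {
                ch.force(true); // the fsync(2) that used to happen under the lock
            } catch (IOException e) {
                unflushed.add(p); // keep it pending; the recovery point is not advanced
                throw new UncheckedIOException(e);
            }
        }
        recoveryPoint = newRecoveryPoint;
    }

    int pendingFlushCount() { return unflushed.size(); }
    long recoveryPoint() { return recoveryPoint; }

    public static void main(String[] args) throws IOException {
        Path snapshot = Files.createTempFile("producer-state-", ".snapshot");
        DeferredFsyncSketch log = new DeferredFsyncSketch();
        log.writeSnapshotUnderLock(snapshot, "producer-state");
        System.out.println(log.pendingFlushCount() + " " + log.recoveryPoint()); // 1 0
        log.flushJob(100L);
        System.out.println(log.pendingFlushCount() + " " + log.recoveryPoint()); // 0 100
        Files.delete(snapshot);
    }
}
```

The invariant from row 1 of the table is preserved: the recovery point only moves after every snapshot written before it has been forced to disk, while produce requests no longer wait on the fsync.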
Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
ocadaruma commented on PR #14242: URL: https://github.com/apache/kafka/pull/14242#issuecomment-1823612335 closing once to rebuild
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]
kirktrue commented on PR #14768: URL: https://github.com/apache/kafka/pull/14768#issuecomment-1823610187 > Even if there were a KIP for this change, we cannot put it into a minor release and need to wait for 4.0. Are there any preparatory steps we need to take now (i.e. in the 3.7 release time frame) to make sure we are free to make this change in 4.0?
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]
kirktrue commented on code in PR #14768: URL: https://github.com/apache/kafka/pull/14768#discussion_r1402784590 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -150,11 +150,9 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG); -groupId.ifPresent(groupIdStr -> { -if (groupIdStr.isEmpty()) { -log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release."); -} -}); + +if (this.groupId.isPresent() && this.groupId.get().isEmpty()) +throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace"); Review Comment: Fair enough. I'll back out the error case for `LegacyKafkaConsumer`.
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]
mjsax commented on PR #14768: URL: https://github.com/apache/kafka/pull/14768#issuecomment-1823600766 Even if there were a KIP for this change, we cannot put it into a minor release and need to wait for 4.0. This change would have been OK for 3.0, but that ship has sailed.
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]
mjsax commented on code in PR #14768: URL: https://github.com/apache/kafka/pull/14768#discussion_r1402777990 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -150,11 +150,9 @@ public class LegacyKafkaConsumer implements ConsumerDelegate { LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG); -groupId.ifPresent(groupIdStr -> { -if (groupIdStr.isEmpty()) { -log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release."); -} -}); + +if (this.groupId.isPresent() && this.groupId.get().isEmpty()) +throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace"); Review Comment: I don't think we can make this change in a minor release; we need to wait for 4.0. It breaks compatibility, and we are not allowed to break compatibility in a minor release.
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]
mjsax commented on code in PR #14768: URL: https://github.com/apache/kafka/pull/14768#discussion_r1402777467 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -163,11 +163,9 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); -groupId.ifPresent(groupIdStr -> { -if (groupIdStr.isEmpty()) { -log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release."); -} -}); + +if (this.groupId.isPresent() && this.groupId.get().isEmpty()) +throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace"); Review Comment: Does the change apply to the old protocol? If yes, I don't think we can make this change in a minor release; we need to wait for 4.0.
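One detail in the quoted diff: the check uses `this.groupId.get().isEmpty()`, which rejects "" but not a whitespace-only value such as " ", even though the error message mentions whitespace. A minimal sketch of a check that covers both might look like the code below. It is gated by a hypothetical `breakingChangeAllowed` flag that reflects the review discussion: warn where a breaking change is not allowed, throw where it is. `InvalidGroupIdException` is redefined locally so the example compiles without the Kafka client jar.

```java
import java.util.Optional;

class GroupIdValidation {
    // Local stand-in for org.apache.kafka.common.errors.InvalidGroupIdException,
    // so that the sketch compiles without the Kafka client jar.
    static class InvalidGroupIdException extends RuntimeException {
        InvalidGroupIdException(String message) { super(message); }
    }

    // True if a group.id is configured but is empty or whitespace-only.
    // String.trim() strips leading/trailing characters <= U+0020, so "" and " \t " both count.
    static boolean isBlankGroupId(Optional<String> groupId) {
        return groupId.isPresent() && groupId.get().trim().isEmpty();
    }

    // Hypothetical gate: keep the existing deprecation warning where a breaking change
    // is not allowed, and throw only where it is. Returns true if a warning was emitted.
    static boolean validate(Optional<String> groupId, boolean breakingChangeAllowed) {
        if (!isBlankGroupId(groupId))
            return false;
        if (breakingChangeAllowed)
            throw new InvalidGroupIdException(
                "The configured group.id should not be an empty string or whitespace");
        System.err.println("Support for using the empty group id by consumers is deprecated "
            + "and will be removed in the next major release.");
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isBlankGroupId(Optional.of("   ")));    // true
        System.out.println(isBlankGroupId(Optional.of("orders"))); // false
        System.out.println(isBlankGroupId(Optional.empty()));      // false
    }
}
```

An absent group.id is left alone here, since the quoted code only acts when `groupId.isPresent()`.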
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez commented on PR #14820: URL: https://github.com/apache/kafka/pull/14820#issuecomment-1823599261 @cmccabe @pprovenzano please take a look
[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
[ https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15768: Description: Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect `IllegalArgumentException` if any result is a `FailedQueryResult` (and even if there is only a single FailedQueryResult). The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly (and incorrectly) calls `getResult`. Given the semantics of `getOnlyPartitionResult` we should not care if the result is SuccessQueryResult or FailedQueryResult, but only check if there is a single result or not. (The user has no means to avoid getting the underlying error otherwise.) Side-note: why does `FailedQueryResult#getResult` throw an IllegalArgumentException (there is no argument passed into the method – it should rather be an `IllegalStateException` – but I guess we would need a KIP for this fix?) was: Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect `IllegalArgumentException` if any result is a `FailedQueryResult` (and even if there is only a single FailedQueryResult). The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly (and incorrectly) calls `getResult`. Given the semantics of `getOnlyPartitionResult` we should not care if the result is SuccessQueryResult or FailedQueryResult, but only check if there is a single result or not. (The user has not means to avoid getting an exception otherwise.) Side-note: why does `FailedQueryResult#getResult` throw an IllegalArgumentException (there is no argument passed into the method – it should rather be an `IllegalStateException` – but I guess we would need a KIP for this fix?) > StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult > -- > > Key: KAFKA-15768 > URL: https://issues.apache.org/jira/browse/KAFKA-15768 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. 
Sax >Assignee: Hanyu Zheng >Priority: Major > > Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect > `IllegalArgumentException` if any result is a `FailedQueryResult` (and even > if there is only a single FailedQueryResult). > The issue is the internal `filter(r -> r.getResult() != 0)` step, that > blindly (and incorrectly) calls `getResult`. > Given the semantics of `getOnlyPartitionResult` we should not care if the > result is SuccessQueryResult or FailedQueryResult, but only check if there is > a single result or not. (The user has no means to avoid getting the > underlying error otherwise.) > Side-note: why does `FailedQueryResult#getResult` throw an > IllegalArgumentException (there is no argument passed into the method – it > should rather be an `IllegalStateException` – but I guess we would need a KIP > for this fix?) -- This message was sent by Atlassian Jira (v8.20.10#820010)
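A minimal sketch of the behaviour the issue asks for, using a hypothetical simplified `PartitionResult` class instead of the real `QueryResult`/`FailedQueryResult` types: failed results count as results, and `getResult()` is only called after `isFailure()` has ruled out a failure, so a single failed partition result is returned to the caller instead of triggering an exception. Treating a `null` value as "no data for this partition" is an assumption of the sketch:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class OnlyPartitionResult {
    // Failed results are kept; getResult() is never called on them because
    // isFailure() short-circuits the filter.
    static PartitionResult getOnlyPartitionResult(Map<Integer, PartitionResult> partitionResults) {
        List<PartitionResult> relevant = partitionResults.values().stream()
            .filter(r -> r.isFailure() || r.getResult() != null)
            .collect(Collectors.toList());
        if (relevant.size() != 1) {
            throw new IllegalArgumentException(
                "The query did not return exactly one partition result: " + relevant.size());
        }
        return relevant.get(0);
    }

    public static void main(String[] args) {
        Map<Integer, PartitionResult> results = new HashMap<>();
        results.put(0, PartitionResult.failure("UNKNOWN_QUERY_TYPE"));
        results.put(1, PartitionResult.success(null)); // empty partition: ignored
        System.out.println(getOnlyPartitionResult(results).isFailure()); // prints "true"
    }
}

// Hypothetical, simplified stand-in for a per-partition query result.
final class PartitionResult {
    private final String value;         // null means "no data for this partition"
    private final String failureReason; // non-null means the query failed

    private PartitionResult(String value, String failureReason) {
        this.value = value;
        this.failureReason = failureReason;
    }

    static PartitionResult success(String value) { return new PartitionResult(value, null); }

    static PartitionResult failure(String reason) { return new PartitionResult(null, reason); }

    boolean isFailure() { return failureReason != null; }

    String getResult() {
        // Mirrors the reported behaviour: asking a failed result for its value throws.
        if (isFailure()) {
            throw new IllegalArgumentException("Cannot get result for failed query: " + failureReason);
        }
        return value;
    }
}
```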
[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
[ https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15768: Description: Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect `IllegalArgumentException` if any result is a `FailedQueryResult` (and even if there is only a single FailedQueryResult). The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly (and incorrectly) calls `getResult`. Given the semantics of `getOnlyPartitionResult` we should not care if the result is SuccessQueryResult or FailedQueryResult, but only check if there is a single result or not. (The user has not means to avoid getting an exception otherwise.) Side-note: why does `FailedQueryResult#getResult` throw an IllegalArgumentException (there is no argument passed into the method – it should rather be an `IllegalStateException` – but I guess we would need a KIP for this fix?) was: Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect `IllegalArgumentException` if the any result is a `FailedQueryResult` (and even if there is only a single FailedQueryResult). The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly (and incorrectly) calls `getResult`. Given the semantics of `getOnlyPartitionResult` we should not care if the result is SuccessQueryResult or FailedQueryResult, but only check if there is a single result or not. (The user has not means to avoid getting an exception otherwise.) Side-note: why does `FailedQueryResult#getResult` throw an IllegalArgumentException (there is no argument passed into the method – it should rather be an `IllegalStateException` – but I guess we would need a KIP for this fix?) > StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult > -- > > Key: KAFKA-15768 > URL: https://issues.apache.org/jira/browse/KAFKA-15768 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. 
Sax >Assignee: Hanyu Zheng >Priority: Major > > Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect > `IllegalArgumentException` if any result is a `FailedQueryResult` (and even > if there is only a single FailedQueryResult). > The issue is the internal `filter(r -> r.getResult() != 0)` step, that > blindly (and incorrectly) calls `getResult`. > Given the semantics of `getOnlyPartitionResult` we should not care if the > result is SuccessQueryResult or FailedQueryResult, but only check if there is > a single result or not. (The user has not means to avoid getting an exception > otherwise.) > Side-note: why does `FailedQueryResult#getResult` throw an > IllegalArgumentException (there is no argument passed into the method – it > should rather be an `IllegalStateException` – but I guess we would need a KIP > for this fix?)
[jira] [Updated] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
[ https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15817: -- Fix Version/s: 3.7.0 3.6.1 > Avoid reconnecting to the same IP address if multiple addresses are available > - > > Key: KAFKA-15817 > URL: https://issues.apache.org/jira/browse/KAFKA-15817 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1 >Reporter: Bob Barrett >Assignee: Bob Barrett >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS > resolution behavior for clients to re-resolve DNS after disconnecting from a > broker, rather than waiting until we had iterated over all addresses from a given > resolution. This is useful when the IP addresses have changed between the > connection and disconnection. > However, with the behavior change, this does mean that clients could > potentially reconnect immediately to the same IP they just disconnected from, > if the IPs have not changed. In cases where the disconnection happened > because that IP was unhealthy (such as a case where a load balancer has > instances in multiple availability zones and one zone is unhealthy, or a case > where an intermediate component in the network path is going through a > rolling restart), this will delay the client from successfully reconnecting. To > address this, clients should remember the IP they just disconnected from and > skip that IP when reconnecting, as long as the hostname resolved to multiple > addresses.
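The proposed rule (skip the IP the client just disconnected from, but only when the hostname resolved to several addresses) can be sketched as a small selection function. This is a hypothetical illustration, not the Kafka client's connection-state code; `AddressSelector`, `chooseAddress`, and the private-range IPs are made up for the example:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

class AddressSelector {
    // Hypothetical helper: pick the address to reconnect to, skipping the one we just
    // disconnected from, but only when the hostname resolved to more than one address.
    static InetAddress chooseAddress(List<InetAddress> resolved, InetAddress lastDisconnected) {
        if (resolved.isEmpty()) {
            throw new IllegalArgumentException("No addresses resolved");
        }
        if (resolved.size() > 1 && lastDisconnected != null) {
            for (InetAddress address : resolved) {
                if (!address.equals(lastDisconnected)) {
                    return address;
                }
            }
        }
        // Single address (or nothing to skip): fall back to the first resolved address.
        return resolved.get(0);
    }

    // Parses an IP literal; no DNS lookup happens for literals.
    static InetAddress ip(String literal) {
        try {
            return InetAddress.getByName(literal);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        InetAddress a = ip("10.0.0.1");
        InetAddress b = ip("10.0.0.2");
        System.out.println(chooseAddress(Arrays.asList(a, b), a).getHostAddress()); // prints "10.0.0.2"
        System.out.println(chooseAddress(Arrays.asList(a), a).getHostAddress());    // prints "10.0.0.1"
    }
}
```

Falling back to the same address when it is the only one keeps the client able to reconnect at all; skipping is only an optimization when alternatives exist.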
Re: [PR] MINOR: Fix broken method link in DistributedHerder::writeToConfigTopicAsLeader Javadoc [kafka]
yashmayya merged PR #14824: URL: https://github.com/apache/kafka/pull/14824
[jira] [Commented] (KAFKA-14552) Remove no longer required server protocol versions in Kafka 4.0
[ https://issues.apache.org/jira/browse/KAFKA-14552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788891#comment-17788891 ] Colin McCabe commented on KAFKA-14552: -- I could go either way. I think most of the configuration key removals are "implied" by other KIPs (or sometimes stated directly there), but I thought it would be good to gather them somewhere. > Remove no longer required server protocol versions in Kafka 4.0 > --- > > Key: KAFKA-14552 > URL: https://issues.apache.org/jira/browse/KAFKA-14552 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Blocker > Fix For: 4.0.0 > > > Kafka 4.0 will remove support for zk mode and kraft mode became production > ready in Kafka 3.3. Furthermore, migration from zk mode to kraft mode will > require upgrading to the bridge release first (likely 3.5, but could also be > 3.6). > This provides an opportunity to remove exclusively server side protocol > versions that only exist to allow direct upgrades from versions older than > 3.n where n is either 0 (KRaft preview), 3 (KRaft production ready) or 5 > (bridge release). We should decide on the right `n` and make the change as > part of 4.0. > Note that this is complementary to the protocols that will be completely > removed as part of zk mode removal. Step one would be to create a list of > protocols that will be completely removed due to zk mode removal and the list > of exclusively server side protocols remaining after that (one example is > ControlledShutdown).
Re: [PR] KAFKA-15888: DistributedHerder log context should not use the same client ID for each Connect worker by default [kafka]
C0urante commented on PR #14825: URL: https://github.com/apache/kafka/pull/14825#issuecomment-1823468350 Would it be worth it to keep the `connect-` prefix?
Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]
scholzj commented on PR #14756: URL: https://github.com/apache/kafka/pull/14756#issuecomment-1823431704 Thanks for the review comments @viktorsomogyi. They should now be fixed.
[PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 opened a new pull request, #14632: URL: https://github.com/apache/kafka/pull/14632 The PR adds support for alter/describe configs for client-metrics as defined in [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-kafka-configs.sh). Below are the results of the commands: Help section adds details for `client-metrics`: ``` ./bin/kafka-configs.sh --help This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics . . . --add-config Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1, k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: For entity-type 'topics': . . . controller_mutation_rate producer_byte_rate request_percentage For entity-type 'clients': consumer_byte_rate controller_mutation_rate producer_byte_rate request_percentage For entity-type 'ips': connection_creation_rate For entity-type 'client-metrics': interval.ms match metrics Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user. 
--entity-type Type of entity (topics/clients/users/brokers/broker- loggers/ips/client-metrics) --entity-name Name of entity (topic name/client id/user principal name/broker id/ip/client metrics subscription name) ``` Incorrect entity type: ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type client-metrics1 --describe --entity-name METRICSUB Invalid entity type client-metrics1, the entity type must be one of topics, clients, users, brokers, ips, client-metrics, broker-loggers with a --bootstrap-server or --bootstrap-controller argument ``` Describe without entity name: ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type client-metrics --describe an entity name must be specified with --describe of client-metrics ``` Describe with blank entity name: ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type client-metrics --describe --entity-name "" an entity name must be specified with --describe of client-metrics ``` Invalid entity name: no exception is thrown here on purpose, because alter also relies on the describe response to determine whether a new subscription is being added or an existing one altered. ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type client-metrics --describe --entity-name "random" Dynamic configs for client-metric random are: ``` Successful alter of `client-metrics`: ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type client-metrics --entity-name METRICSUB --add-config "metrics=org.apache.kafka.consumer.,interval.ms=6,match=[client_software_name=kafka.python,client_software_version=1\.2\..*]" Completed updating config for client-metric METRICSUB. 
``` Successful describe of `client-metrics`: ``` ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type client-metrics --describe --entity-name METRICSUB Dynamic configs for client-metric METRICSUB are: interval.ms=6 sensitive=false synonyms={} match=client_software_name=kafka.python,client_software_version=1\.2\..* sensitive=false synonyms={} metrics=org.apache.kafka.consumer. ```
Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
jolshan commented on PR #14789: URL: https://github.com/apache/kafka/pull/14789#issuecomment-1823374923 I realized I didn't cherry-pick this to 3.6.1. I can do that.
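The PR title describes a poll returning more than `max.poll.records` records when the subscription spans several topics. A minimal, hypothetical model of the intended invariant, not the consumer's actual fetch code: records are drained from per-partition buffers until one total budget is used up, and whatever remains stays buffered for the next poll. `CappedPoll`, `poll`, and the string records are illustrative names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CappedPoll {
    // Hypothetical model: each partition (possibly from different topics) has buffered
    // records; one poll returns at most maxPollRecords in total across all of them.
    static List<String> poll(Map<String, List<String>> buffered, int maxPollRecords) {
        List<String> out = new ArrayList<>();
        for (List<String> records : buffered.values()) {
            int remaining = maxPollRecords - out.size();
            if (remaining <= 0) {
                break; // budget used up: the rest stays buffered for the next poll
            }
            int take = Math.min(remaining, records.size());
            List<String> batch = records.subList(0, take);
            out.addAll(batch);
            batch.clear(); // remove the returned records from the partition's buffer
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buffered = new LinkedHashMap<>();
        buffered.put("topicA-0", new ArrayList<>(Arrays.asList("a1", "a2", "a3")));
        buffered.put("topicB-0", new ArrayList<>(Arrays.asList("b1", "b2", "b3")));
        System.out.println(poll(buffered, 4)); // prints "[a1, a2, a3, b1]"
        System.out.println(poll(buffered, 4)); // prints "[b2, b3]"
    }
}
```

The key design point is that the budget is shared across partitions rather than applied per partition or per topic.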
[PR] KAFKA-15888: DistributedHerder log context should not use the same client ID for each Connect worker by default [kafka]
yashmayya opened a new pull request, #14825: URL: https://github.com/apache/kafka/pull/14825 - https://issues.apache.org/jira/browse/KAFKA-15888 - By default, if there is no `client.id` configured on a Connect worker running in distributed mode, the same client ID (`connect-1`) will be used in the log context for the `DistributedHerder` class in every single worker in the Connect cluster. - This default is quite confusing and obviously not very useful. - Further, based on how this default is configured - https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299 it seems like this might have been an unintentional bug (the static `AtomicInteger` is incremented in the `DistributedHerder`'s constructor, but we're only going to initialize a single `DistributedHerder` per worker JVM process). - This patch changes the default to simply use the `workerId` (the advertised host name and port of the worker) instead, which should be unique for each worker in a cluster. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
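A minimal sketch contrasting the two defaults described in the PR. It assumes, as the description states, that the old default comes from a static `AtomicInteger` incremented once per `DistributedHerder` construction; `ClientIdDefaults`, `counterDefault`, and `workerIdDefault` are hypothetical names, and the `host:port` worker IDs are made up. Because each worker JVM starts its counter at 1 and builds a single herder, every worker ends up with the same ID:

```java
import java.util.concurrent.atomic.AtomicInteger;

class ClientIdDefaults {
    // Old-style default: a counter that starts at 1 in every JVM. Each worker JVM
    // creates a single herder, so every worker ends up with "connect-1".
    static String counterDefault(AtomicInteger perJvmSequence) {
        return "connect-" + perJvmSequence.getAndIncrement();
    }

    // Proposed default: fall back to the worker ID (advertised host:port), unique per worker.
    static String workerIdDefault(String configuredClientId, String workerId) {
        return (configuredClientId == null || configuredClientId.isEmpty()) ? workerId : configuredClientId;
    }

    public static void main(String[] args) {
        // Simulate two worker JVMs, each with its own fresh counter and one herder.
        System.out.println(counterDefault(new AtomicInteger(1)));     // prints "connect-1"
        System.out.println(counterDefault(new AtomicInteger(1)));     // prints "connect-1"
        System.out.println(workerIdDefault(null, "worker-a:8083"));   // prints "worker-a:8083"
        System.out.println(workerIdDefault(null, "worker-b:8083"));   // prints "worker-b:8083"
    }
}
```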
[jira] [Created] (KAFKA-15888) DistributedHerder log context should not use the same client ID for each Connect worker by default
Yash Mayya created KAFKA-15888: -- Summary: DistributedHerder log context should not use the same client ID for each Connect worker by default Key: KAFKA-15888 URL: https://issues.apache.org/jira/browse/KAFKA-15888 Project: Kafka Issue Type: Bug Components: connect, KafkaConnect Reporter: Yash Mayya Assignee: Yash Mayya By default, if there is no "client.id" configured on a Connect worker running in distributed mode, the same client ID ("connect-1") will be used in the log context for the DistributedHerder class in every single worker in the Connect cluster. This default is quite confusing and obviously not very useful. Further, based on how this default is configured ([ref|https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299]), it seems like this might have been an unintentional bug. We could simply use the workerId (the advertised host name and port of the worker) by default instead, which should be unique for each worker in a cluster.
[PR] MINOR: Fix broken method link in DistributedHerder::writeToConfigTopicAsLeader Javadoc [kafka]
yashmayya opened a new pull request, #14824: URL: https://github.com/apache/kafka/pull/14824 - The `ConfigBackingStore::putConnectorConfig` method was modified in https://github.com/apache/kafka/pull/14704 to take in a third parameter (`TargetState`). - This breaks the method link in the doc comment for `DistributedHerder::writeToConfigTopicAsLeader`. It doesn't break the build since we don't generate public Javadocs for this class, but it does affect things like code navigation in IDEs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
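Javadoc's `{@link}` identifies a method by its parameter types, so once a third parameter is added, a link written against the old signature stops resolving. A minimal illustration with hypothetical stand-ins (`ConfigStore`, `TargetState`, `InMemoryConfigStore`, `writeConnectorConfig`); the two-argument `(String, Map)` form of the old signature is an assumption, not taken from the Kafka source:

```java
import java.util.HashMap;
import java.util.Map;

class JavadocLinkDemo {
    enum TargetState { STARTED, PAUSED, STOPPED }

    // Hypothetical stand-in for a config store whose method gained a third parameter.
    interface ConfigStore {
        void putConnectorConfig(String connector, Map<String, String> properties, TargetState targetState);
    }

    static final class InMemoryConfigStore implements ConfigStore {
        final Map<String, TargetState> states = new HashMap<>();

        @Override
        public void putConnectorConfig(String connector, Map<String, String> properties, TargetState targetState) {
            states.put(connector, targetState);
        }
    }

    /**
     * Writes a connector config via
     * {@link ConfigStore#putConnectorConfig(String, Map, TargetState)}.
     * A link written against the old two-argument form,
     * {@code ConfigStore#putConnectorConfig(String, Map)}, would no longer resolve.
     */
    static void writeConnectorConfig(ConfigStore store, String connector, Map<String, String> props) {
        store.putConnectorConfig(connector, props, TargetState.STARTED);
    }

    public static void main(String[] args) {
        InMemoryConfigStore store = new InMemoryConfigStore();
        writeConnectorConfig(store, "my-connector", new HashMap<>());
        System.out.println(store.states.get("my-connector")); // prints "STARTED"
    }
}
```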
[jira] [Created] (KAFKA-15887) Autocommit during close consistently fails with exception in background thread
Lucas Brutschy created KAFKA-15887: -- Summary: Autocommit during close consistently fails with exception in background thread Key: KAFKA-15887 URL: https://issues.apache.org/jira/browse/KAFKA-15887 Project: Kafka Issue Type: Sub-task Reporter: Lucas Brutschy Assignee: Philip Nee When I run {{AsyncKafkaConsumerTest}} I get this every time I call close: {code:java} java.lang.IndexOutOfBoundsException: Index: 0 at java.base/java.util.Collections$EmptyList.get(Collections.java:4483) at java.base/java.util.Collections$UnmodifiableList.get(Collections.java:1310) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.findCoordinatorSync(ConsumerNetworkThread.java:302) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.ensureCoordinatorReady(ConsumerNetworkThread.java:288) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.maybeAutoCommitAndLeaveGroup(ConsumerNetworkThread.java:276) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.cleanup(ConsumerNetworkThread.java:257) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:101) {code}
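The top two frames show `get(0)` being called on an empty list (`Collections$EmptyList` wrapped in an unmodifiable list). The sketch below only reproduces that failure mode with plain JDK collections and shows a generic guard; it makes no claim about what `findCoordinatorSync` should do when the list is empty, and `firstOrEmpty` is a hypothetical helper:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class EmptyListGet {
    // Hypothetical guard: return the first element if there is one, instead of calling get(0) blindly.
    static <T> Optional<T> firstOrEmpty(List<T> results) {
        return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
    }

    public static void main(String[] args) {
        List<String> none = Collections.unmodifiableList(Collections.<String>emptyList());
        try {
            none.get(0); // same failure mode as the stack trace: IndexOutOfBoundsException
        } catch (IndexOutOfBoundsException e) {
            System.out.println("get(0) failed: " + e.getClass().getSimpleName());
        }
        System.out.println(firstOrEmpty(none).isPresent()); // prints "false"
    }
}
```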
Re: [PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]
cmccabe commented on code in PR #14807: URL: https://github.com/apache/kafka/pull/14807#discussion_r1402476918 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -205,4 +207,22 @@ private static List getImageRecords(ClusterImage image) { image.write(writer, new ImageWriterOptions.Builder().build()); return writer.records(); } + +@Test +public void testHandleLossOfControllerRegistrations() { +ClusterImage testImage = new ClusterImage(Collections.emptyMap(), +Collections.singletonMap(1000, new ControllerRegistration.Builder(). +setId(1000). +setIncarnationId(Uuid.fromString("9ABu6HEgRuS-hjHLgC4cHw")). +setListeners(Collections.singletonMap("PLAINTEXT", +new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 19092))). +setSupportedFeatures(Collections.emptyMap()).build())); +RecordListWriter writer = new RecordListWriter(); +final AtomicReference lossString = new AtomicReference<>(""); +testImage.write(writer, new ImageWriterOptions.Builder(). +setMetadataVersion(MetadataVersion.IBP_3_6_IV2). +setLossHandler(loss -> lossString.compareAndSet("", loss.loss())). +build()); +assertEquals("controller registration data", lossString.get()); Review Comment: Both of the unit tests I added in this PR verify that a `ClusterImage` with controller registration generates a snapshot with `RegisterControllerRecords` if the metadata version is `3.7-IV0`.
Re: [PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]
jsancio commented on code in PR #14807: URL: https://github.com/apache/kafka/pull/14807#discussion_r1402453079 ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -205,4 +207,22 @@ private static List getImageRecords(ClusterImage image) { image.write(writer, new ImageWriterOptions.Builder().build()); return writer.records(); } + +@Test +public void testHandleLossOfControllerRegistrations() { +ClusterImage testImage = new ClusterImage(Collections.emptyMap(), +Collections.singletonMap(1000, new ControllerRegistration.Builder(). +setId(1000). +setIncarnationId(Uuid.fromString("9ABu6HEgRuS-hjHLgC4cHw")). +setListeners(Collections.singletonMap("PLAINTEXT", +new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 19092))). +setSupportedFeatures(Collections.emptyMap()).build())); +RecordListWriter writer = new RecordListWriter(); +final AtomicReference lossString = new AtomicReference<>(""); +testImage.write(writer, new ImageWriterOptions.Builder(). +setMetadataVersion(MetadataVersion.IBP_3_6_IV2). +setLossHandler(loss -> lossString.compareAndSet("", loss.loss())). +build()); +assertEquals("controller registration data", lossString.get()); Review Comment: Thanks for fixing this test. I think we are still missing a test that shows that a `ClusterImage` with controller registration generates a snapshot with `RegisterControllerRecords` if the metadata version is `3.7-IV0`. Can we add that test, if you agree? If not, can you point me to that test?
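The two cases discussed in this thread (an older metadata version such as `IBP_3_6_IV2` reporting lost controller registration data, and `3.7-IV0` writing `RegisterControllerRecords`) can be modelled in a few lines. This is a hypothetical simplification, not the `ImageWriterOptions` or `ClusterImage` API: a boolean stands in for the metadata version check, strings stand in for records, and `writeControllerRegistrations` is a made-up name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class LossHandlerDemo {
    // Hypothetical model: newer metadata versions emit one record per controller
    // registration; older ones cannot represent them and report the loss instead.
    static void writeControllerRegistrations(int registrationCount,
                                             boolean versionSupportsRegistrations,
                                             List<String> records,
                                             Consumer<String> lossHandler) {
        if (registrationCount == 0) {
            return; // nothing to write, nothing lost
        }
        if (versionSupportsRegistrations) {
            for (int i = 0; i < registrationCount; i++) {
                records.add("RegisterControllerRecord");
            }
        } else {
            lossHandler.accept("controller registration data");
        }
    }

    public static void main(String[] args) {
        // Old version: the first reported loss is captured, as in the diff's test.
        List<String> oldRecords = new ArrayList<>();
        AtomicReference<String> lossString = new AtomicReference<>("");
        writeControllerRegistrations(1, false, oldRecords, loss -> lossString.compareAndSet("", loss));
        System.out.println(lossString.get() + " / records=" + oldRecords.size()); // prints "controller registration data / records=0"

        // New version: the registration is written and nothing is reported as lost.
        List<String> newRecords = new ArrayList<>();
        AtomicReference<String> noLoss = new AtomicReference<>("");
        writeControllerRegistrations(1, true, newRecords, loss -> noLoss.compareAndSet("", loss));
        System.out.println(newRecords + " / loss='" + noLoss.get() + "'"); // prints "[RegisterControllerRecord] / loss=''"
    }
}
```

A test for the second case would assert on the emitted records rather than on the loss handler, which is the kind of coverage the review asks about.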
Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]
viktorsomogyi commented on code in PR #14756: URL: https://github.com/apache/kafka/pull/14756#discussion_r1400948591 ## clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java: ## @@ -185,6 +188,18 @@ private SslEngineFactory createNewSslEngineFactory(Map newConfigs) { } } +private static boolean getBoolean(final Map configs, final String key, final boolean defaultValue) { +final Object value = configs.getOrDefault(key, defaultValue); +if (value instanceof Boolean) { +return (boolean) value; +} else if (value instanceof String) { +return Boolean.parseBoolean((String) value); +} else { +log.warn("Invalid value (" + value + ") on configuration '" + key + "'. Please specify a true/false value."); Review Comment: Please also mention that in this case the default value is used (and what the default value is); otherwise it reads as a bit incomplete. ## clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java: ## @@ -185,6 +188,18 @@ private SslEngineFactory createNewSslEngineFactory(Map newConfigs) { } } +private static boolean getBoolean(final Map configs, final String key, final boolean defaultValue) { Review Comment: This is more of a utility method; would you please move it to `org.apache.kafka.common.utils.ConfigUtils` for now?
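A sketch of the helper with the warning the review asks for, i.e. one that says the default is used and what it is. It uses `java.util.logging` instead of the SLF4J logger in the diff so that it runs without dependencies, treats a missing or `null` value as "use the default" (an assumption), and `ConfigBooleans` is a hypothetical stand-in for wherever the method ends up (the review suggests `ConfigUtils`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

class ConfigBooleans {
    private static final Logger LOG = Logger.getLogger(ConfigBooleans.class.getName());

    // Utility-style boolean lookup whose warning names the fallback default.
    static boolean getBoolean(Map<String, ?> configs, String key, boolean defaultValue) {
        Object value = configs.get(key);
        if (value == null) {
            return defaultValue; // key absent (or mapped to null): use the default silently
        } else if (value instanceof Boolean) {
            return (Boolean) value;
        } else if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        LOG.warning("Invalid value (" + value + ") on configuration '" + key
            + "'. Please specify a true/false value. Falling back to the default value: " + defaultValue);
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("a", "true");
        configs.put("bad", 42);
        System.out.println(getBoolean(configs, "a", false));       // prints "true"
        System.out.println(getBoolean(configs, "bad", true));      // logs a warning, prints "true"
        System.out.println(getBoolean(configs, "missing", false)); // prints "false"
    }
}
```

Note that `Boolean.parseBoolean` returns `false` for any string other than a case-insensitive `"true"`, so a typo such as `"ture"` is silently read as `false` rather than triggering the warning.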
Re: [PR] HOTFIX: Fix compilation error with Scala 2.12 [kafka]
mimaison commented on PR #14823: URL: https://github.com/apache/kafka/pull/14823#issuecomment-1823191335 cc @ijuma @cmccabe
Re: [PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]
hanyuzheng7 commented on code in PR #14821: URL: https://github.com/apache/kafka/pull/14821#discussion_r1402442067 ## streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java: ## @@ -77,6 +79,12 @@ public QueryResult getOnlyPartitionResult() { "The query did not return exactly one partition result: " + partitionResults ); } else { +if (nonempty.isEmpty() && partitionResults.size() != 0 && partitionResults.get(0).isFailure()) { +FailureReason failureReason = partitionResults.get(0).getFailureReason(); +if (failureReason.equals(FailureReason.UNKNOWN_QUERY_TYPE)) { +return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type"); +} Review Comment: I think we can use a for loop to count the failure reasons; if all of them are `UNKNOWN_QUERY_TYPE`, we can return `QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type")` at the end.
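The loop suggested in the review can be sketched as follows, using a hypothetical `Reason` enum in place of `FailureReason` (only `UNKNOWN_QUERY_TYPE` comes from the diff; `OTHER_FAILURE` is made up) and a plain list of the failure reasons collected from the partition results. Returning `false` for an empty list is an assumption, so that the "all unknown" case requires at least one failure:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class AllUnknownCheck {
    // Counts failure reasons with a plain for loop; true only if there is at least
    // one failure and every one of them is UNKNOWN_QUERY_TYPE.
    static boolean allUnknownQueryType(List<Reason> failureReasons) {
        int unknown = 0;
        for (Reason reason : failureReasons) {
            if (reason == Reason.UNKNOWN_QUERY_TYPE) {
                unknown++;
            }
        }
        return !failureReasons.isEmpty() && unknown == failureReasons.size();
    }

    public static void main(String[] args) {
        System.out.println(allUnknownQueryType(Arrays.asList(Reason.UNKNOWN_QUERY_TYPE, Reason.UNKNOWN_QUERY_TYPE))); // prints "true"
        System.out.println(allUnknownQueryType(Arrays.asList(Reason.UNKNOWN_QUERY_TYPE, Reason.OTHER_FAILURE)));      // prints "false"
        System.out.println(allUnknownQueryType(Collections.<Reason>emptyList()));                                     // prints "false"
    }
}

// Hypothetical stand-in for FailureReason.
enum Reason { UNKNOWN_QUERY_TYPE, OTHER_FAILURE }
```

When the check returns `true`, the caller would return the single `UNKNOWN_QUERY_TYPE` failure result suggested in the comment.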
Re: [PR] HOTFIX: Fix compilation error with Scala 2.12 [kafka]
mimaison commented on PR #14823: URL: https://github.com/apache/kafka/pull/14823#issuecomment-1823186116 The Java 8 / Scala 2.12 build has been broken for 9 days, since this backport: https://github.com/apache/kafka/commit/1de84590c476d3ea0577cde2b4cd12e6584b8fe9 https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/3.6/112/pipeline/9/
[PR] HOTFIX: Fix compilation error with Scala 2.12 [kafka]
mimaison opened a new pull request, #14823: URL: https://github.com/apache/kafka/pull/14823 Fixes [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.6/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2646:51: the result type of an implicit conversion must be more specific than Object ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]
hanyuzheng7 commented on code in PR #14821: URL: https://github.com/apache/kafka/pull/14821#discussion_r1402442067 ## streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java: ## @@ -77,6 +79,12 @@ public QueryResult getOnlyPartitionResult() { "The query did not return exactly one partition result: " + partitionResults ); } else { +if (nonempty.isEmpty() && partitionResults.size() != 0 && partitionResults.get(0).isFailure()) { +FailureReason failureReason = partitionResults.get(0).getFailureReason(); +if (failureReason.equals(FailureReason.UNKNOWN_QUERY_TYPE)) { +return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type"); +} Review Comment: I think we can use a for loop to count the failure reasons; if all of them are `UNKNOWN_QUERY_TYPE`, we can return `QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type")` at the end.
[PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]
kamalcph opened a new pull request, #14822: URL: https://github.com/apache/kafka/pull/14822 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]
hanyuzheng7 opened a new pull request, #14821: URL: https://github.com/apache/kafka/pull/14821 Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect `IllegalArgumentException` if any result is a `FailedQueryResult` (even if there is only a single FailedQueryResult). The issue is the internal `filter(r -> r.getResult() != 0)` step, which blindly (and incorrectly) calls `getResult`. Given the semantics of `getOnlyPartitionResult`, we should not care whether the result is a SuccessQueryResult or a FailedQueryResult, but only check whether there is a single result or not. (Otherwise, the user has no means to avoid getting an exception.) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-15886) Always specify directories for new partition registrations
[ https://issues.apache.org/jira/browse/KAFKA-15886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-15886: --- Assignee: Igor Soarez > Always specify directories for new partition registrations > -- > > Key: KAFKA-15886 > URL: https://issues.apache.org/jira/browse/KAFKA-15886 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > When creating partition registrations directories must always be defined. > If creating a partition from a PartitionRecord or PartitionChangeRecord from > an older version that does not support directory assignments, then > DirectoryId.MIGRATING is assumed. > If creating a new partition, or triggering a change in assignment, > DirectoryId.UNASSIGNED should be specified, unless the target broker has a > single online directory registered, in which case the replica should be > assigned directly to that single directory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez opened a new pull request, #14820: URL: https://github.com/apache/kafka/pull/14820 When creating partition registrations directories must always be defined. If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that does not support directory assignments, then DirectoryId.MIGRATING is assumed. If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be specified, unless the target broker has a single online directory registered, in which case the replica should be assigned directly to that single directory. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
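The three directory rules in this description can be summarized as a small decision function. This is a sketch only: the string sentinels are hypothetical stand-ins for `DirectoryId.MIGRATING` and `DirectoryId.UNASSIGNED` (which are `Uuid` values in Kafka), and the method is not the controller's actual code.

```java
import java.util.List;

class DirectoryChoiceSketch {
    // Hypothetical sentinels standing in for DirectoryId.MIGRATING and DirectoryId.UNASSIGNED.
    static final String MIGRATING = "MIGRATING";
    static final String UNASSIGNED = "UNASSIGNED";

    /**
     * @param fromLegacyRecord true when the registration comes from a PartitionRecord or
     *                         PartitionChangeRecord version without directory assignments
     * @param onlineDirs       directories the target broker has registered as online
     */
    static String chooseDirectory(boolean fromLegacyRecord, List<String> onlineDirs) {
        if (fromLegacyRecord) {
            return MIGRATING;
        }
        if (onlineDirs.size() == 1) {
            // Exactly one online directory: assign the replica to it directly.
            return onlineDirs.get(0);
        }
        return UNASSIGNED;
    }

    public static void main(String[] args) {
        System.out.println(chooseDirectory(false, List.of("dir-a")));
        System.out.println(chooseDirectory(false, List.of("dir-a", "dir-b")));
        System.out.println(chooseDirectory(true, List.of("dir-a")));
    }
}
```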
[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788830#comment-17788830 ] Mickael Maison commented on KAFKA-15620: Removed 3.6.1 from the fix version list as the release script does not handle Jira resolved as duplicates properly. The original Jira, KAFKA-15479, has 3.6.1 listed in its fix versions. > Duplicate remote log DELETE_SEGMENT metadata is generated when there are > multiple leader epochs in the segment > -- > > Key: KAFKA-15620 > URL: https://issues.apache.org/jira/browse/KAFKA-15620 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.6.0 >Reporter: Henry Cai >Priority: Major > Fix For: 3.7.0 > > > Use the newly released 3.6.0, turn on tiered storage feature: > {*}remote.log.storage.system.enable{*}=true > 1. Set up topic tier5 to be remote storage enabled. Adding some data to the > topic and the data is copied to remote storage. After a few days when the > log segment is removed from remote storage due to log retention expiration, > noticed the following warnings in the server log: > [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: > [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, > eventTimestampMs=1697005926358, brokerId=1043}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log > segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore) > org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: > No remote log segment metadata found for > :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id=YFNCaWjPQFSKCngQ1QcKpA} > at > org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133) > at java.base/java.lang.Thread.run(Thread.java:829) > > 2. After some debugging, realized the problem is *there are 2 sets of > DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this > segment. The DELETE_SEGMENT_FINISHED in the first set remove the segment > from the metadata cache and this caused the above exception when the > DELETE_SEGMENT_STARTED from the second set needs to be processed. > > 3. 
And traced the log to where the log retention kicked in and saw *there > were two delete log segment generated* at that time: > ``` > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > ``` > 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this > segment (which triggers the generation of the later DELETE_SEGMENT metadata): > [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: > [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , startOffset=6387830, endOffset=9578660, brokerId=1043, > maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, > segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] >
[jira] [Updated] (KAFKA-15876) Introduce Remote Storage Not Ready Exception
[ https://issues.apache.org/jira/browse/KAFKA-15876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15876: - Labels: kip (was: ) > Introduce Remote Storage Not Ready Exception > > > Key: KAFKA-15876 > URL: https://issues.apache.org/jira/browse/KAFKA-15876 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Labels: kip > > When tiered storage is enabled on the cluster, the Kafka broker has to build the > remote log metadata on restart for all the partitions for which it is either the leader or a > follower. The remote log metadata is built asynchronously and > does not interfere with the broker startup path. Once the broker comes > online, it cannot handle client requests (FETCH and LIST_OFFSETS) that > access remote storage until the metadata has been built for those partitions. > Currently, we return a ReplicaNotAvailable exception to the > client so that it will retry after some time. > [ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java] > is applicable when a reassignment is in progress and is effectively > deprecated in favor of NotLeaderOrFollowerException > ([PR#8979|https://github.com/apache/kafka/pull/8979]). It would be good to introduce > an appropriate retriable exception for remote storage errors to denote that > it is not ready to accept client requests yet.
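To illustrate why a dedicated retriable error helps, here is a sketch of the client-side pattern it enables: back off and retry while the broker reports that remote storage is not ready. Both exception classes are local stand-ins (the real proposal would extend Kafka's `org.apache.kafka.common.errors.RetriableException`), and `withRetries` is a hypothetical helper, not a Kafka client API.

```java
import java.util.function.Supplier;

class RetryOnNotReadySketch {
    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    // Hypothetical stand-in for the proposed RemoteStorageNotReadyException.
    static class RemoteStorageNotReadyException extends RetriableException {
        RemoteStorageNotReadyException(String message) { super(message); }
    }

    // Retries the call while it fails with a retriable error, up to maxAttempts times.
    static <T> T withRetries(Supplier<T> call, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last retriable error
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e; // stop retrying if the thread is interrupted
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new RemoteStorageNotReadyException("remote log metadata not built yet");
            }
            return "fetched";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Catching the common retriable base type, rather than a specific error such as ReplicaNotAvailableException, is what lets a new error like this be introduced without changing the client's retry loop.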
[jira] [Created] (KAFKA-15886) Always specify directories for new partition registrations
Igor Soarez created KAFKA-15886: --- Summary: Always specify directories for new partition registrations Key: KAFKA-15886 URL: https://issues.apache.org/jira/browse/KAFKA-15886 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez When creating partition registrations directories must always be defined. If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that does not support directory assignments, then DirectoryId.MIGRATING is assumed. If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be specified, unless the target broker has a single online directory registered, in which case the replica should be assigned directly to that single directory.
[jira] [Updated] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
[ https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-15620: --- Fix Version/s: (was: 3.6.1) > Duplicate remote log DELETE_SEGMENT metadata is generated when there are > multiple leader epochs in the segment > -- > > Key: KAFKA-15620 > URL: https://issues.apache.org/jira/browse/KAFKA-15620 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 3.6.0 >Reporter: Henry Cai >Priority: Major > Fix For: 3.7.0 > > > Use the newly released 3.6.0, turn on tiered storage feature: > {*}remote.log.storage.system.enable{*}=true > 1. Set up topic tier5 to be remote storage enabled. Adding some data to the > topic and the data is copied to remote storage. After a few days when the > log segment is removed from remote storage due to log retention expiration, > noticed the following warnings in the server log: > [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: > [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, > eventTimestampMs=1697005926358, brokerId=1043}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log > segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore) > org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: > No remote log segment metadata found for > :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id=YFNCaWjPQFSKCngQ1QcKpA} > at > org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89) > at > org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157) > at > org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133) > at java.base/java.lang.Thread.run(Thread.java:829) > > 2. After some debugging, realized the problem is *there are 2 sets of > DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this > segment. The DELETE_SEGMENT_FINISHED in the first set remove the segment > from the metadata cache and this caused the above exception when the > DELETE_SEGMENT_STARTED from the second set needs to be processed. > > 3. 
And traced the log to where the log retention kicked in and saw *there > were two delete log segment generated* at that time: > ``` > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 > partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment > RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, > id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach > based on the largest record timestamp in the segment > (kafka.log.remote.RemoteLogManager$RLMTask) > ``` > 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this > segment (which triggers the generation of the later DELETE_SEGMENT metadata): > [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: > [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} > , startOffset=6387830, endOffset=9578660, brokerId=1043, > maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, > segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] > (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache) > > You can see there are 2 leader epochs in this segment: > *segmentLeaderEpochs=\{5=6387830, 6=6721329}* > > 5. From the remote log retention code >
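The report shows one segment (`YFNCaWjPQFSKCngQ1QcKpA`) carrying two leader epochs (`segmentLeaderEpochs={5=6387830, 6=6721329}`) and two delete events for it. Assuming retention walks expired segments per leader epoch (the report is cut off before it quotes that code), one way to guarantee a single delete per segment is to collect segment IDs into an ordered set first. The sketch below uses hypothetical types and IDs and is not the actual RemoteLogManager fix.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class DedupeDeletesSketch {
    // Returns segment ids to delete, each id at most once, even when the same
    // segment is listed under several leader epochs.
    static List<String> segmentsToDelete(Map<Integer, List<String>> expiredSegmentsByEpoch) {
        Set<String> unique = new LinkedHashSet<>();
        // Iterate epochs in ascending order so the output order is deterministic.
        for (List<String> segments : new TreeMap<>(expiredSegmentsByEpoch).values()) {
            unique.addAll(segments);
        }
        return new ArrayList<>(unique);
    }

    public static void main(String[] args) {
        // "otherSegment" is a hypothetical second segment for illustration.
        Map<Integer, List<String>> byEpoch = Map.of(
            5, List.of("YFNCaWjPQFSKCngQ1QcKpA"),
            6, List.of("YFNCaWjPQFSKCngQ1QcKpA", "otherSegment"));
        System.out.println(segmentsToDelete(byEpoch));
    }
}
```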
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
kamalcph commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1823109337 > The first one is that I no longer see a point in segment.bytes and segment.ms (and by extension log.segment.bytes and log.segment.ms) with respect to tiered topics. If a person says "hey, I only want 4GB of data or data from the last 10 minutes around" then why would they ever need to configure how often a segment should be closed? If this is the case shouldn't this be followed by ignoring those two properties for tiered topics? `segment.bytes` takes effect when the topic has a continuous inflow of data. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and the partition has a bytes-in load of 1 MB/sec, then the segment fills in ~17 mins (1 GB / 1 MB/sec is about 1024 sec) and gets rotated to passive. `segment.ms` takes effect when the topic has a continuous but low inflow of data. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and the partition has a bytes-in load of 100 bytes/sec, then the segment won't be filled (only ~8.6 MB arrives per day) and gets rotated to passive after 1 day. In this patch, we are trying to handle the case where the topic has some data in the active segment but doesn't have a continuous inflow of data. So, both the `segment.ms` and `segment.bytes` configs are applicable to tiered storage topics. > The second one is that you will be changing the definition of local.retention. Prior to this change it meant that closed segments will be served from local disk for at most this much size or time as long as they have been moved to tiered storage. Now it will mean that anything beyond this size and time will be found only in tiered storage. Isn't this a "public facing change" and thus requiring some announcements? This was the original plan and is in line with the local log segments.
If a topic is deprecated and won't receive any more data in the future, the user expects all the data in that topic to be removed after the retention time; otherwise, it can fail to meet certain compliance requirements.
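The arithmetic in the two `segment.bytes`/`segment.ms` examples can be checked with a small sketch. It simplifies under stated assumptions: a steady inflow rate, 1 GB taken as 1 GiB and 1 MB as 1 MiB, `segment.jitter.ms` ignored, and the roll check only happening when records are appended (which is why, as the comment explains, an idle partition needs separate handling).

```java
class SegmentRollEstimate {
    // Seconds until an active segment of segmentBytes fills at a steady inflow rate.
    static double secondsToFill(long segmentBytes, double bytesPerSec) {
        return segmentBytes / bytesPerSec;
    }

    // Whichever limit is reached first rolls the segment. Assumes records keep
    // arriving, since the roll check happens on append; segment.jitter.ms is ignored.
    static double secondsUntilRoll(long segmentBytes, long segmentMs, double bytesPerSec) {
        return Math.min(secondsToFill(segmentBytes, bytesPerSec), segmentMs / 1000.0);
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        long oneDayMs = 24L * 60 * 60 * 1000;
        // 1 MiB/s: the size limit wins after 1024 s (about 17 minutes).
        System.out.println(secondsUntilRoll(oneGiB, oneDayMs, 1 << 20) / 60.0);
        // 100 bytes/s: only ~8.6 MB arrives per day, so segment.ms rolls it after 1 day.
        System.out.println(secondsUntilRoll(oneGiB, oneDayMs, 100) / 3600.0);
    }
}
```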
Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]
soarez commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1402362044 ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -231,13 +238,12 @@ public void run() throws Exception { } else { failedAttempts = 0; AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data(); + Set failed = filterFailures(data, inflight); +Set completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed); +completed.forEach(assignmentEvent -> assignmentEvent.callback.run()); + log.warn("Re-queueing assignments: {}", failed); -if (!failed.isEmpty()) { -for (AssignmentEvent event : failed) { -pending.put(event.partition, event); -} -} Review Comment: I think we need to requeue these, did this not break the tests in AssignmentsManagerTest? ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + // Visible for testing + def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: DirectoryEventRequestState): Unit = { +assignmentRequestStates.put(topicPartition, state) + } + private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = { +val partitionRequestState = Option(assignmentRequestStates.get(topicPartition)) +val topicId = partition.topicId +if (topicId.isEmpty) + 
throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists but its ID doesn't exist.") + +partitionRequestState match { + case None => +// Schedule assignment request and don't promote the future replica yet until the controller accepted the request. +partition.maybeFutureReplicaCaughtUp(_ => { + partition.futureReplicaDirectoryId() +.map { + directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), _, +updatedAssignmentRequestStat(topicPartition)(_)) +} +}) + case Some(DirectoryEventRequestState.COMPLETED) => +// Promote future replica if controller accepted the request and the replica caught-up with the original log. +if (partition.maybeReplaceCurrentWithFutureReplica()) { + removePartitions(Set(topicPartition)) + assignmentRequestStates.remove(topicPartition) +} + case _ => Review Comment: Looks better. Thanks ## server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java: ## @@ -19,13 +19,16 @@ import org.apache.kafka.common.Uuid; +import java.util.function.Consumer; Review Comment: Looks like this is unused. Did you run checkstyle? 
## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String, futureLog.updateHighWatermark(partitionData.highWatermark) futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) -if (partition.maybeReplaceCurrentWithFutureReplica()) - removePartitions(Set(topicPartition)) +directoryEventHandler match { + case DirectoryEventHandler.NOOP => +if (partition.maybeReplaceCurrentWithFutureReplica()) + removePartitions(Set(topicPartition)) + case _ => +maybePromoteFutureReplica(topicPartition, partition) +} quota.record(records.sizeInBytes) logAppendInfo } + override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = { +// Schedule assignment request to revert any queued request before cancelling +for { + topicPartition <- topicPartitions + partitionState <- partitionAssignmentRequestState(topicPartition) + if partitionState == QUEUED + partition = replicaMgr.getPartitionOrException(topicPartition) + topicId <- partition.topicId + logId <- partition.logDirectoryId()
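The first review comment in this message asks for failed assignments to be re-queued rather than dropped. A minimal sketch of that handling follows, assuming an `AssignmentEvent` with a partition key and a completion callback as in the diff. The class here is a hypothetical stand-in, not `AssignmentsManager` itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RequeueSketch {
    // Hypothetical stand-in for AssignmentsManager.AssignmentEvent.
    static final class AssignmentEvent {
        final String partition;
        final Runnable callback;

        AssignmentEvent(String partition, Runnable callback) {
            this.partition = partition;
            this.callback = callback;
        }
    }

    // Runs the callback of every in-flight event the controller accepted and
    // re-queues the failed ones into pending so they are retried.
    static void handleResponse(Map<String, AssignmentEvent> inflight,
                               Set<AssignmentEvent> failed,
                               Map<String, AssignmentEvent> pending) {
        for (AssignmentEvent event : inflight.values()) {
            if (failed.contains(event)) {
                pending.put(event.partition, event);
            } else {
                event.callback.run();
            }
        }
    }

    public static void main(String[] args) {
        List<String> completed = new ArrayList<>();
        AssignmentEvent ok = new AssignmentEvent("t-0", () -> completed.add("t-0"));
        AssignmentEvent bad = new AssignmentEvent("t-1", () -> completed.add("t-1"));
        Map<String, AssignmentEvent> inflight = new LinkedHashMap<>();
        inflight.put(ok.partition, ok);
        inflight.put(bad.partition, bad);
        Map<String, AssignmentEvent> pending = new HashMap<>();
        handleResponse(inflight, Set.of(bad), pending);
        System.out.println("completed=" + completed + " requeued=" + pending.keySet());
    }
}
```

A test in the spirit of the one the reviewer mentions would check both halves: the accepted event's callback ran, and the failed event is back in `pending`.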
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
kamalcph commented on PR #14787: URL: https://github.com/apache/kafka/pull/14787#issuecomment-1823088481 @satishd Addressed your review comments. PTAL.
[PR] MINOR: Fix a few reconciliation bugs [kafka]
dajac opened a new pull request, #14819: URL: https://github.com/apache/kafka/pull/14819 Required changes to make the reconciliation logic in the async consumer work. With this, I was able to join a consumer group with a few members. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1402217667 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -829,21 +902,50 @@ private CoordinatorResult consumerGr // Get or create the member. if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString(); -final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists); -throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); - -if (memberEpoch == 0) { -log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +ConsumerGroupMember updatedMember; +boolean staticMemberReplaced = false; +if (instanceId == null) { +member = group.getOrMaybeCreateMember(memberId, createIfNotExists); +throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); +if (createIfNotExists) { +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +} +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} else { +member = group.staticMember(instanceId); +throwIfStaticMemberValidationFails(groupId, instanceId, member, memberEpoch, memberId); +if (member == null) { +member = group.getOrMaybeCreateMember(memberId, createIfNotExists); +} +throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions); +if (createIfNotExists) { +log.info("[GroupId {}] Member {}, Instance-Id {} joins the consumer group.", groupId, memberId, instanceId); +} +staticMemberReplaced = staticMemberReplaced(memberEpoch, member); +if (staticMemberReplaced) { +removeMemberAndCancelTimers(records, group.groupId(), member.memberId()); +// The replacing member gets the same set of assignments as the departed member. 
+updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()) + .setPartitionsPendingAssignment(member.partitionsPendingAssignment()) + .setPartitionsPendingRevocation(member.partitionsPendingRevocation()); Review Comment: If I don't call `setAssignedPartitions`, then the replacing static member doesn't get its assignments back. I added the pending-assignment and pending-revocation setters just to ensure that all assignments from the previous member carry over to the new member.
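The builder chain in this diff copies the departed member's partitions onto a member with a new ID. The sketch below reduces that idea to plain Java: the `Member` class is a hypothetical, heavily simplified stand-in for `ConsumerGroupMember` and is not the group coordinator's code.

```java
import java.util.Set;

class StaticMemberReplacementSketch {
    // Hypothetical, simplified member state; the real ConsumerGroupMember
    // also tracks epochs, subscriptions, rack, and more.
    static final class Member {
        final String memberId;
        final String instanceId;
        final Set<String> assignedPartitions;
        final Set<String> partitionsPendingAssignment;
        final Set<String> partitionsPendingRevocation;

        Member(String memberId, String instanceId, Set<String> assigned,
               Set<String> pendingAssignment, Set<String> pendingRevocation) {
            this.memberId = memberId;
            this.instanceId = instanceId;
            this.assignedPartitions = Set.copyOf(assigned);
            this.partitionsPendingAssignment = Set.copyOf(pendingAssignment);
            this.partitionsPendingRevocation = Set.copyOf(pendingRevocation);
        }
    }

    // The replacing member gets a new member id but keeps the departed member's
    // instance id and every assigned or pending partition.
    static Member replace(Member departed, String newMemberId) {
        return new Member(newMemberId, departed.instanceId,
            departed.assignedPartitions,
            departed.partitionsPendingAssignment,
            departed.partitionsPendingRevocation);
    }

    public static void main(String[] args) {
        Member departed = new Member("old-member", "instance-1",
            Set.of("t-0", "t-1"), Set.of("t-2"), Set.of());
        Member replacing = replace(departed, "new-member");
        System.out.println(replacing.memberId + " " + replacing.instanceId + " "
            + replacing.assignedPartitions.size());
    }
}
```

Dropping the `assignedPartitions` copy in this sketch reproduces the symptom described in the comment: the replacing member would start with no assignment.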
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
vamossagar12 commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1402215662 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -898,31 +1000,45 @@ private CoordinatorResult consumerGr group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); } -// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The -// delta between the existing and the new target assignment is persisted to the partition. +// 2. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member +// replaces an existing static member. The delta between the existing and the new target assignment is persisted to the partition. int targetAssignmentEpoch = group.assignmentEpoch(); Assignment targetAssignment = group.targetAssignment(memberId); -if (groupEpoch > targetAssignmentEpoch) { +if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) { String preferredServerAssignor = group.computePreferredServerAssignor( member, updatedMember ).orElse(defaultAssignor.name()); try { -TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = -new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)) +TargetAssignmentBuilder assignmentResultBuilder = +new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor)); +TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; +// A new static member is replacing an older one with the same subscriptions. +// We just need to remove the older member and add the newer one. The new member can +// reuse the target assignment of the older member. +if (staticMemberReplaced && groupEpoch == targetAssignmentEpoch) { Review Comment: Actually, `groupEpoch == targetAssignmentEpoch` is not needed. 
I was just trying to ensure that the group epoch and the target assignment epoch are the same, which is what happens when a static member is replaced. So, in a way, it's redundant. I will remove it.
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1402179874 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + +public static final String DOMAIN = "org.apache.kafka"; +// Client metrics tags +public static final String CLIENT_RACK = "client_rack"; +public static final String GROUP_ID = "group_id"; +public static final String GROUP_INSTANCE_ID = "group_instance_id"; +public static final String TRANSACTIONAL_ID = "transactional_id"; + +private static final Map PRODUCER_CONFIG_MAPPING = new HashMap<>(); +private static final Map CONSUMER_CONFIG_MAPPING = new HashMap<>(); + +private Resource resource = null; +private Map config = null; + +static { +PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID); + +CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID); +CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID); +} + +@Override +public synchronized void configure(Map configs) { +this.config = configs; +} + +/** + * Validate that all the data required for generating correct metrics is present. The provider + * will be disabled if validation fails. + * + * @param metricsContext {@link MetricsContext} + * @return false if all the data required for generating correct metrics is missing, true + * otherwise. 
+ */ +public boolean validate(MetricsContext metricsContext, Map config) { +// metric collection will be disabled for clients without a client id (e.g. transient admin clients) +return ClientTelemetryUtils.validateResourceLabel(config, CommonClientConfigs.CLIENT_ID_CONFIG) && Review Comment: Although the `client.id` property is optional, ProducerConfig and ConsumerConfig generate a client.id if one is not defined, which is not the case for AdminClient. Hence, admin clients created from the CLI (e.g. kafka-configs.sh) should not report any metrics.
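The gating described in the comment (no `client.id`, no telemetry) can be sketched as a simple config check. The helper below is hypothetical and only follows the idea behind `ClientTelemetryUtils.validateResourceLabel` as quoted in the diff; its exact semantics in Kafka may differ.

```java
import java.util.Map;

class TelemetryValidationSketch {
    static final String CLIENT_ID_CONFIG = "client.id";

    // Hypothetical check: the label counts as present only if the config
    // holds a non-empty value for it.
    static boolean hasNonEmptyValue(Map<String, ?> config, String key) {
        Object value = config.get(key);
        return value != null && !value.toString().isEmpty();
    }

    // Telemetry stays disabled for clients without a client.id, e.g. an admin
    // client created by a CLI tool that never sets one.
    static boolean telemetryEnabled(Map<String, ?> config) {
        return hasNonEmptyValue(config, CLIENT_ID_CONFIG);
    }

    public static void main(String[] args) {
        System.out.println(telemetryEnabled(Map.of(CLIENT_ID_CONFIG, "producer-1")));
        System.out.println(telemetryEnabled(Map.of()));
    }
}
```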
[jira] [Created] (KAFKA-15885) Reduce lock contention when cleaning topics
Krzysztof Piecuch created KAFKA-15885:
---
Summary: Reduce lock contention when cleaning topics
Key: KAFKA-15885
URL: https://issues.apache.org/jira/browse/KAFKA-15885
Project: Kafka
Issue Type: Improvement
Components: log cleaner
Reporter: Krzysztof Piecuch

Somewhat similar to KAFKA-14213, a couple of subroutines require the same lock, which throttles compaction speed and limits parallelism.

There are a couple of problems here:
# LogCleanerManager.grabFilthiestCompactedLog iterates through a list of partitions multiple times, all while holding a lock.
# LogCleanerManager.grabFilthiestCompactedLog doesn't cache anything and returns only one item at a time; the method is called every time a cleaner thread asks for a new partition to compact.
# LogCleanerManager.checkCleaningAborted is a quick check which:
## shares a lock with grabFilthiestCompactedLog
## is executed every time a LogCleaner reads bufsize worth of data to compact
# LogCleaner's bufsize is limited to 1G / (number of log cleaner threads).

Here's the scenario where this design falls short:
* I have 15k partitions,
* all of which need to be compacted fairly often, although each one doesn't take long to compact.
* Most of the CPU time of the cleaner threads is spent in grabFilthiestCompactedLog,
** so the other cleaners can't do anything, since they need to acquire the lock to read data to compact (see 3.1 and 3.2);
** because of 4, log cleaners run out of work as soon as grabFilthiestLog is called.
* Negative performance scaling: increasing the number of log cleaner threads decreases each cleaner's bufsize, which makes them hammer the lock mentioned in 3.1 and 3.2 more often.

I suggest:
* making LogCleanerManager use more fine-grained locking (e.g. an RW lock for the checkCleaningAborted data structures) to reduce the negative performance scaling
* making LogCleanerManager.grabFilthiestLog faster on average:
** we don't need grabFilthiestLog to be 100% accurate
** we could cache candidates for "filthiestLog" and recalculate the cache every minute or so
** we could change the algorithm to probabilistic sampling (take 100 topics and pick the worst one?) or even round-robin
* Alternatively, we could allow higher values for LogCleaner's bufsize.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
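The sampling suggestion ("get 100 topics and pick the worst one") can be sketched as follows. This is a hypothetical illustration, not LogCleanerManager code: `CleaningCandidate`, its `dirtyRatio` field and `SampledFilthiestPicker` are invented names, and the selection is only approximate, which the ticket says is acceptable.

```java
import java.util.List;
import java.util.Optional;
import java.util.Random;

// Hypothetical candidate type, not a Kafka class.
final class CleaningCandidate {
    final String partition;
    final double dirtyRatio;

    CleaningCandidate(String partition, double dirtyRatio) {
        this.partition = partition;
        this.dirtyRatio = dirtyRatio;
    }
}

final class SampledFilthiestPicker {
    private final Random random;

    SampledFilthiestPicker(Random random) {
        this.random = random;
    }

    // Draws sampleSize random candidates (with replacement) and returns the dirtiest.
    // On a random-access list the cost is O(sampleSize), independent of the partition
    // count, at the price of sometimes missing the true maximum.
    Optional<CleaningCandidate> pick(List<CleaningCandidate> candidates, int sampleSize) {
        if (candidates.isEmpty() || sampleSize <= 0) {
            return Optional.empty();
        }
        CleaningCandidate best = null;
        for (int i = 0; i < sampleSize; i++) {
            CleaningCandidate c = candidates.get(random.nextInt(candidates.size()));
            if (best == null || c.dirtyRatio > best.dirtyRatio) {
                best = c;
            }
        }
        return Optional.of(best);
    }
}
```

Because each call does a bounded amount of work, the time spent choosing a log no longer grows with the 15k partitions. A log that is missed by one sample will likely be picked on a later call.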
[jira] [Created] (KAFKA-15884) Reduce lock contention when cleaning topics
Krzysztof Piecuch created KAFKA-15884:
---
Summary: Reduce lock contention when cleaning topics
Key: KAFKA-15884
URL: https://issues.apache.org/jira/browse/KAFKA-15884
Project: Kafka
Issue Type: Improvement
Components: log cleaner
Reporter: Krzysztof Piecuch

Somewhat similar to KAFKA-14213, a couple of subroutines require the same lock, which throttles compaction speed and limits parallelism.

There are a couple of problems here:
# LogCleanerManager.grabFilthiestCompactedLog iterates through a list of partitions multiple times, all while holding a lock.
# LogCleanerManager.grabFilthiestCompactedLog doesn't cache anything and returns only one item at a time; the method is called every time a cleaner thread asks for a new partition to compact.
# LogCleanerManager.checkCleaningAborted is a quick check which:
## shares a lock with grabFilthiestCompactedLog
## is executed every time a LogCleaner reads bufsize worth of data to compact
# LogCleaner's bufsize is limited to 1G / (number of log cleaner threads).

Here's the scenario where this design falls short:
* I have 15k partitions,
* all of which need to be compacted fairly often, although each one doesn't take long to compact.
* Most of the CPU time of the cleaner threads is spent in grabFilthiestCompactedLog,
** so the other cleaners can't do anything, since they need to acquire the lock to read data to compact (see 3.1 and 3.2);
** because of 4, log cleaners run out of work as soon as grabFilthiestLog is called.
* Negative performance scaling: increasing the number of log cleaner threads decreases each cleaner's bufsize, which makes them hammer the lock mentioned in 3.1 and 3.2 more often.

I suggest:
* making LogCleanerManager use more fine-grained locking (e.g. an RW lock for the checkCleaningAborted data structures) to reduce the negative performance scaling
* making LogCleanerManager.grabFilthiestLog faster on average:
** we don't need grabFilthiestLog to be 100% accurate
** we could cache candidates for "filthiestLog" and recalculate the cache every minute or so
** we could change the algorithm to probabilistic sampling (take 100 topics and pick the worst one?) or even round-robin
* Alternatively, we could allow higher values for LogCleaner's bufsize.
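The fine-grained locking suggestion can be sketched with a `ReentrantReadWriteLock` that guards only the abort bookkeeping. With this arrangement the frequent per-chunk check takes a shared read lock rather than the lock used to pick the next log. `CleaningAbortTable` and `PartitionCleaningState` are hypothetical names. Whether Kafka's actual state maps can be split out this way is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical states; Kafka's LogCleanerManager has its own state model.
enum PartitionCleaningState { IN_PROGRESS, ABORTED }

// Guards only the abort bookkeeping, with a lock separate from the one used to pick logs.
final class CleaningAbortTable {
    private final Map<String, PartitionCleaningState> states = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // Hot path, called once per bufsize chunk: readers do not block each other.
    boolean isAborted(String partition) {
        lock.readLock().lock();
        try {
            return states.get(partition) == PartitionCleaningState.ABORTED;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Rare path: state changes take the exclusive write lock.
    void setState(String partition, PartitionCleaningState state) {
        lock.writeLock().lock();
        try {
            states.put(partition, state);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Removes the partition's state once cleaning is done.
    void clear(String partition) {
        lock.writeLock().lock();
        try {
            states.remove(partition);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```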
Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1402159178 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + +public static final String DOMAIN = "org.apache.kafka"; +// Client metrics tags +public static final String CLIENT_RACK = "client_rack"; +public static final String GROUP_ID = "group_id"; +public static final String GROUP_INSTANCE_ID = "group_instance_id"; +public static final String TRANSACTIONAL_ID = "transactional_id"; Review Comment: The KIP says `group member id (if any, consumer)`. I read `if any` as meaning this is not a required label, since the member id is not available as a standard consumer property; rather, I think the broker generates it. I also could not find `group member id` exposed as a label for consumer metrics. @AndrewJSchofield Please let me know if my understanding is incorrect or if we can do better here.
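This sketch illustrates the "if any" reading. It derives optional labels from consumer config and omits any that are absent. The label names come from the constants in the diff above. Mapping them from `client.rack`, `group.id` and `group.instance.id` is an assumption. `ConsumerResourceLabels` is hypothetical, and plain strings stand in for the OpenTelemetry `KeyValue` types. A group member id is not included because, as the comment notes, it is not a consumer config property.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical label builder; not the ClientTelemetryProvider implementation.
final class ConsumerResourceLabels {
    static final String CLIENT_RACK = "client_rack";
    static final String GROUP_ID = "group_id";
    static final String GROUP_INSTANCE_ID = "group_instance_id";

    private ConsumerResourceLabels() {
    }

    // Adds the label only when the config value is present and non-empty,
    // so optional ("if any") labels are simply left out.
    private static void putIfPresent(Map<String, String> labels, String label,
                                     Map<String, ?> config, String key) {
        Object value = config.get(key);
        if (value != null && !value.toString().isEmpty()) {
            labels.put(label, value.toString());
        }
    }

    static Map<String, String> fromConsumerConfig(Map<String, ?> config) {
        Map<String, String> labels = new LinkedHashMap<>();
        putIfPresent(labels, CLIENT_RACK, config, "client.rack");
        putIfPresent(labels, GROUP_ID, config, "group.id");
        putIfPresent(labels, GROUP_INSTANCE_ID, config, "group.instance.id");
        return labels;
    }
}
```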
[jira] [Updated] (KAFKA-7631) NullPointerException when SCRAM is allowed bu ScramLoginModule is not in broker's jaas.conf
[ https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Olson updated KAFKA-7631:
Fix Version/s: 2.7.0

> NullPointerException when SCRAM is allowed bu ScramLoginModule is not in broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
> Issue Type: Improvement
> Components: security
> Affects Versions: 2.0.0, 2.5.0
> Reporter: Andras Beni
> Priority: Minor
> Fix For: 2.7.0
>
> Attachments: KAFKA-7631.patch
>
>
> When user wants to use delegation tokens and lists {{SCRAM}} in {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to broker's JAAS configuration, a null pointer exception is thrown on broker side and the connection is closed.
> Meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
>   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
>   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
>   at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
>   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
>   at kafka.network.Processor.poll(SocketServer.scala:679)
>   at kafka.network.Processor.run(SocketServer.scala:584)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
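The ticket asks for a meaningful error instead of an NPE. A minimal sketch of a broker-side pre-flight check that produces such a message is below. It is hypothetical and is not Kafka's fix. It assumes JAAS settings are supplied through the listener-scoped `listener.name.<listener>.<mechanism>.sasl.jaas.config` broker property; a `KafkaServer` section in a static jaas.conf file is not inspected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical pre-flight check, not Kafka's actual fix for this ticket.
final class ScramJaasPreflight {
    static final String SCRAM_LOGIN_MODULE = "org.apache.kafka.common.security.scram.ScramLoginModule";

    private ScramJaasPreflight() {
    }

    // Returns one readable message per enabled SCRAM mechanism whose
    // listener-scoped sasl.jaas.config does not reference ScramLoginModule.
    static List<String> check(Properties broker, String listenerName) {
        List<String> problems = new ArrayList<>();
        for (String raw : broker.getProperty("sasl.enabled.mechanisms", "").split(",")) {
            String mechanism = raw.trim();
            if (!mechanism.startsWith("SCRAM-")) {
                continue;
            }
            String key = "listener.name." + listenerName.toLowerCase(Locale.ROOT) + "."
                    + mechanism.toLowerCase(Locale.ROOT) + ".sasl.jaas.config";
            if (!broker.getProperty(key, "").contains(SCRAM_LOGIN_MODULE)) {
                problems.add(mechanism + " is enabled but " + key
                        + " does not reference " + SCRAM_LOGIN_MODULE);
            }
        }
        return problems;
    }
}
```

Failing fast at startup with these messages would surface the misconfiguration before any client connects, instead of closing connections with a stack trace.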
[jira] [Resolved] (KAFKA-7631) NullPointerException when SCRAM is allowed bu ScramLoginModule is not in broker's jaas.conf
[ https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Olson resolved KAFKA-7631. - Resolution: Fixed Marking as resolved since I believe it is.
Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14767: URL: https://github.com/apache/kafka/pull/14767#discussion_r1402135807 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -913,6 +915,13 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me reporters.forEach { reporter => metrics.addReporter(reporter) currentReporters += reporter.getClass.getName -> reporter + val clientTelemetryReceiver = reporter match { Review Comment: These changes are not required now; this file will drop out of the diff once this PR is merged with the open PR: https://github.com/apache/kafka/pull/14699
[jira] [Commented] (KAFKA-7631) NullPointerException when SCRAM is allowed bu ScramLoginModule is not in broker's jaas.conf
[ https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788786#comment-17788786 ] Andrew Olson commented on KAFKA-7631: - This looks like a duplicate of KAFKA-10556 (fixed in 2.7.0).
Re: [PR] KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API [kafka]
dongnuo123 commented on code in PR #14800: URL: https://github.com/apache/kafka/pull/14800#discussion_r1402122588 ## core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala: ## @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.message.SyncGroupRequestData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.Collections +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSyncGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSyncGroup() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def 
testSyncGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testSyncGroup() + } + + private def testSyncGroup(): Unit = { +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. +createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- 0 to ApiKeys.SYNC_GROUP.latestVersion(isUnstableApiEnabled)) { + // Sync with unknown group id. + syncGroupWithOldProtocol( +groupId = "grp-unknown", +memberId = "member-id", +generationId = -1, +expectedProtocolType = null, +expectedProtocolName = null, +expectedError = Errors.UNKNOWN_MEMBER_ID, +version = version.toShort + ) + + val metadata = ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo")) + ).array + + // Join a dynamic member without member id. + // Prior to JoinGroup version 4, a new member is immediately added if it sends a join group request with UNKNOWN_MEMBER_ID. + val joinLeaderResponseData = sendJoinRequest( +groupId = "grp", +metadata = metadata + ) + val leaderMemberId = joinLeaderResponseData.memberId + + // Rejoin the group with the member id. + sendJoinRequest( +groupId = "grp", +memberId = leaderMemberId, +metadata = metadata + ) + + if (version >= 5) { +// Sync the leader with unmatched protocolName. +syncGroupWithOldProtocol( + groupId = "grp", + memberId = leaderMemberId, + generationId = 1, + protocolName = "unmatched", + assignments = List(new SyncGroupRequestData.SyncGroupRequestAssignment() +.setMemberId(leaderMemberId)
[jira] [Created] (KAFKA-15883) Implement RemoteCopyLagBytes
Christo Lolov created KAFKA-15883: - Summary: Implement RemoteCopyLagBytes Key: KAFKA-15883 URL: https://issues.apache.org/jira/browse/KAFKA-15883 Project: Kafka Issue Type: Sub-task Reporter: Christo Lolov Assignee: Christo Lolov
Re: [PR] MINOR: Update LICENSE-binary for 3.6.1 [kafka]
mimaison merged PR #14812: URL: https://github.com/apache/kafka/pull/14812
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1822777968 > @apoorvmittal10 : > > 1. Do you know why JDK 11 and Scala 2.13 didn't build? > 2. For getting green build, it would be useful to help triage the new test failures. If we could identify the PR that introduced the failure, we could ping the author for a fix. I'll try to find out more details about the failing tests. The JDK 11 build seems to be stuck publishing test results; it has been running for more than 7 hours.
[jira] [Created] (KAFKA-15882) Scheduled nightly github actions workflow for CVE reports on published docker images
Vedarth Sharma created KAFKA-15882: -- Summary: Scheduled nightly github actions workflow for CVE reports on published docker images Key: KAFKA-15882 URL: https://issues.apache.org/jira/browse/KAFKA-15882 Project: Kafka Issue Type: Sub-task Reporter: Vedarth Sharma Assignee: Vedarth Sharma This scheduled GitHub Actions workflow will check the supported published Docker images for CVEs and generate reports.
[jira] [Created] (KAFKA-15881) Update release.py script to include docker image
Vedarth Sharma created KAFKA-15881: -- Summary: Update release.py script to include docker image Key: KAFKA-15881 URL: https://issues.apache.org/jira/browse/KAFKA-15881 Project: Kafka Issue Type: Sub-task Reporter: Vedarth Sharma Assignee: Vedarth Sharma Update release.py to build and push the RC Docker image to the release manager's (RM's) Docker Hub account, and include the image details in the VOTE email template.
[PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]
jcme opened a new pull request, #14818: URL: https://github.com/apache/kafka/pull/14818

# Overview
* This change pertains to the [SASL/OAUTHBEARER](https://kafka.apache.org/documentation/#security_sasl_oauthbearer) mechanism of Kafka authentication.
* Kafka clients can use the [SASL/OAUTHBEARER](https://kafka.apache.org/documentation/#security_sasl_oauthbearer) mechanism by overriding the [custom callback handlers](https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod).
* [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575), available from v3.1, further extends the mechanism with a production-grade implementation.
* Kafka's [SASL/OAUTHBEARER](https://kafka.apache.org/documentation/#security_sasl_oauthbearer) mechanism currently **rejects non-JWT (i.e. opaque) tokens**, because it accepts a more restrictive set of characters than [RFC-6750](https://datatracker.ietf.org/doc/html/rfc6750#section-2.1) allows.
* This JIRA can be considered an extension of [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575) to support opaque tokens in addition to JWT tokens.

# Solution
* Updated the regex in the offending class to comply with [RFC-6750](https://datatracker.ietf.org/doc/html/rfc6750#section-2.1).
* Added a supporting test case that covers the character set defined in [RFC-6750](https://datatracker.ietf.org/doc/html/rfc6750#section-2.1).
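For reference, the RFC 6750 `b64token` grammar quoted in KAFKA-15878 translates directly into a Java regular expression. This is a sketch of the grammar only. It is not the regex from the PR or from Kafka's OAUTHBEARER classes, and `BearerTokenSyntax` is a hypothetical name.

```java
import java.util.regex.Pattern;

// Hypothetical validator for the RFC 6750 section 2.1 b64token syntax:
// b64token = 1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
final class BearerTokenSyntax {
    // One or more allowed characters, followed only by optional trailing '=' padding.
    private static final Pattern B64TOKEN = Pattern.compile("[A-Za-z0-9\\-._~+/]+=*");

    private BearerTokenSyntax() {
    }

    static boolean isValidB64Token(String token) {
        return token != null && B64TOKEN.matcher(token).matches();
    }
}
```

JWTs (three base64url segments joined by dots) are a subset of this syntax, so widening the check this way would accept opaque tokens while still accepting JWTs.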
[jira] [Created] (KAFKA-15880) Add github actions workflow for promoting RC docker image
Vedarth Sharma created KAFKA-15880: -- Summary: Add github actions workflow for promoting RC docker image Key: KAFKA-15880 URL: https://issues.apache.org/jira/browse/KAFKA-15880 Project: Kafka Issue Type: Sub-task Reporter: Vedarth Sharma Assignee: Vedarth Sharma This GitHub Actions workflow should pull the RC Docker image and push it to apache/kafka. We need to ensure that only PMC members can access this workflow.
[jira] [Created] (KAFKA-15879) Add documentation for the Docker image
Vedarth Sharma created KAFKA-15879: -- Summary: Add documentation for the Docker image Key: KAFKA-15879 URL: https://issues.apache.org/jira/browse/KAFKA-15879 Project: Kafka Issue Type: Sub-task Reporter: Vedarth Sharma Assignee: Vedarth Sharma Update the quickstart with Docker image details, and add a Docker section to the Getting Started section.
[PR] KAFKA-15803: Update leader epoch in commitAsync and committed [kafka]
lucasbru opened a new pull request, #14817: URL: https://github.com/apache/kafka/pull/14817 To align the new consumer completely with the legacy consumer, we need to update the latest seen leader epoch in the metadata both when we commit offsets with a leader epoch using `commitAsync` and when we fetch the latest committed offsets using `committed`. We add unit tests to `AsyncKafkaConsumer` to test that metadata is correctly updated. We also add a corresponding test for `commitSync`, which already had the leader epoch logic. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15803) Update last seen epoch during commit
[ https://issues.apache.org/jira/browse/KAFKA-15803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788757#comment-17788757 ] Lucas Brutschy commented on KAFKA-15803: From what I can see, what we are mostly missing is updating the epoch in `commitAsync` and `committed`. Otherwise, both implementations seem to behave the same. > Update last seen epoch during commit > > > Key: KAFKA-15803 > URL: https://issues.apache.org/jira/browse/KAFKA-15803 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848-client-support, kip-848-e2e, kip-848-preview > > At the time we implemented commitAsync in the prototypeAsyncConsumer, > metadata was not there. The ask here is to investigate if we need to add the > following function to the commit code: > > private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, > OffsetAndMetadata offsetAndMetadata) { > if (offsetAndMetadata != null) > offsetAndMetadata.leaderEpoch().ifPresent(epoch -> > metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); > }
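The helper quoted above handles one partition at a time. Below is a sketch of applying the same "update only if newer" rule across all offsets in a commit or a `committed` result. `CommittedOffset` and `LastSeenEpochs` are hypothetical stand-ins for `OffsetAndMetadata` and the consumer metadata. Partitions are keyed by name so the example runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for OffsetAndMetadata: an offset plus an optional leader epoch.
final class CommittedOffset {
    final long offset;
    final Optional<Integer> leaderEpoch;

    CommittedOffset(long offset, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical stand-in for the metadata's last-seen-epoch bookkeeping.
final class LastSeenEpochs {
    private final Map<String, Integer> epochs = new HashMap<>();

    // Only ever moves an epoch forward; older epochs are ignored.
    boolean updateIfNewer(String partition, int epoch) {
        Integer current = epochs.get(partition);
        if (current == null || epoch > current) {
            epochs.put(partition, epoch);
            return true;
        }
        return false;
    }

    Optional<Integer> get(String partition) {
        return Optional.ofNullable(epochs.get(partition));
    }

    // Same shape as the quoted helper, applied to every entry that a commit sends
    // or a committed() lookup returns; null entries and missing epochs are skipped.
    void updateFromOffsets(Map<String, CommittedOffset> offsets) {
        offsets.forEach((partition, committed) -> {
            if (committed != null) {
                committed.leaderEpoch.ifPresent(epoch -> updateIfNewer(partition, epoch));
            }
        });
    }
}
```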
[jira] [Updated] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anuj Sharma updated KAFKA-15878:
Description:
h1. Overview
* This issue pertains to the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism of Kafka authentication.
* Kafka clients can use the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism by overriding the [custom callback handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod].
* [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575], available from v3.1, further extends the mechanism with a production-grade implementation.
* Kafka's [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism currently {*}rejects non-JWT (i.e. opaque) tokens{*}, because it accepts a more restrictive set of characters than [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] allows.
* This JIRA can be considered an extension of [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] to support opaque tokens in addition to JWT tokens.

In summary, the following character set should be supported, as per the RFC:
{code:java}
1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
{code}

> KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
> ---
> Key: KAFKA-15878
> URL: https://issues.apache.org/jira/browse/KAFKA-15878
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Anuj Sharma
> Priority: Major
[jira] [Created] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
Anuj Sharma created KAFKA-15878:
---
Summary: KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
Key: KAFKA-15878
URL: https://issues.apache.org/jira/browse/KAFKA-15878
Project: Kafka
Issue Type: Improvement
Components: clients
Reporter: Anuj Sharma

h1. Overview
* This issue pertains to the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism of Kafka authentication.
* Kafka clients can use the [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism by overriding the [custom callback handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod].
* [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575], available from v3.1, further extends the mechanism with a production-grade implementation.
* Kafka's [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer] mechanism currently {*}rejects non-JWT (i.e. opaque) tokens{*}, because it accepts a more restrictive set of characters than [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] allows.
* This JIRA can be considered an extension of [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575] to support opaque tokens in addition to JWT tokens.

In summary, the following character set should be supported, as per the RFC:
{code:java}
1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
{code}
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1822604515 > @apoorvmittal10 : Thanks for the updated PR. LGTM. Are the 34 test failures related? Thanks @junrao, the test failures are not related. I'll trigger a rebuild to see whether they persist.
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on PR #14620: URL: https://github.com/apache/kafka/pull/14620#issuecomment-1822599777 @xvrl Can you please re-review and let me know if fixing the temporality change by resetting the ledger in a follow-up PR makes sense?
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1401906005 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value, but we only collect metrics with number values. + * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). 
+ * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type + * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 metrics: a rate and a count. For eg: + * org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it has to be + * created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. + * + * + * + * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurables is converted to a GAUGE_DOUBLE. + * + * + * + * Percentiles work the same way as Frequencies. The only difference is that it
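A minimal sketch of the type mapping described in the javadoc above, assuming a simplified enum in place of the real stat classes. `StatKind`, `ReportedType` and `reportedType` are hypothetical names; only the three conversions the (truncated) javadoc states explicitly are encoded, and Percentiles is left out because its description is cut off.

```java
public class StatTypeMapping {
    // Hypothetical labels for the stat families discussed in the javadoc.
    enum StatKind { TOTAL, SUM, FREQUENCY_COMPONENT }

    // Reported types named in the javadoc.
    enum ReportedType { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    static ReportedType reportedType(StatKind kind) {
        switch (kind) {
            case TOTAL:
                // Un-windowed cumulative total: assumed to increase monotonically,
                // although negative samples could in principle make it decrease.
                return ReportedType.CUMULATIVE_DOUBLE;
            case SUM:
                // Sample sum over a window: not cumulative, so reported as a gauge.
                return ReportedType.GAUGE_DOUBLE;
            case FREQUENCY_COMPONENT:
                // Each Frequency inside a Frequencies stat is registered as an anonymous
                // Measurable; the compound type cannot be recovered, so it is a gauge.
                return ReportedType.GAUGE_DOUBLE;
            default:
                throw new IllegalArgumentException("Unknown stat kind: " + kind);
        }
    }
}
```

Compound stats such as Meter never appear in this mapping because, per the javadoc, they are only ever reported through their component metrics.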
[jira] [Created] (KAFKA-15877) Support change of temporality in Java client
Apoorv Mittal created KAFKA-15877: - Summary: Support change of temporality in Java client Key: KAFKA-15877 URL: https://issues.apache.org/jira/browse/KAFKA-15877 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal Details: https://github.com/apache/kafka/pull/14620#discussion_r1401554867
[jira] [Created] (KAFKA-15876) Introduce Remote Storage Not Ready Exception
Kamal Chandraprakash created KAFKA-15876: Summary: Introduce Remote Storage Not Ready Exception Key: KAFKA-15876 URL: https://issues.apache.org/jira/browse/KAFKA-15876 Project: Kafka Issue Type: Task Reporter: Kamal Chandraprakash Assignee: Kamal Chandraprakash When tiered storage is enabled on the cluster, the Kafka broker has to build the remote log metadata on restart for all the partitions for which it is either leader or follower. The remote log metadata is built asynchronously and does not interfere with the broker startup path. Once the broker is online, it cannot serve client requests (FETCH and LIST_OFFSETS) that access remote storage until the metadata has been built for those partitions. Currently, we return a ReplicaNotAvailable exception to the client so that it retries after some time. [ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java] is applicable when a reassignment is in progress, and it has been more or less deprecated in favor of NotLeaderOrFollowerException ([PR#8979|https://github.com/apache/kafka/pull/8979]). It would be good to introduce an appropriate retriable exception for remote storage errors to denote that remote storage is not yet ready to accept client requests.
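The intent of a dedicated retriable error can be sketched as a client-side retry loop that backs off only on retriable failures. All names below (`RetriableError`, `RemoteStorageNotReadyError`, `callWithRetry`) are hypothetical stand-ins, not Kafka classes, and the fixed backoff is a simplifying assumption.

```java
import java.util.function.Supplier;

public class RemoteStorageRetrySketch {
    // Hypothetical stand-in for a retriable error base class (not Kafka's own type).
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    // Hypothetical error: remote log metadata for the partition is still being built.
    static class RemoteStorageNotReadyError extends RetriableError {
        RemoteStorageNotReadyError(String message) { super(message); }
    }

    // Runs the request, retrying only on RetriableError with a fixed backoff;
    // any other exception propagates immediately.
    static <T> T callWithRetry(Supplier<T> request, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RetriableError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.get();
            } catch (RetriableError e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backoffMs);
                }
            }
        }
        throw last; // every attempt failed with a retriable error
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }
}
```

A client wrapping a FETCH or LIST_OFFSETS call this way would keep retrying while the broker is still building remote log metadata, whereas non-retriable errors would surface on the first attempt.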
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1822554162 @AndrewJSchofield We are about to merge https://github.com/apache/kafka/pull/14781. Is it possible to build this one on top of it?
Re: [PR] MINOR: Update LICENSE-binary for 3.6.1 [kafka]
showuon commented on code in PR #14812: URL: https://github.com/apache/kafka/pull/14812#discussion_r1401875110 ## LICENSE-binary: ## @@ -205,7 +205,7 @@ This project bundles some components that are also licensed under the Apache License Version 2.0: -audience-annotations-0.13.0 +audience-annotations-0.12.0 Review Comment: Yes, it's quite surprising that we need to downgrade it.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
cadonna commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1401760510 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) } - @Test - def testAutoCommitOnCloseAfterWakeup(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // close() is not committing offsets in consumer group protocol Review Comment: This is just a transient state, isn't it? At least https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close is planned. If KAFKA-15327 is still valid, can we word this comment accordingly, e.g. `close() is not committing offsets in consumer group protocol for now but it should when implementation is complete`? ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } - @Test - def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol Review Comment: I guess the `ConsumerRebalanceListener` is also something that will be supported in the future by the consumer group protocol.
## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -537,33 +574,41 @@ class PlaintextConsumerTest extends BaseConsumerTest { awaitAssignment(consumer, shrunkenAssignment) } - @Test - def testPartitionsFor(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol + def testPartitionsFor(groupProtocol: String): Unit = { val numParts = 2 createTopic("part-test", numParts, 1) +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) val consumer = createConsumer() val parts = consumer.partitionsFor("part-test") assertNotNull(parts) assertEquals(2, parts.size) } - @Test - def testPartitionsForAutoCreate(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol + def testPartitionsForAutoCreate(groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) val consumer = createConsumer() // First call would create the topic consumer.partitionsFor("non-exist-topic") val partitions = consumer.partitionsFor("non-exist-topic") assertFalse(partitions.isEmpty) } - @Test - def testPartitionsForInvalidTopic(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol Review Comment: See my comment above about the functionality being supported in future. 
## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } - @Test - def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol + def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) runMultiConsumerSessionTimeoutTest(false) } - @Test - def testMultiConsumerSessionTimeoutOnClose(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol + def testMultiConsumerSessionTimeoutOnClose(groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) runMultiConsumerSessionTimeoutTest(true) } - @Test - def testInterceptors(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Consumer interceptors not implemented for consumer group protocol Review Comment: Will interceptors also be supported? ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } - @Test - def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol + def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
Re: [PR] KAFKA-15875 Make Snapshot public [kafka]
jlprat commented on code in PR #14816: URL: https://github.com/apache/kafka/pull/14816#discussion_r1401847506 ## server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java: ## @@ -24,10 +24,10 @@ * A snapshot of some timeline data structures. * * The snapshot contains historical data for several timeline data structures. - * We use an IdentityHashMap to store this data. This way, we can easily drop all of + * We use an IdentityHashMap to store this data. This way, we can easily drop all * the snapshot data. */ -class Snapshot { +public class Snapshot { Review Comment: The class is now public, but only its read methods are public, so no class from outside the package can tamper with the data in this data structure.
[PR] KAFKA-15875 Make Snapshot public [kafka]
jlprat opened a new pull request, #14816: URL: https://github.com/apache/kafka/pull/14816 Makes the Snapshot, Revertable and Delta classes public. Only their read methods are made public. Snapshot, Revertable and Delta are package-protected, but they potentially leak to classes in other packages (org.apache.kafka.controller.OffsetControlManager). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
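The visibility split described in this PR (a public type whose mutators stay package-private) can be sketched in a single class. `SnapshotSketch` and its methods are hypothetical and only loosely modeled on `org.apache.kafka.timeline.Snapshot`; the `IdentityHashMap` storage follows the class javadoc quoted in the review comment.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch; in the real layout this would live in org.apache.kafka.timeline.
public class SnapshotSketch {
    private final long epoch;
    // Keyed by object identity, so equal-but-distinct owners get separate entries.
    private final Map<Object, Object> deltas = new IdentityHashMap<>();

    // Package-private constructor: only the owning package can create snapshots.
    SnapshotSketch(long epoch) {
        this.epoch = epoch;
    }

    // Public read methods: safe for callers in other packages.
    public long epoch() {
        return epoch;
    }

    public int deltaCount() {
        return deltas.size();
    }

    // Package-private mutators: code outside the package cannot tamper with the data.
    void setDelta(Object owner, Object delta) {
        deltas.put(owner, delta);
    }

    // Dropping all snapshot data is a single clear() on the identity map.
    void erase() {
        deltas.clear();
    }
}
```

Code in another package (such as `org.apache.kafka.controller`) could hold a reference and call `epoch()`, but calls to `setDelta` or `erase` would not compile there.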
Re: [PR] KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API [kafka]
dajac commented on code in PR #14800: URL: https://github.com/apache/kafka/pull/14800#discussion_r1401815563 ## clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java: ## @@ -37,6 +37,12 @@ public JoinGroupResponse(JoinGroupResponseData data, short version) { if (version < 7 && data.protocolName() == null) { data.setProtocolName(""); } + +// If nullable string for the protocol name is supported, +// we set empty string to be null to ensure compliance. +if (version >= 7 && data.protocolName() != null && data.protocolName().isEmpty()) { +data.setProtocolName(null); +} Review Comment: Should we add a unit test in JoinGroupResponseTest to cover this change? ## core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala: ## @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.message.SyncGroupRequestData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.Collections +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSyncGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSyncGroup() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def 
testSyncGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testSyncGroup() + } + + private def testSyncGroup(): Unit = { +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. +createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- 0 to ApiKeys.SYNC_GROUP.latestVersion(isUnstableApiEnabled)) { + // Sync with unknown group id. + syncGroupWithOldProtocol( +groupId = "grp-unknown", +memberId = "member-id", +generationId = -1, +expectedProtocolType = null, +expectedProtocolName = null, +expectedError = Errors.UNKNOWN_MEMBER_ID, +version = version.toShort + ) + + val metadata = ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo")) + ).array + + // Join a dynamic member without member id. + // Prior to JoinGroup version 4, a new member is immediately added if it sends a join group request with UNKNOWN_MEMBER_ID. + val joinLeaderResponseData = sendJoinRequest( +
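The two normalization rules in the `JoinGroupResponse` constructor diff can be isolated in a small helper, which is roughly the behavior a unit test in `JoinGroupResponseTest` would pin down. The helper name is hypothetical; the version-7 boundary comes from the diff itself.

```java
public class ProtocolNameNormalization {
    // Hypothetical helper mirroring the constructor logic in the JoinGroupResponse diff.
    static String normalizeProtocolName(String protocolName, short version) {
        if (version < 7 && protocolName == null) {
            // Before v7 the protocol name cannot be null, so null becomes "".
            return "";
        }
        if (version >= 7 && protocolName != null && protocolName.isEmpty()) {
            // From v7 the field is nullable, so "" is normalized to null.
            return null;
        }
        return protocolName;
    }
}
```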
Re: [PR] KAFKA-15756: [1/2] Migrate existing integration tests to run old protocol in new coordinator [kafka]
dajac commented on code in PR #14781: URL: https://github.com/apache/kafka/pull/14781#discussion_r1401788044 ## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ## @@ -67,6 +67,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { if (isZkMigrationTest()) { cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true")) } +if (isNewGroupCoordinatorEnabled()) { + cfgs.foreach(_.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")) Review Comment: We could actually remove this now that https://github.com/apache/kafka/commit/7826d5fc8ab695a5ad927338469ddc01b435a298 has been merged. ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -547,23 +564,29 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(2, parts.size) } - @Test - def testPartitionsForAutoCreate(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) + def testPartitionsForAutoCreate(quorum: String): Unit = { val consumer = createConsumer() // First call would create the topic consumer.partitionsFor("non-exist-topic") -val partitions = consumer.partitionsFor("non-exist-topic") -assertFalse(partitions.isEmpty) +var partitions = consumer.partitionsFor("non-exist-topic") +TestUtils.waitUntilTrue(() => { + partitions = consumer.partitionsFor("non-exist-topic") + !partitions.isEmpty +}, s"Timed out while awaiting non empty partitions.") Review Comment: nit: We don't really need `var partitions` here. How about this?
```
TestUtils.waitUntilTrue(() => {
  !consumer.partitionsFor("non-exist-topic").isEmpty
}, s"Timed out while awaiting non empty partitions.")
```
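The reviewer's suggestion works because the condition re-queries on every poll, so no mutable `var` has to be captured. A rough Java analogue of a polling helper like `TestUtils.waitUntilTrue` is sketched below; the signature and the timeout and pause parameters are assumptions and do not reproduce Kafka's Scala implementation.

```java
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {
    // Re-evaluates the condition until it holds or timeoutMs elapses;
    // on timeout it throws AssertionError carrying the given message.
    static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }
}
```

Java lambdas cannot reassign captured locals anyway, which pushes the call site toward the stateless form the reviewer proposes, e.g. `waitUntilTrue(() -> !consumer.partitionsFor(topic).isEmpty(), "Timed out while awaiting non empty partitions.", 15_000L, 100L)` with illustrative values.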
[PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]
appchemist opened a new pull request, #14815: URL: https://github.com/apache/kafka/pull/14815 KRaft support in LeaderEpochIntegrationTest ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15869) Document semantics of nullable nested API entities
[ https://issues.apache.org/jira/browse/KAFKA-15869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788698#comment-17788698 ] Anton Agestam commented on KAFKA-15869: --- Sorry for the noise here; I have now found the thread in the non-Pony UI: https://www.mail-archive.com/dev@kafka.apache.org/msg127971.html > Document semantics of nullable nested API entities > -- > > Key: KAFKA-15869 > URL: https://issues.apache.org/jira/browse/KAFKA-15869 > Project: Kafka > Issue Type: Wish >Reporter: Anton Agestam >Priority: Minor > > The initial version of ConsumerGroupHeartbeatResponse [introduced the first > field across the protocol that is a nullable nested > entity|https://github.com/dajac/kafka/blob/3acd87a3e82e1d2fd4c07218d362e7665b99c547/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json#L48]. > As the implementor of a third-party schema parser it is not clear how to > handle this field, where such fields are allowed, and how null is represented > for such fields. > As far as I can tell, the [protocol > guide|https://kafka.apache.org/protocol.html#The_Messages_ConsumerGroupHeartbeat] > does not mention the nullability at all. > The reason I ask where such fields are allowed is because if the answer to > how null is represented here is just omitting writing any bytes, then I > suspect the only unambiguous place for such field to appear would be as the > last field of a top-level entity. Even then, how is it discriminated from > tagged fields? > Is it possible this field was made nullable by mistake?
[jira] [Commented] (KAFKA-15869) Document semantics of nullable nested API entities
[ https://issues.apache.org/jira/browse/KAFKA-15869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788694#comment-17788694 ] Anton Agestam commented on KAFKA-15869: --- [~dajac] The discussion link on that KIP looks wrong (presumably a leftover from a KIP template?), and neither Google nor the mail archive search has been helpful. Do you happen to know if there's a discussion thread for it?
Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]
dajac merged PR #14582: URL: https://github.com/apache/kafka/pull/14582
[jira] [Assigned] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashwin Pankaj reassigned KAFKA-15699: - Assignee: Ashwin Pankaj > MirrorConnectorsIntegrationBaseTest is flaky > > > Key: KAFKA-15699 > URL: https://issues.apache.org/jira/browse/KAFKA-15699 > Project: Kafka > Issue Type: Bug >Reporter: Calvin Liu >Assignee: Ashwin Pankaj >Priority: Major > > It may be related to an inappropriate test timeout. > testReplicateSourceDefault() > {code:java} > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be > different, because it's in exclude filter! ==> expected: not equal but was: > <8640> > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code} > testOffsetSyncsTopicsOnTarget() > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, > deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out > at 1698275715878 after 1 attempt(s) at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) >at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code}
[jira] [Created] (KAFKA-15875) Snapshot class is package protected but returned in public methods
Josep Prat created KAFKA-15875: -- Summary: Snapshot class is package protected but returned in public methods Key: KAFKA-15875 URL: https://issues.apache.org/jira/browse/KAFKA-15875 Project: Kafka Issue Type: Task Affects Versions: 3.6.0 Reporter: Josep Prat Assignee: Josep Prat The org.apache.kafka.timeline.Snapshot class is package-protected, but it is part of the public API of org.apache.kafka.timeline.SnapshotRegistry. This might cause compilation errors if we ever try to assign the object returned by these methods to a variable. org.apache.kafka.controller.OffsetControlManager, which is in another package, calls SnapshotRegistry methods that return a Snapshot. The SnapshotRegistry class does not seem to be public API, so I don't think this needs a KIP.