[jira] [Created] (KAFKA-15821) Active topics for deleted connectors are not reset in standalone mode
Chris Egerton created KAFKA-15821: - Summary: Active topics for deleted connectors are not reset in standalone mode Key: KAFKA-15821 URL: https://issues.apache.org/jira/browse/KAFKA-15821 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.5.0, 3.7.0 Reporter: Chris Egerton In [KIP-558|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect], a new REST endpoint was added to report the set of active topics for a connector. The KIP specified that "Deleting a connector will reset this connector's set of active topics", and this logic was successfully implemented in distributed mode. However, in standalone mode, active topics for deleted connectors are not reset, so a re-created connector inherits the active topics of its predecessor(s) unless they were manually reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
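To make the KIP-558 contract concrete, here is a minimal Java sketch of an in-memory active-topics tracker in which deleting a connector clears its recorded topics, so a connector re-created under the same name starts empty. The class and method names (`ActiveTopicsTracker`, `recordActiveTopic`, `onConnectorDeleted`) are hypothetical and do not correspond to Kafka Connect's actual standalone herder or status store API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical tracker; not Kafka Connect's real status-store API.
class ActiveTopicsTracker {
    private final Map<String, Set<String>> activeTopics = new HashMap<>();

    // Record that a connector has used a topic.
    void recordActiveTopic(String connector, String topic) {
        activeTopics.computeIfAbsent(connector, c -> new HashSet<>()).add(topic);
    }

    // Read-only view of the topics recorded for a connector (empty if none).
    Set<String> activeTopics(String connector) {
        return Collections.unmodifiableSet(activeTopics.getOrDefault(connector, Collections.emptySet()));
    }

    // Per KIP-558, deleting a connector resets its active topics. Skipping this
    // step is what lets a re-created connector inherit its predecessor's topics.
    void onConnectorDeleted(String connector) {
        activeTopics.remove(connector);
    }

    public static void main(String[] args) {
        ActiveTopicsTracker tracker = new ActiveTopicsTracker();
        tracker.recordActiveTopic("my-sink", "orders");
        tracker.onConnectorDeleted("my-sink");
        // A connector re-created under the same name starts with no active topics.
        System.out.println(tracker.activeTopics("my-sink")); // prints []
    }
}
```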
[jira] [Updated] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets
[ https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15802: --- Fix Version/s: 3.7.0 3.6.1 > Trying to access uncopied segments metadata on listOffsets > -- > > Key: KAFKA-15802 > URL: https://issues.apache.org/jira/browse/KAFKA-15802 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.6.0 >Reporter: Francois Visconte >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > We have a tiered storage cluster running with Aiven s3 plugin. > On our cluster, we have a process doing regular listOffsets requests. > This triggers the following exception:
> {code:java}
> org.apache.kafka.common.KafkaException: org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: Requested remote resource was not found
>     at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$6(RemoteIndexCache.java:355)
>     at org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:318)
> Nov 09, 2023 1:42:01 PM com.github.benmanes.caffeine.cache.LocalAsyncCache lambda$handleCompletion$7
> WARNING: Exception thrown during asynchronous load
> java.util.concurrent.CompletionException: io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
>     at com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:107)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
>     at io.aiven.kafka.tieredstorage.storage.s3.S3Storage.fetch(S3Storage.java:80)
>     at io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.lambda$new$1(SegmentManifestProvider.java:59)
>     at com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:103)
>     ... 7 more
> Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist. (Service: S3, Status Code: 404, Request ID: CFMP27PVC9V2NNEM, Extended Request ID: F5qqlV06qQJ5qCuWl91oueBaha0QLMBURJudnOnFDQk+YbgFcAg70JBATcARDxN44DGo+PpfZHAsum+ioYMoOw==)
>     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
>     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
>     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
>     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
>     at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
>     at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
>     at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
>     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
>     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
>     at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
>     at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
>     at
Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]
satishd commented on code in PR #14727: URL: https://github.com/apache/kafka/pull/14727#discussion_r1392031838 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; +private static final Set<RemoteLogSegmentState> SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of( Review Comment: COPY_SEGMENT_STARTED segments are eligible for deletion when the leader could not finish copying them, for example because it went through an ungraceful shutdown, or for any other reason. The new leader may pick up the respective segments for the target offsets that still need to be copied, while the earlier failed segment remains in the COPY_SEGMENT_STARTED state until it is eventually deleted by the retention cleanup logic. So, COPY_SEGMENT_STARTED is a valid transition even now, when copy and deletion happen sequentially.
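The distinction the comment draws (a segment stuck in COPY_SEGMENT_STARTED must stay eligible for deletion, yet should not be read when answering listOffsets) can be expressed as two separate state checks. This is a minimal, self-contained Java sketch: the local `SegmentState` enum only mirrors the constant names of Kafka's `RemoteLogSegmentState`, and the exact membership of the deletion set shown here is an assumption, not the PR's code.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Illustrative sketch; SegmentState mirrors RemoteLogSegmentState's constant
// names but is defined locally so the example compiles on its own.
class SegmentStateChecks {
    enum SegmentState {
        COPY_SEGMENT_STARTED,
        COPY_SEGMENT_FINISHED,
        DELETE_SEGMENT_STARTED,
        DELETE_SEGMENT_FINISHED
    }

    // Assumed deletion-eligible states: includes COPY_SEGMENT_STARTED for copies
    // a previous leader never finished; DELETE_SEGMENT_FINISHED is left out here
    // on the assumption that a fully deleted segment needs no further cleanup.
    static final Set<SegmentState> DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of(
            SegmentState.COPY_SEGMENT_STARTED,
            SegmentState.COPY_SEGMENT_FINISHED,
            SegmentState.DELETE_SEGMENT_STARTED));

    // Only fully copied segments are guaranteed to have their manifest and
    // indexes in remote storage, so only they are safe to read.
    static boolean isReadable(SegmentState state) {
        return state == SegmentState.COPY_SEGMENT_FINISHED;
    }

    public static void main(String[] args) {
        System.out.println(isReadable(SegmentState.COPY_SEGMENT_STARTED)); // false
        System.out.println(DELETION_VALID_STATES.contains(SegmentState.COPY_SEGMENT_STARTED)); // true
    }
}
```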
## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -1027,8 +1027,32 @@ void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, Rem assertEquals(Optional.empty(), maybeTimestampAndOffset3); } +@Test +void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteStorageException { +TopicPartition tp = leaderTopicIdPartition.topicPartition(); + +long ts = time.milliseconds(); +long startOffset = 120; +int targetLeaderEpoch = 10; + +TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>(); +validSegmentEpochs.put(targetLeaderEpoch, startOffset); + +LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint); +leaderEpochFileCache.assign(4, 99L); +leaderEpochFileCache.assign(5, 99L); +leaderEpochFileCache.assign(targetLeaderEpoch, startOffset); +leaderEpochFileCache.assign(12, 500L); + +doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED); + +Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache); Review Comment: Can you rename `maybeTimestampAndOffset1` to `maybeTimestampAndOffset`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
philipnee commented on PR #14680: URL: https://github.com/apache/kafka/pull/14680#issuecomment-1809546161 Hi @lucasbru - Thanks for your feedback; the PR has been updated. Would you have time to go over the comments again? Thanks!
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1809524348 The failing tests differ from those in previous runs, and none of them are related to this PR.
[jira] [Created] (KAFKA-15820) Add a metric to track the number of partitions under min ISR
Calvin Liu created KAFKA-15820: -- Summary: Add a metric to track the number of partitions under min ISR Key: KAFKA-15820 URL: https://issues.apache.org/jira/browse/KAFKA-15820 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu Assignee: Calvin Liu
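The ticket has no description beyond its summary, so as a rough illustration of what such a metric would count: a partition is under min ISR when its in-sync replica set is smaller than the topic's `min.insync.replicas`, which is the condition under which `acks=all` produce requests are rejected. The Java sketch below is hypothetical (the `PartitionState` record and `countUnderMinIsr` method are not Kafka's broker or controller code) and needs Java 16+ for records.

```java
import java.util.List;

// Hypothetical sketch: counts partitions whose ISR is smaller than the
// configured min.insync.replicas. Not Kafka's actual metric implementation.
class UnderMinIsrCounter {
    // Minimal view of one partition: current ISR size and configured min ISR.
    record PartitionState(String topic, int partition, int isrSize, int minIsr) {}

    static long countUnderMinIsr(List<PartitionState> partitions) {
        return partitions.stream()
                .filter(p -> p.isrSize() < p.minIsr()) // strictly below the minimum
                .count();
    }

    public static void main(String[] args) {
        List<PartitionState> partitions = List.of(
                new PartitionState("orders", 0, 3, 2),   // healthy
                new PartitionState("orders", 1, 1, 2),   // under min ISR
                new PartitionState("payments", 0, 2, 2)  // exactly at min ISR, still writable
        );
        System.out.println(countUnderMinIsr(partitions)); // 1
    }
}
```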
Re: [PR] MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits [kafka]
github-actions[bot] commented on PR #13911: URL: https://github.com/apache/kafka/pull/13911#issuecomment-1809490953 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1391862343 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection topics, Optional(topics), listener)) metadata.requestUpdateForNewTopics(); + +// Trigger subscribe event to effectively join the group if not already part of it, +// or just send the new subscription to the broker. +applicationEventHandler.add(new SubscriptionChangeApplicationEvent()); Review Comment: There's another `subscribeInternal()` for the topic pattern path. We want this there too, right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { -throw new KafkaException("method not implemented"); Review Comment: Good call! ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals.events; + +/** + * Application event triggered when a user calls the unsubscribe API. This will make the consumer + * release all its assignments and send a heartbeat request to leave the consumer group. + * This event holds a future that will complete when the invocation of callbacks to release + * complete and the heartbeat to leave the group is sent out (minimal effort to send the + * leave group heartbeat, without waiting for any response or considering timeouts). + */ +public class UnsubscribeApplicationEvent extends CompletableApplicationEvent { Review Comment: The intention of the `CompletableApplicationEvent` was to have a way for the consumer to block on the results of operations performed in the background thread. Since the `Consumer.unsubscribe()` API call is non-blocking, I'm thinking this should be a subclass of `ApplicationEvent`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -25,7 +25,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, -LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA +LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, Review Comment: ```suggestion LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIBED, ``` `SUBSCRIPTION_CHANGE` is a bit vague. Does it encompass more than the event of the user calling `Consumer.subscribe()`?
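The difference raised in the review of `UnsubscribeApplicationEvent`, between an event the caller blocks on and a fire-and-forget event, can be sketched with a queue drained by a background thread. In this hypothetical Java sketch, `Event` and `CompletableEvent` stand in for the consumer's internal `ApplicationEvent` and `CompletableApplicationEvent`; they are not those classes, and the string result is purely illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the consumer's real event classes.
class EventStyles {
    // Plain event: the caller enqueues it and moves on.
    static class Event {
        final String type;
        Event(String type) { this.type = type; }
    }

    // Event carrying a future the caller can block on.
    static class CompletableEvent extends Event {
        final CompletableFuture<String> future = new CompletableFuture<>();
        CompletableEvent(String type) { super(type); }
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Thread background = new Thread(this::runLoop, "background-thread");

    void start() {
        background.setDaemon(true);
        background.start();
    }

    // Background thread: process events and complete any attached futures.
    private void runLoop() {
        try {
            while (true) {
                Event event = queue.take();
                if (event instanceof CompletableEvent) {
                    ((CompletableEvent) event).future.complete("done:" + event.type);
                }
            }
        } catch (InterruptedException e) {
            // close() interrupts the thread to stop the loop.
        }
    }

    // Non-blocking style (suited to an API like unsubscribe()): returns immediately.
    void add(Event event) {
        queue.add(event);
    }

    // Blocking style: enqueue, then wait (bounded) for the background result.
    String addAndGet(CompletableEvent event, long timeoutMs) {
        queue.add(event);
        return event.future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
    }

    void close() {
        background.interrupt();
    }

    public static void main(String[] args) {
        EventStyles events = new EventStyles();
        events.start();
        events.add(new Event("UNSUBSCRIBE"));
        System.out.println(events.addAndGet(new CompletableEvent("COMMIT"), 1_000)); // done:COMMIT
        events.close();
    }
}
```

The future is what makes the blocking variant possible; an API that never waits on a result gains nothing from carrying one.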
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +347,537 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state",
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391891296 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -70,6 +71,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; +@SuppressWarnings("ClassFanOutComplexity") Review Comment: Well, actually, after reading through the whole PR, I do think it's unnecessary complexity for the `TaskManager` after all -- see [this comment](https://github.com/apache/kafka/pull/14735#discussion_r1391855192). But that's more because of the complexity of determining when to invoke `#onUpdateSuspended`, and because it's simpler to keep everything in the StoreChangelogReader class. Hopefully that all makes sense to you too.
Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
sean-rossignol commented on PR #13283: URL: https://github.com/apache/kafka/pull/13283#issuecomment-1809404175 Hey @vamossagar12, is there any way I could help unblock this PR? We have an internal kstreams app with a wildcard consumer, and one bottleneck we're hitting is that as the number of source topics grows, so does our initial total heap size when we need to rehydrate the components downstream of our cooker. The problem is that we set the max heap size across all instances of the kstreams app to handle the initial expected load when we first deploy the application. Over time the number of source topics grows, and come rehydration time we have to increase the resources dedicated to the instance, futz with the buffered.records.per.partition config, or reset the offsets of the source topic partitions in batches; otherwise all instances of our application will go OOM. Being able to define and enforce a maximum input buffer size at the instance/thread level would allow us to handle these rehydration events without needing to change any other elements of our deployments.
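A back-of-the-envelope model shows why a per-partition bound ties heap usage to the topic count: with `buffered.records.per.partition` (default 1000), the worst-case buffered data grows linearly with the number of assigned input partitions, whereas an instance-level cap would bound it independently of that number. The record size and partition counts below are made-up illustration values, and the model ignores object overhead, caches, in-flight fetches, and every other heap consumer.

```java
// Rough model only; record size and partition counts are illustrative assumptions.
class BufferEstimate {
    // Worst case: every assigned partition has a full input buffer.
    static long worstCaseBufferedBytes(int partitions, int recordsPerPartition, int avgRecordBytes) {
        return (long) partitions * recordsPerPartition * avgRecordBytes;
    }

    public static void main(String[] args) {
        int recordsPerPartition = 1000; // buffered.records.per.partition default
        int avgRecordBytes = 1024;      // assumed 1 KiB records
        for (int partitions : new int[] {100, 1000}) {
            long bytes = worstCaseBufferedBytes(partitions, recordsPerPartition, avgRecordBytes);
            System.out.printf("%d partitions -> %d MiB%n", partitions, bytes / (1024 * 1024));
        }
        // prints 97 MiB for 100 partitions and 976 MiB for 1000 partitions
    }
}
```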
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391877914 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -70,6 +71,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; +@SuppressWarnings("ClassFanOutComplexity") Review Comment: Oh, I see. I was thinking that this might be unnecessary complexity for the `TaskManager` class, and that we could instead pass the standby callback to the `ActiveTaskCreator`. But I'm not sure whether the standby callback API is compatible with the terminology used in the `ActiveTaskCreator` class.
[PR] KAFKA-15816: Fix leaked sockets in core tests [kafka]
gharris1727 opened a new pull request, #14754: URL: https://github.com/apache/kafka/pull/14754 These tests leak sockets due to various typos. Additionally, many tests leaked sockets because MiniKdc stops asynchronously, leaving sockets open at the end of the test that are only cleaned up later. This patch uses the alternative dispose method, which awaits termination of the KDC's internal resources before proceeding with the normal shutdown. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
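The general pattern the PR relies on, waiting for a component's internal resources to terminate instead of returning while they are still winding down, can be sketched with a plain `java.util.concurrent.ExecutorService`. This is an illustrative analogy only; it does not use or reproduce MiniKdc's API, and `AwaitingShutdown` is a hypothetical class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative analogy: an asynchronous stop vs. one that waits for background
// work to finish. Not MiniKdc's actual API.
class AwaitingShutdown {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    void submitWork(Runnable task) {
        workers.submit(task);
    }

    // Fire-and-forget stop: returns at once, while worker threads (and any
    // resources they hold, such as sockets) may still be winding down.
    void stopAsync() {
        workers.shutdown();
    }

    // Awaiting stop: returns only after the workers have exited or the timeout
    // elapses, so a leak check that runs afterwards sees a clean state.
    boolean stopAndAwait(long timeoutMs) {
        workers.shutdown();
        try {
            if (workers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdownNow(); // interrupt whatever is still running
        return false;
    }

    boolean isTerminated() {
        return workers.isTerminated();
    }

    public static void main(String[] args) {
        AwaitingShutdown component = new AwaitingShutdown();
        component.submitWork(() -> {
            try {
                Thread.sleep(100); // simulated background work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(component.stopAndAwait(5_000)); // true
        System.out.println(component.isTerminated());      // true
    }
}
```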
Re: [PR] KIP-988 [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391815838 ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -580,6 +583,17 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState } } +public void setStandbyUpdateListener(final StandbyUpdateListener globalStandbyListener) { Review Comment: Can you add javadocs? (You can pretty much copy/paste from the API above for reference.)
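As an illustration of the kind of javadoc being requested, here is a sketch of a setter that documents an "only before `start()`" contract, which is assumed here to mirror the existing restore-listener setter. `ListenerRegistry`, its `State` enum, and the one-method `StandbyListener` interface are hypothetical simplifications, not `KafkaStreams` or the real `StandbyUpdateListener`.

```java
// Hypothetical, simplified stand-in for a KafkaStreams-style setter.
class ListenerRegistry {
    enum State { CREATED, RUNNING }

    // Simplified, hypothetical listener with a single callback.
    interface StandbyListener {
        void onUpdateStart(String storeName, long startingOffset);
    }

    private State state = State.CREATED;
    private StandbyListener standbyListener;

    /**
     * Set the listener to be notified of standby task updates.
     * <p>
     * This must be called before {@link #start()}; the listener cannot be
     * changed once the instance is running.
     *
     * @param listener the listener to notify of standby task updates
     * @throws IllegalStateException if this instance has already been started
     */
    public synchronized void setStandbyUpdateListener(final StandbyListener listener) {
        if (state != State.CREATED) {
            throw new IllegalStateException("Can only set the standby update listener before calling start().");
        }
        this.standbyListener = listener;
    }

    /** Start the instance; listeners can no longer be set afterwards. */
    public synchronized void start() {
        state = State.RUNNING;
    }

    synchronized StandbyListener standbyListener() {
        return standbyListener;
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.setStandbyUpdateListener((store, offset) ->
                System.out.println("standby update started for " + store + " at " + offset));
        registry.start();
        try {
            registry.setStandbyUpdateListener((store, offset) -> { });
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```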
[jira] [Updated] (KAFKA-15819) KafkaServer leaks KafkaRaftManager when ZK migration enabled
[ https://issues.apache.org/jira/browse/KAFKA-15819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15819: Priority: Minor (was: Major) > KafkaServer leaks KafkaRaftManager when ZK migration enabled > > > Key: KAFKA-15819 > URL: https://issues.apache.org/jira/browse/KAFKA-15819 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > In SharedServer, TestRaftServer, and MetadataShell, the KafkaRaftManager is > maintained as an instance variable and shut down when the outer instance is > shut down. However, in the KafkaServer, the KafkaRaftManager is instantiated > and started, but then the reference is lost. > [https://github.com/apache/kafka/blob/49d3122d425171b6a59a2b6f02d3fe63d3ac2397/core/src/main/scala/kafka/server/KafkaServer.scala#L416-L442] > Instead, the KafkaServer should behave like the other call sites of > KafkaRaftManager and shut down the KafkaRaftManager during its own shutdown.
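The fix the ticket proposes is a standard ownership pattern: whoever starts a component keeps a reference to it and shuts it down during its own shutdown. A minimal Java sketch follows; `Server` and `RaftManagerLike` are hypothetical stand-ins, not the Scala `KafkaServer` or `KafkaRaftManager` classes.

```java
// Hypothetical sketch of the ownership pattern; not Kafka's server classes.
class OwnedComponentDemo {
    static class RaftManagerLike implements AutoCloseable {
        private volatile boolean running;

        void startup() { running = true; }

        boolean isRunning() { return running; }

        @Override
        public void close() { running = false; }
    }

    static class Server {
        // Kept as a field so shutdown() can reach it; holding it only in a
        // local variable inside startup() would leak the running component.
        private RaftManagerLike raftManager;

        void startup(boolean migrationEnabled) {
            if (migrationEnabled) {
                raftManager = new RaftManagerLike();
                raftManager.startup();
            }
        }

        void shutdown() {
            if (raftManager != null) {
                raftManager.close();
                raftManager = null;
            }
        }

        RaftManagerLike raftManager() { return raftManager; }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.startup(true);
        RaftManagerLike manager = server.raftManager();
        server.shutdown();
        System.out.println(manager.isRunning()); // false
    }
}
```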
Re: [PR] KIP-988 [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391848915 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map tasks, // no records to restore; in this case we just initialize the sensor to zero final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); task.recordRestoration(time, recordsToRestore, true); +} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) { Review Comment: Why do we only invoke the standby listener when we have the `endOffset` filled out? I don't know off the top of my head when it would be empty or not, but I would think the listener callbacks are still useful even without this one field filled in. Also, users might rely on the standby listener callbacks being invoked. I think we should guarantee that the listener is always called (at least in the absence of app-wide errors that cause a shutdown). If you're wondering what to do when it is null, I would suggest just passing in `-1L` as a sentinel value.
As long as we mention that this is a possibility in the javadocs for the `endOffset` argument, I don't see any problem with leaving it up to the user to decide how to react in this case. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: Seems like this is the only place where we invoke the `onUpdateSuspended` callback, i.e. we're missing the `SuspendReason.MIGRATED` case, right? We can just look to the active restore listener and follow that same example -- looks like it invokes the analogous `#onRestoreSuspended` callback in the StoreChangelogReader, specifically in the `#unregister` method. Personally, I think that would be the best place to invoke the standby listener's `#onUpdateSuspended`, not just in the `MIGRATED` case but also for `PROMOTED`, so we can keep the logic in one place. And `#unregister` is the perfect place to do so, since it always gets invoked whether the task is being closed or recycled. You can just add a parameter to the `#unregister` method to pass in which of those two options it was.
Keeping everything in the StoreChangelogReader also helps us avoid some gnarly questions about special-case handling, because the question of when a task is closed can actually be pretty complicated when you look at it within the TaskManager: for example, corrupted tasks may be closed and revived, opening up questions about whether and when to invoke `#onUpdateSuspended` if the task is going to be revived again. But within the StoreChangelogReader we know that each task should have a 1:1 ratio of calls to `prepareChangelogs` and `unregister`, so it's much easier to reason about.
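Both suggestions in this review (always invoking the listener, with `-1L` as a sentinel when the end offset is unknown, and invoking `onUpdateSuspended` from a single `unregister` path that is told whether the task was closed or recycled) can be sketched together. In this hypothetical Java sketch, partitions are plain strings, and `ChangelogReaderSketch`, `ChangelogMetadata`, and the listener signature are simplified stand-ins, not the Kafka Streams classes under review.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the suggested design; not Kafka Streams code.
class ChangelogReaderSketch {
    enum SuspendReason { MIGRATED, PROMOTED }

    // Sentinel passed when the changelog end offset is not known yet.
    static final long UNKNOWN_END_OFFSET = -1L;

    // Simplified listener; partitions are plain strings here.
    interface StandbyListener {
        void onUpdateSuspended(String partition, String storeName, long storeOffset,
                               long endOffset, SuspendReason reason);
    }

    static final class ChangelogMetadata {
        final String storeName;
        final long offset;
        final Long endOffset; // null when not yet known

        ChangelogMetadata(String storeName, long offset, Long endOffset) {
            this.storeName = storeName;
            this.offset = offset;
            this.endOffset = endOffset;
        }
    }

    private final Map<String, ChangelogMetadata> standbyChangelogs = new HashMap<>();
    private final StandbyListener listener;

    ChangelogReaderSketch(StandbyListener listener) {
        this.listener = listener;
    }

    void registerStandby(String partition, ChangelogMetadata metadata) {
        standbyChangelogs.put(partition, metadata);
    }

    // Single exit point, called once per registered changelog whether the task
    // is closed (MIGRATED) or recycled into an active task (PROMOTED).
    void unregister(Collection<String> partitions, SuspendReason reason) {
        for (String partition : partitions) {
            ChangelogMetadata metadata = standbyChangelogs.remove(partition);
            if (metadata == null) {
                continue; // not a standby changelog tracked here
            }
            // Always notify; use the sentinel instead of skipping the callback.
            long endOffset = metadata.endOffset != null ? metadata.endOffset : UNKNOWN_END_OFFSET;
            listener.onUpdateSuspended(partition, metadata.storeName, metadata.offset, endOffset, reason);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ChangelogReaderSketch reader = new ChangelogReaderSketch(
                (partition, store, offset, end, reason) ->
                        calls.add(partition + " " + store + " " + offset + " " + end + " " + reason));
        reader.registerStandby("changelog-0", new ChangelogMetadata("store-a", 42L, 100L));
        reader.registerStandby("changelog-1", new ChangelogMetadata("store-b", 7L, null));
        reader.unregister(List.of("changelog-0", "changelog-1"), SuspendReason.MIGRATED);
        calls.forEach(System.out::println);
        // changelog-0 store-a 42 100 MIGRATED
        // changelog-1 store-b 7 -1 MIGRATED
    }
}
```

Because each registered changelog is removed exactly once, the 1:1 pairing of registration and `unregister` described in the review is what guarantees a single callback per changelog.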
Re: [PR] KIP-988 [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391822284 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeNamethe name of the store being watched by this Standby Task. + * @param startingOffset the offset from which the Standby Task starts watching. + * @param currentEndOffset the current latest offset on the associated changelog partition. + */ +void onUpdateStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset, + final long currentEndOffset); + +/** + * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever + * the value of the MAX_POLL_RECORDS is set to. 
+ * Review Comment: formatting nit: add a single `<p>` between paragraphs in the javadocs (although you don't need a closing `</p>` tag, nor do you need a `<p>` between the last paragraph and the `@param` section).
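In practice, the javadoc convention from this nit looks like the following: one `<p>` between paragraphs, no closing `</p>`, and no `<p>` before the `@param` block. The interface and method below are a hypothetical, simplified illustration; the second javadoc paragraph restates the batch-size note from the quoted javadoc.

```java
// Hypothetical example of the javadoc paragraph convention; not the real listener.
class JavadocFormatDemo {

    interface BatchListener {
        /**
         * Method called after loading a batch of records into a standby store.
         * <p>
         * The maximum size of the batch is bounded by the consumer's
         * max.poll.records setting.
         *
         * @param storeName  the name of the store being updated
         * @param numRecords the number of records in the batch
         */
        void onBatchLoaded(String storeName, long numRecords);
    }

    public static void main(String[] args) {
        long[] total = {0};
        BatchListener listener = (storeName, numRecords) -> total[0] += numRecords;
        listener.onBatchLoaded("store-a", 500);
        listener.onBatchLoaded("store-a", 250);
        System.out.println(total[0]); // 750
    }
}
```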
Re: [PR] KIP-988 [kafka]
ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391818555 ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. Review Comment: ```suggestion * Method called upon the initialization of the standby task, just before it begins to load from the changelog. ``` nit: I won't comment this everywhere, and frankly it's not a big deal and I'm only pointing it out because it should be a quick fix with find, but we usually don't capitalize terms like "Standby Task". Mainly to help differentiate between the common name and the class name. So for example "TopicPartition" is fine, but "Topic Partition" would be weird. 
Similarly, I would use "standby task" here, although "StandbyTask" is also correct. ## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.TopicPartition; + +public interface StandbyUpdateListener { + +enum SuspendReason { +MIGRATED, +PROMOTED +} + +/** + * Method called upon the creation of the Standby Task. + * + * @param topicPartition the TopicPartition of the Standby Task. + * @param storeName the name of the store being watched by this Standby Task. + * @param startingOffset the offset from which the Standby Task starts watching. + * @param currentEndOffset the current latest offset on the associated changelog partition. Review Comment: ```suggestion * @param currentEndOffset the current end offset on the associated changelog partition.
``` I guess you could also say "highest offset", but "latest" feels a bit ambiguous, at least to me personally. ## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ## @@ -742,6 +756,30 @@ public void onRestoreSuspended(final TopicPartition topicPartition, final String } } +final class DelegatingStandbyUpdateListener implements StandbyUpdateListener { + +@Override +public void onUpdateStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long currentEndOffset) { Review Comment: nit: these method signatures are all really long, can you break up the parameters? This is the format we use:
```suggestion
public void onUpdateStart(final TopicPartition topicPartition,
                          final String storeName,
                          final long startingOffset,
                          final long currentEndOffset) {
```
## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -70,6 +71,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
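A minimal sketch of what a user-supplied listener might do with the `onUpdateStart` arguments, plus a delegating wrapper in the spirit of the `DelegatingStandbyUpdateListener` quoted from `KafkaStreams`. The interface is a simplified, hypothetical copy (`TopicPartition` is replaced by a `String`), and the no-op fallback when no user listener is registered is an assumption, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical copy of the listener interface (String instead of TopicPartition).
interface StandbyListenerSketch {
    void onUpdateStart(final String topicPartition,
                       final String storeName,
                       final long startingOffset,
                       final long currentEndOffset);
}

// Example user listener: records roughly how many offsets the standby task still has to load.
class LagRecordingListener implements StandbyListenerSketch {
    final List<String> messages = new ArrayList<>();

    @Override
    public void onUpdateStart(final String topicPartition,
                              final String storeName,
                              final long startingOffset,
                              final long currentEndOffset) {
        // Offsets between the starting offset and the current end offset remain to be loaded.
        final long remaining = Math.max(0L, currentEndOffset - startingOffset);
        messages.add(storeName + "@" + topicPartition + " remaining=" + remaining);
    }
}

// Delegating wrapper: forwards to the user's listener if one was registered, otherwise does nothing.
class DelegatingListenerSketch implements StandbyListenerSketch {
    private volatile StandbyListenerSketch userListener;

    void setUserListener(final StandbyListenerSketch listener) {
        this.userListener = listener;
    }

    @Override
    public void onUpdateStart(final String topicPartition,
                              final String storeName,
                              final long startingOffset,
                              final long currentEndOffset) {
        final StandbyListenerSketch listener = userListener;
        if (listener != null) {
            listener.onUpdateStart(topicPartition, storeName, startingOffset, currentEndOffset);
        }
    }
}
```

Reading the field once into a local keeps the null check and the call consistent even if another thread swaps the listener concurrently.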
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe merged PR #14682: URL: https://github.com/apache/kafka/pull/14682
[PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan opened a new pull request, #14753: URL: https://github.com/apache/kafka/pull/14753 In 91fa196930ece7342f38a5404…dfc09cf607916eb, I accidentally removed the action queue parameter that was added in 7d147cf2413e5d361422728e5c9306574658c78d. I don't think this broke anything, since we don't use verification for group coordinator commits, but I should restore it to how it was before. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
kirktrue commented on PR #14752: URL: https://github.com/apache/kafka/pull/14752#issuecomment-1809303495 @philipnee Would you tag this with `ctr` and review? This is a low priority clean up.
Re: [PR] KIP-988 [kafka]
ableegoldman commented on PR #14735: URL: https://github.com/apache/kafka/pull/14735#issuecomment-1809299774 I'll take a look! In the meantime, can you format the PR title with the ticket number? I thought we had a guide on this somewhere, but I can't find it mentioned anywhere. Anyway, it should be of the form `KAFKA-12345: title of the PR`, ideally with the KIP number included in the "title of the PR" after the JIRA ticket number. (I'm not just giving you a hard time for no reason; there are a bunch of integrations and tools that rely on/assume this formatting to do things like automatically link PRs to JIRA tickets.)
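As an illustration of the `KAFKA-12345: title of the PR` shape, a hypothetical title check might look like the sketch below. The regex is an assumption made for this example, not a rule the project enforces, and it deliberately ignores other prefixes seen on the list such as `MINOR:`.

```java
import java.util.regex.Pattern;

// Hypothetical helper: checks a PR title against the "KAFKA-12345: title of the PR" shape.
// The exact pattern is an illustrative assumption.
final class PrTitleCheck {
    // Ticket key, a colon and a space, then a title that starts with a non-space character.
    private static final Pattern TITLE = Pattern.compile("^KAFKA-\\d+: \\S.*$");

    private PrTitleCheck() { }

    static boolean isWellFormed(final String title) {
        return TITLE.matcher(title).matches();
    }

    public static void main(final String[] args) {
        System.out.println(isWellFormed("KAFKA-12345: KIP-988 title of the PR")); // true
        System.out.println(isWellFormed("KIP-988"));                              // false
    }
}
```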
[PR] MINOR: WakeupTrigger cleanup [kafka]
kirktrue opened a new pull request, #14752: URL: https://github.com/apache/kafka/pull/14752 Added comments, made package-visible, and removed inner interface and classes.
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue closed pull request #14670: KAFKA-15277: Design & implement support for internal Consumer delegates URL: https://github.com/apache/kafka/pull/14670
Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]
jeqo commented on code in PR #14727: URL: https://github.com/apache/kafka/pull/14727#discussion_r1391798949 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -988,30 +997,33 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE return; } RemoteLogSegmentMetadata metadata = segmentsIterator.next(); -if (segmentsToDelete.contains(metadata)) { -continue; -} -// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated -// as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those -// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether -// the epochs present in the segment lies in the checkpoint file. It will always return false -// since the checkpoint file was already truncated. -boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset( + +if (SEGMENT_DELETION_VALID_STATES.contains(metadata.state())) { Review Comment: Agree, applying this fix.
Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]
jeqo commented on code in PR #14727: URL: https://github.com/apache/kafka/pull/14727#discussion_r1391797997 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; +private static final Set SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of( Review Comment: Great catch. I guess the transition hasn't considered the current implementation where copying and deletion are sequential. If the implementation changes, it may be possible to have this scenario. I'd say it should be considered a valid transition. I will include it in the valid states and adapt the test case.
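The two hunks above combine an unmodifiable `EnumSet` allow-list with a `contains(metadata.state())` check. A minimal sketch of that pattern follows; the enum is a local stand-in whose constant names mirror Kafka's `RemoteLogSegmentState`, the segment class is hypothetical, and the membership of the set is illustrative only, since the exact states chosen in the PR are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Local stand-in whose constant names mirror Kafka's RemoteLogSegmentState.
enum SegmentStateSketch {
    COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
}

// Hypothetical, minimal segment metadata: an id plus a state.
final class SegmentSketch {
    final String id;
    final SegmentStateSketch state;

    SegmentSketch(final String id, final SegmentStateSketch state) {
        this.id = id;
        this.state = state;
    }

    SegmentStateSketch state() {
        return state;
    }
}

final class SegmentStateFilter {
    // Illustrative membership only; not the set used in the PR.
    static final Set<SegmentStateSketch> VALID_STATES = Collections.unmodifiableSet(EnumSet.of(
            SegmentStateSketch.COPY_SEGMENT_FINISHED,
            SegmentStateSketch.DELETE_SEGMENT_STARTED));

    private SegmentStateFilter() { }

    // Keeps only segments whose state is in the allow-list, e.g. skipping segments still being copied.
    static List<String> eligibleIds(final List<SegmentSketch> segments) {
        final List<String> ids = new ArrayList<>();
        for (final SegmentSketch s : segments) {
            if (VALID_STATES.contains(s.state())) {
                ids.add(s.id);
            }
        }
        return ids;
    }
}
```

`EnumSet` membership checks are a bit test on an ordinal, and wrapping it with `Collections.unmodifiableSet` keeps the shared constant from being mutated.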
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391783416 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -277,6 +279,24 @@ class BrokerServer( time ) + if (config.logDirs.size > 1) { Review Comment: Should we log something here, to indicate that JBOD mode is on?
[PR] KAFKA-15819: Fix leaked KafkaRaftManager in ZK mode during migration [kafka]
gharris1727 opened a new pull request, #14751: URL: https://github.com/apache/kafka/pull/14751 The other call sites for KafkaRaftManager (SharedServer, TestRaftServer, MetadataShell) appear to shut down the KafkaRaftManager when they themselves shut down. The call site in ZK-mode KafkaServer should behave the same way. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391782832 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import 
org.apache.kafka.server.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. + */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); + +private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10); + +private final Time time; +private final NodeToControllerChannelManager channelManager; +private final int brokerId; +private final Supplier brokerEpochSupplier; +private final KafkaEventQueue eventQueue; + +// These variables should only be mutated from the event loop thread +private Map inflight = null; +private Map pending = new HashMap<>(); +private final ExponentialBackoff resendExponentialBackoff = +new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02); +private int failedAttempts = 0; + +public AssignmentsManager(Time time, + NodeToControllerChannelManager channelManager, + int brokerId, + Supplier brokerEpochSupplier) { +this.time = time; +this.channelManager = channelManager; +this.brokerId = brokerId; +this.brokerEpochSupplier = brokerEpochSupplier; +this.eventQueue = new KafkaEventQueue(time, +new LogContext("[AssignmentsManager id=" + brokerId + "]"), +"broker-" + brokerId + "-directory-assignments-manager-"); +} + +public void close() throws InterruptedException { +eventQueue.close(); +channelManager.shutdown(); +} + +public 
void onAssignment(TopicIdPartition topicPartition, Uuid dirId) { +eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId)); +} + +// only for testing +void wakeup() { +eventQueue.wakeup(); +} + +/** + * Base class for all the events handled by {@link AssignmentsManager}. + */ +private abstract static class Event implements EventQueue.Event { +/** + * Override the default behavior in + * {@link EventQueue.Event#handleException} + * which swallows the exception. + */ +@Override +public void handleException(Throwable e) { +throw new RuntimeException(e); +} +} + +
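The retry schedule configured by `new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02)` can be approximated with the jitter-free sketch below, assuming the constructor arguments are (initial interval, multiplier, maximum interval, jitter) and that the values are meant in milliseconds, as the `_MS` suffix suggests. The real class also applies a random factor of roughly ±2%, which is omitted here. Note that the quoted constant is computed with `TimeUnit.SECONDS.toNanos(10)`, which evaluates to 10,000,000,000; read as milliseconds, that cap is roughly 115 days rather than 10 seconds.

```java
import java.util.concurrent.TimeUnit;

// Jitter-free approximation of a capped exponential backoff:
// delay(n) = min(initial * multiplier^n, max).
final class BackoffSketch {
    private BackoffSketch() { }

    static long delay(final long initial, final int multiplier, final long max, final int attempts) {
        final double term = initial * Math.pow(multiplier, attempts);
        return term >= max ? max : (long) term;
    }

    public static void main(final String[] args) {
        final long intendedCapMs = TimeUnit.SECONDS.toMillis(10); // 10_000
        final long quotedCap = TimeUnit.SECONDS.toNanos(10);      // 10_000_000_000
        for (int attempt = 0; attempt <= 7; attempt++) {
            // 100, 200, 400, ..., 6400, then capped at 10000
            System.out.println(attempt + " -> " + delay(100, 2, intendedCapMs, attempt) + " ms");
        }
        System.out.println("cap as quoted: " + quotedCap);
    }
}
```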
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391781726 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import 
org.apache.kafka.server.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. + */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); + +private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toNanos(10); + +private final Time time; +private final NodeToControllerChannelManager channelManager; +private final int brokerId; +private final Supplier brokerEpochSupplier; +private final KafkaEventQueue eventQueue; + +// These variables should only be mutated from the event loop thread +private Map inflight = null; +private Map pending = new HashMap<>(); +private final ExponentialBackoff resendExponentialBackoff = +new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02); +private int failedAttempts = 0; + +public AssignmentsManager(Time time, + NodeToControllerChannelManager channelManager, + int brokerId, + Supplier brokerEpochSupplier) { +this.time = time; +this.channelManager = channelManager; +this.brokerId = brokerId; +this.brokerEpochSupplier = brokerEpochSupplier; +this.eventQueue = new KafkaEventQueue(time, +new LogContext("[AssignmentsManager id=" + brokerId + "]"), +"broker-" + brokerId + "-directory-assignments-manager-"); +} + +public void close() throws InterruptedException { +eventQueue.close(); +channelManager.shutdown(); +} + +public 
void onAssignment(TopicIdPartition topicPartition, Uuid dirId) { +eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId)); +} + +// only for testing +void wakeup() { +eventQueue.wakeup(); +} + +/** + * Base class for all the events handled by {@link AssignmentsManager}. + */ +private abstract static class Event implements EventQueue.Event { +/** + * Override the default behavior in + * {@link EventQueue.Event#handleException} + * which swallows the exception. + */ +@Override +public void handleException(Throwable e) { +throw new RuntimeException(e); Review Comment:
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391780510 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import 
org.apache.kafka.server.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. + */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); Review Comment: Also, a lot of CPU clocks don't go down to an individual nanosecond. I don't know the exact details (it varies a lot by platform), but basically you probably can't schedule an actual 1 ns delay. Something like 1/4 of a ms would be more reasonable? Though it's still likely to be padded out to a longer length by the actual platform.
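The magnitudes in this exchange are easy to check with `java.util.concurrent.TimeUnit`. The sketch below converts the suggested quarter millisecond to nanoseconds and evaluates the constant exactly as it appears in the quoted diff. It also shows that a 250 µs delay truncates to 0 when converted to whole milliseconds, which is worth keeping in mind if such a delay is ever handed to an API that takes milliseconds.

```java
import java.util.concurrent.TimeUnit;

// Worked unit conversions for the intervals discussed in the review.
final class IntervalMath {
    private IntervalMath() { }

    public static void main(final String[] args) {
        // A quarter of a millisecond is 250 microseconds.
        final long quarterMsNanos = TimeUnit.MICROSECONDS.toNanos(250);
        // The constant as quoted in the diff: one second expressed in nanoseconds.
        final long quotedNanos = TimeUnit.SECONDS.toNanos(1);
        // Converting 250 µs down to whole milliseconds truncates toward zero.
        final long quarterMsAsMillis = TimeUnit.NANOSECONDS.toMillis(quarterMsNanos);

        System.out.println(quarterMsNanos);    // 250000
        System.out.println(quotedNanos);       // 1000000000
        System.out.println(quarterMsAsMillis); // 0
    }
}
```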
[jira] [Created] (KAFKA-15819) KafkaServer leaks KafkaRaftManager when ZK migration enabled
Greg Harris created KAFKA-15819: --- Summary: KafkaServer leaks KafkaRaftManager when ZK migration enabled Key: KAFKA-15819 URL: https://issues.apache.org/jira/browse/KAFKA-15819 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.6.0 Reporter: Greg Harris Assignee: Greg Harris In SharedServer, TestRaftServer, and MetadataShell, the KafkaRaftManager is maintained as an instance variable and shut down when the outer instance is shut down. However, in the KafkaServer, the KafkaRaftManager is instantiated and started, but then the reference is lost. [https://github.com/apache/kafka/blob/49d3122d425171b6a59a2b6f02d3fe63d3ac2397/core/src/main/scala/kafka/server/KafkaServer.scala#L416-L442] Instead, the KafkaServer should behave like the other call sites of KafkaRaftManager and shut down the KafkaRaftManager during its own shutdown.
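The fix the ticket asks for amounts to keeping the reference and releasing it with its owner. A minimal sketch, with hypothetical `RaftManagerLike` and `ServerSketch` types standing in for `KafkaRaftManager` and `KafkaServer` (their real constructors and startup sequences are not shown in the ticket):

```java
// Hypothetical stand-in for a component with a start/shutdown lifecycle.
interface RaftManagerLike {
    void startup();
    void shutdown();
}

// Hypothetical owner: it keeps the reference it starts so that shutdown() can release it,
// instead of starting the component and then dropping the reference.
final class ServerSketch {
    private final boolean migrationEnabled;
    private RaftManagerLike raftManager; // null unless migration is enabled

    ServerSketch(final boolean migrationEnabled) {
        this.migrationEnabled = migrationEnabled;
    }

    void startup(final RaftManagerLike candidate) {
        if (migrationEnabled) {
            raftManager = candidate;
            raftManager.startup();
        }
    }

    void shutdown() {
        if (raftManager != null) {
            raftManager.shutdown();
            raftManager = null; // makes a repeated shutdown() a no-op
        }
    }
}
```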
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391779074 ## core/src/main/java/kafka/server/AssignmentsManager.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData; +import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData; +import org.apache.kafka.common.message.AssignReplicasToDirsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AssignReplicasToDirsRequest; +import org.apache.kafka.common.requests.AssignReplicasToDirsResponse; +import org.apache.kafka.common.utils.ExponentialBackoff; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import 
org.apache.kafka.server.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AssignmentsManager { + +private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class); + +/** + * Assignments are dispatched to the controller this long after + * being submitted to {@link AssignmentsManager}, if there + * is no request in flight already. + * The interval is reset when a new assignment is submitted. + */ +private static final long DISPATCH_INTERVAL_NS = TimeUnit.SECONDS.toNanos(1); Review Comment: I don't think 1 nanosecond is going to give you much batching! At 4 GHz, 1 nanosecond is like 4 CPU cycles. Obviously you have pipelining, instruction reordering, yadda yadda, but you get the idea: this is NOT a reasonable amount of time to get anything done in.
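The batching described in the `DISPATCH_INTERVAL_NS` javadoc (dispatch only when no request is in flight, and restart the interval whenever a new assignment arrives) can be sketched as a small debouncer driven by an explicit clock. Everything here is hypothetical: the class, the `String` keys standing in for `TopicIdPartition` and directory ids, and the synchronous `poll` method that replaces the event queue used by `AssignmentsManager`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical debouncer: collects the latest directory per partition and releases them as
// one batch once intervalNs has passed since the most recent submission, provided no
// previous batch is still in flight.
final class AssignmentBatcher {
    private final long intervalNs;
    private Map<String, String> pending = new HashMap<>();
    private Map<String, String> inflight = null;
    private long deadlineNs = Long.MAX_VALUE;

    AssignmentBatcher(final long intervalNs) {
        this.intervalNs = intervalNs;
    }

    // A new assignment overwrites any pending one for the same partition and restarts the interval.
    void submit(final String partition, final String dirId, final long nowNs) {
        pending.put(partition, dirId);
        deadlineNs = nowNs + intervalNs;
    }

    // Returns the batch to send, or null if it is too early, nothing is pending,
    // or a previous batch is still in flight.
    Map<String, String> poll(final long nowNs) {
        if (inflight != null || pending.isEmpty() || nowNs < deadlineNs) {
            return null;
        }
        inflight = pending;
        pending = new HashMap<>();
        deadlineNs = Long.MAX_VALUE;
        return inflight;
    }

    // Called when the controller responds; a real implementation would re-queue and back off on failure.
    void onResponse() {
        inflight = null;
    }
}
```

The interval is what lets several submissions coalesce into one request. With an interval of 1 ns the deadline has effectively passed by the next poll, so almost nothing is coalesced, which is the reviewer's point.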
[jira] [Updated] (KAFKA-15818) Implement max poll internval
[ https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15818: --- Description: The consumer needs to be polled at intervals shorter than MAX_POLL_INTERVAL_MAX; otherwise the consumer should try to leave the group. Currently, we send an acknowledgment event to the network thread per poll. The event only triggers an update of the autocommit state; we also need to implement updating the poll timer so that the consumer can leave the group when the timer expires. The current logic looks like this: {code:java} if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(). log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + "was longer than the configured max.poll.interval.ms, which typically implies that " + "the poll loop is spending too much time processing messages. You can address this " + "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + "returned in poll() with max.poll.records."); maybeLeaveGroup("consumer poll timeout has expired."); } {code} was: In the network thread, we need a timer configured to use MAX_POLL_INTERVAL_MAX. The reason is that if the user doesn't poll the consumer within the interval, the member needs to leave the group. Currently, we send an acknowledgement event to the network thread per poll. It needs to do two things: 1. update autocommit state 2. update max poll interval timer The current logic looks like this: {code:java} if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(). log.warn("consumer poll timeout has expired.
This means the time between subsequent calls to poll() " + "was longer than the configured max.poll.interval.ms, which typically implies that " + "the poll loop is spending too much time processing messages. You can address this " + "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + "returned in poll() with max.poll.records."); maybeLeaveGroup("consumer poll timeout has expired."); } {code} > Implement max poll internval > > > Key: KAFKA-15818 > URL: https://issues.apache.org/jira/browse/KAFKA-15818 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Blocker > > The consumer needs to be polled at a candance lower than > MAX_POLL_INTERVAL_MAX otherwise the consumer should try to leave the group. > Currently, we send an acknowledgment event to the network thread per poll. > The event only triggers update on autocommit state, we need to implement > updating the poll timer so that the consumer can leave the group when the > timer expires. > > The current logic looks like this: > {code:java} > if (heartbeat.pollTimeoutExpired(now)) { > // the poll timeout has expired, which means that the foreground thread > has stalled > // in between calls to poll(). > log.warn("consumer poll timeout has expired. This means the time between > subsequent calls to poll() " + > "was longer than the configured max.poll.interval.ms, which typically > implies that " + > "the poll loop is spending too much time processing messages. You can > address this " + > "either by increasing max.poll.interval.ms or by reducing the maximum > size of batches " + > "returned in poll() with max.poll.records."); > maybeLeaveGroup("consumer poll timeout has expired."); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
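The timer the ticket asks for can be sketched as follows. The sketch assumes the network thread receives one acknowledgment event per `poll()` and runs a periodic check with the current time. `PollTimer` and its methods are hypothetical names, not the new consumer's actual classes. The expiry check plays the role of `heartbeat.pollTimeoutExpired(now)` in the current logic. The example uses 300,000 ms, the default `max.poll.interval.ms`.

```java
// Hypothetical poll-interval tracker owned by the background (network) thread.
final class PollTimer {
    private final long maxPollIntervalMs;
    private long lastPollMs;
    private boolean leftGroup;

    PollTimer(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Invoked when the acknowledgment event for an application-thread poll() arrives.
    void onPollEvent(long nowMs) {
        lastPollMs = nowMs;
        leftGroup = false; // re-arm so a later expiry is detected again
    }

    boolean isExpired(long nowMs) {
        return nowMs - lastPollMs >= maxPollIntervalMs;
    }

    // Invoked on each network-thread iteration; true means "leave the group now".
    boolean shouldLeaveGroup(long nowMs) {
        if (!leftGroup && isExpired(nowMs)) {
            leftGroup = true; // report each expiry only once
            return true;
        }
        return false;
    }
}
```

Reporting each expiry only once keeps the network thread from sending repeated leave requests while the application thread is still stalled.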
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809242988 I don't know if we want to do this in this PR, but one thing I would suggest for ReplicaManager is to use the `FaultHandler` paradigm we have in the `QuorumController` code. Specifically, when `QuorumController` hits an unrecoverable condition, it invokes the `FaultHandler`. In normal operation, this maps to logging an error message and exiting. In unit tests, it maps to throwing an exception and also setting a flag that will cause any integration test to fail. This accomplishes two things: 1. It avoids calling `exit(1)` in JUnit tests, which would kill Jenkins dead (even after 3 decades of Java, we don't have the technology to intercept `exit()` in unit tests >:( ). 2. It lets us always know if something is going wrong in the unit / integration test. There can also be non-fatal fault handlers, which make point 2 even more important (since often throwing an exception or logging an ERROR will not prevent the test from succeeding!)
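Below is a minimal sketch of the pattern described above. It assumes a simplified single-method interface. `SimpleFaultHandler`, `ExitingFaultHandler` and `RecordingFaultHandler` are hypothetical names, not Kafka's actual fault-handler classes. The production-style handler logs and then exits through an injected callback. The test-style handler never exits; instead, it records the first fault so the harness can fail the run even if the code under test swallowed the exception.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

// Hypothetical, simplified fault-handler interface (not Kafka's real API).
interface SimpleFaultHandler {
    // Returns an exception so callers can write: throw handler.handleFault(...);
    RuntimeException handleFault(String message, Throwable cause);
}

// Production-style handler: log the error, then terminate via the injected exiter.
final class ExitingFaultHandler implements SimpleFaultHandler {
    private final IntConsumer exiter; // e.g. code -> Runtime.getRuntime().halt(code)

    ExitingFaultHandler(IntConsumer exiter) {
        this.exiter = exiter;
    }

    @Override
    public RuntimeException handleFault(String message, Throwable cause) {
        System.err.println("FATAL: " + message);
        exiter.accept(1);
        // Only reached if the exiter does not actually stop the process.
        return new RuntimeException(message, cause);
    }
}

// Test-style handler: never exits, but remembers the first fault it saw.
final class RecordingFaultHandler implements SimpleFaultHandler {
    private final AtomicReference<RuntimeException> firstFault = new AtomicReference<>();

    @Override
    public RuntimeException handleFault(String message, Throwable cause) {
        RuntimeException e = new RuntimeException(message, cause);
        firstFault.compareAndSet(null, e);
        return e;
    }

    // Call from test teardown: fails the test even if the fault was caught and ignored.
    void maybeRethrowFirstFault() {
        RuntimeException e = firstFault.get();
        if (e != null) throw e;
    }
}
```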
[jira] [Updated] (KAFKA-15818) Implement max poll internval
[ https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15818: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > Implement max poll internval > > > Key: KAFKA-15818 > URL: https://issues.apache.org/jira/browse/KAFKA-15818 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Blocker > > In the network thread, we need a timer configured with > MAX_POLL_INTERVAL_MAX. The reason is that if the user doesn't poll the consumer > within the interval, the member needs to leave the group. > > Currently, we send an acknowledgement event to the network thread per poll. > It needs to do two things: 1. update the autocommit state; 2. update the max poll > interval timer. > > The current logic looks like this: > {code:java} > if (heartbeat.pollTimeoutExpired(now)) { > // the poll timeout has expired, which means that the foreground thread > has stalled > // in between calls to poll(). > log.warn("consumer poll timeout has expired. This means the time between > subsequent calls to poll() " + > "was longer than the configured max.poll.interval.ms, which typically > implies that " + > "the poll loop is spending too much time processing messages. You can > address this " + > "either by increasing max.poll.interval.ms or by reducing the maximum > size of batches " + > "returned in poll() with max.poll.records."); > maybeLeaveGroup("consumer poll timeout has expired."); > } {code}
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809239029 I don't have a good place to put this comment (GitHub only lets me comment on changed lines), but there is a problem with this code:
```
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
  override def doWork(): Unit = {
    val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
    if (haltBrokerOnDirFailure) {
      fatal(s"Halting broker because dir $newOfflineLogDir is offline")
      Exit.halt(1)
    }
    handleLogDirFailure(newOfflineLogDir)
  }
}
```
If the directory that failed is the metadata directory, we need to exit unconditionally, because we have not implemented any way of failing over to a different directory for metadata. I suppose we should have a post-3.7 follow-up JIRA for this.
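Here is a minimal Java sketch of the unconditional exit. It uses hypothetical names (`LogDirFailureDecider`, `metadataLogDir`, `halter`, `offlineHandler`) rather than the real `ReplicaManager` members. The metadata-directory check runs alongside `haltBrokerOnDirFailure`, so losing that directory is always fatal. Paths are compared as plain strings, which assumes both sides are normalized the same way.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical sketch of the check requested above; not Kafka's ReplicaManager code.
final class LogDirFailureDecider {
    private final String metadataLogDir;
    private final boolean haltBrokerOnDirFailure;
    private final IntConsumer halter;              // in a broker this would halt the JVM
    private final Consumer<String> offlineHandler; // the existing per-directory failure handling

    LogDirFailureDecider(String metadataLogDir, boolean haltBrokerOnDirFailure,
                         IntConsumer halter, Consumer<String> offlineHandler) {
        this.metadataLogDir = metadataLogDir;
        this.haltBrokerOnDirFailure = haltBrokerOnDirFailure;
        this.halter = halter;
        this.offlineHandler = offlineHandler;
    }

    void onOfflineDir(String dir) {
        // There is no failover for the metadata directory, so losing it is always fatal.
        if (haltBrokerOnDirFailure || Objects.equals(dir, metadataLogDir)) {
            System.err.println("Halting broker because dir " + dir + " is offline");
            halter.accept(1);
            return;
        }
        offlineHandler.accept(dir);
    }
}
```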
[jira] [Created] (KAFKA-15818) Implement max poll internval
Philip Nee created KAFKA-15818: -- Summary: Implement max poll internval Key: KAFKA-15818 URL: https://issues.apache.org/jira/browse/KAFKA-15818 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee In the network thread, we need a timer configured with MAX_POLL_INTERVAL_MAX. The reason is that if the user doesn't poll the consumer within the interval, the member needs to leave the group. Currently, we send an acknowledgement event to the network thread per poll. It needs to do two things: 1. update the autocommit state; 2. update the max poll interval timer. The current logic looks like this: {code:java} if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(). log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + "was longer than the configured max.poll.interval.ms, which typically implies that " + "the poll loop is spending too much time processing messages. You can address this " + "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + "returned in poll() with max.poll.records."); maybeLeaveGroup("consumer poll timeout has expired."); } {code}
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391771537 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2323,15 +2326,56 @@ class ReplicaManager(val config: KafkaConfig, } logManager.handleLogDirFailure(dir) -if (sendZkNotification) +if (notifyController) { + if (config.migrationEnabled) { +fatal(s"Shutdown broker because some log directory has failed during migration mode: $dir") +Exit.halt(1) Review Comment: This seems wrong as our long-term solution, but I guess it's OK for now. (We can discuss it more later.)
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391770284 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -269,7 +270,9 @@ class ReplicaManager(val config: KafkaConfig, delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None, threadNamePrefix: Option[String] = None, val brokerEpochSupplier: () => Long = () => -1, - addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None + addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None, + assignmentManager: Option[AssignmentsManager] = None, Review Comment: I know this is a common pattern in `ReplicaManager`, but it just seems so bad. We don't really care about the details of `AssignmentsManager` or `BrokerLifecycleManager` here. Shouldn't we just be passing a reference to an interface like `AssignmentHandler`?
```
interface AssignmentHandler {
    void onAssignment(TopicIdPartition partition, Uuid directoryId);
    void propagateDirectoryFailure(Uuid directoryId);
}
```
etc. Then we can initialize a dummy version by default, to keep all the unit tests working without changes (if desired).
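Below is a compilable, self-contained version of the suggested interface, with a do-nothing default for tests. `TopicIdPartition` here is a local stand-in record; the assumption is that Kafka's class carries a topic ID and a partition number. `java.util.UUID` stands in for Kafka's `Uuid`. `NOOP` and `RecordingAssignmentHandler` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Local stand-in for Kafka's TopicIdPartition (assumption, for a self-contained example).
record TopicIdPartition(UUID topicId, int partitionId) { }

// The interface suggested in the review comment.
interface AssignmentHandler {
    void onAssignment(TopicIdPartition partition, UUID directoryId);

    void propagateDirectoryFailure(UUID directoryId);

    // A do-nothing default, so existing unit tests need no changes.
    AssignmentHandler NOOP = new AssignmentHandler() {
        @Override
        public void onAssignment(TopicIdPartition partition, UUID directoryId) { }

        @Override
        public void propagateDirectoryFailure(UUID directoryId) { }
    };
}

// A test double that records calls instead of talking to the controller.
final class RecordingAssignmentHandler implements AssignmentHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onAssignment(TopicIdPartition partition, UUID directoryId) {
        calls.add("assign " + partition.partitionId() + " -> " + directoryId);
    }

    @Override
    public void propagateDirectoryFailure(UUID directoryId) {
        calls.add("failed " + directoryId);
    }
}
```

With this shape, `ReplicaManager` would take a single `AssignmentHandler` parameter defaulting to `NOOP`, instead of several `Option[...]` manager references.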
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
cmccabe commented on code in PR #14369: URL: https://github.com/apache/kafka/pull/14369#discussion_r1391763417 ## clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java: ## @@ -27,6 +27,8 @@ public class AssignReplicasToDirsRequest extends AbstractRequest { +public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250; Review Comment: Can you add Javadoc for this constant, including the reason we want to keep the request under 64 KB?
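Here is one possible shape for the requested Javadoc. A hypothetical `chunk` helper shows how a sender could split a larger set of assignments so that no single request exceeds the cap. The Javadoc wording is an assumption based on this comment, not text from the PR. For scale, if 64 KB means 65,536 bytes, 2,250 assignments leave a budget of roughly 29 bytes per assignment before request overhead.

```java
import java.util.ArrayList;
import java.util.List;

final class AssignmentBatching {
    /**
     * The maximum number of replica-to-directory assignments carried by a single
     * AssignReplicasToDirs request. The cap is intended to keep each serialized
     * request under 64 KB; senders with more assignments should split them
     * across several requests.
     */
    static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

    // Splits items into consecutive batches of at most maxPerBatch elements each.
    static <T> List<List<T>> chunk(List<T> items, int maxPerBatch) {
        if (maxPerBatch <= 0) throw new IllegalArgumentException("maxPerBatch must be positive");
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```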
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391738283 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,914 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Utils; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link
[jira] [Created] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
Bob Barrett created KAFKA-15817: --- Summary: Avoid reconnecting to the same IP address if multiple addresses are available Key: KAFKA-15817 URL: https://issues.apache.org/jira/browse/KAFKA-15817 Project: Kafka Issue Type: Bug Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.3.2 Reporter: Bob Barrett In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS resolution behavior for clients to re-resolve DNS after disconnecting from a broker, rather than waiting until we had iterated over all addresses from a given resolution. This is useful when the IP addresses have changed between the connection and the disconnection. However, this behavior change means that clients could reconnect immediately to the same IP they just disconnected from, if the IPs have not changed. In cases where the disconnection happened because that IP was unhealthy (such as when a load balancer has instances in multiple availability zones and one zone is unhealthy, or when an intermediate component in the network path is going through a rolling restart), this will delay the client's successful reconnection. To address this, clients should remember the IP they just disconnected from and skip that IP when reconnecting, as long as the hostname resolved to multiple addresses.
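Below is a minimal sketch of the proposed selection rule. It assumes the client remembers the last address it disconnected from. `AddressSelector.chooseAddress` is a hypothetical helper, not part of the Kafka client. When the hostname resolved to a single address, it falls back to that address, as the ticket proposes.

```java
import java.net.InetAddress;
import java.util.List;
import java.util.Optional;

final class AddressSelector {
    // Picks the first resolved address that differs from the one we just disconnected
    // from. If DNS returned only one address (or only copies of it), that one is reused.
    static Optional<InetAddress> chooseAddress(List<InetAddress> resolved, InetAddress lastDisconnected) {
        if (resolved.isEmpty()) return Optional.empty();
        if (resolved.size() > 1 && lastDisconnected != null) {
            for (InetAddress candidate : resolved) {
                if (!candidate.equals(lastDisconnected)) return Optional.of(candidate);
            }
        }
        return Optional.of(resolved.get(0));
    }
}
```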
[jira] [Assigned] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
[ https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Barrett reassigned KAFKA-15817: --- Assignee: Bob Barrett > Avoid reconnecting to the same IP address if multiple addresses are available > - > > Key: KAFKA-15817 > URL: https://issues.apache.org/jira/browse/KAFKA-15817 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1 >Reporter: Bob Barrett >Assignee: Bob Barrett >Priority: Major > > In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS > resolution behavior for clients to re-resolve DNS after disconnecting from a > broker, rather than waiting until we had iterated over all addresses from a given > resolution. This is useful when the IP addresses have changed between the > connection and the disconnection. > However, this behavior change means that clients could > reconnect immediately to the same IP they just disconnected from, > if the IPs have not changed. In cases where the disconnection happened > because that IP was unhealthy (such as when a load balancer has > instances in multiple availability zones and one zone is unhealthy, or when > an intermediate component in the network path is going through a > rolling restart), this will delay the client's successful reconnection. To > address this, clients should remember the IP they just disconnected from and > skip that IP when reconnecting, as long as the hostname resolved to multiple > addresses.
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809158196 Another point I want to make here is that the wakeup call also wakes up the blocking client. I wonder if we also need to do that for the network thread. @kirktrue
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809152957 Hi @cadonna, thank you for putting time into this PR. Based on my understanding, this PR does two things: if wakeup() is invoked before calling poll(), the consumer will return immediately; if wakeup() is invoked during poll(), we should get a WakeupException and return. Overall I think it looks right. *While writing this, I noticed that @kirktrue has already asked the questions I wanted to ask.
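Below is a minimal sketch of the two cases summarized above. It follows the documented `KafkaConsumer.wakeup()` contract: a blocking `poll()` is aborted with `WakeupException`, and if no call is blocking, the next one raises it. `WakeableBlocker` and the local `WakeupException` are hypothetical stand-ins, not the consumer's implementation. A single monitor guards the pending-wakeup flag so that one `wakeup()` is consumed by exactly one `poll()`.

```java
import java.util.concurrent.TimeUnit;

// Local stand-in for Kafka's WakeupException.
final class WakeupException extends RuntimeException { }

final class WakeableBlocker {
    private final Object lock = new Object();
    private boolean wakeupPending; // guarded by lock

    // Sets the flag and wakes any thread currently waiting in poll().
    void wakeup() {
        synchronized (lock) {
            wakeupPending = true;
            lock.notifyAll();
        }
    }

    // Blocks for up to timeoutMs; throws WakeupException if woken before or during the wait.
    void poll(long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        synchronized (lock) {
            while (true) {
                if (wakeupPending) {
                    wakeupPending = false; // consume the wakeup
                    throw new WakeupException();
                }
                long remainingNs = deadline - System.nanoTime();
                if (remainingNs <= 0) return; // normal timeout
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNs); // loop handles spurious wakeups
            }
        }
    }
}
```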
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on PR #14724: URL: https://github.com/apache/kafka/pull/14724#issuecomment-1809112163 > Hi @apoorvmittal10 - I left some questions while making the first pass of the PR. They are all aesthetics. Will follow up with more questions. Thanks! Thanks @philipnee, I have addressed the comments. Could you please re-review?
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391701379 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,922 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.MetricsProvider; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391700892 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.MetricsData; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS; + +public class ClientTelemetryUtils { + +private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class); + +public final static Predicate SELECTOR_NO_METRICS = k -> false; + +public final static Predicate SELECTOR_ALL_METRICS = k -> true; + +/** + * Examine the response data and handle different error code accordingly: + * + * + * Invalid Request: Disable Telemetry + * Invalid Record: Disable Telemetry + * UnknownSubscription or Unsupported Compression: Retry immediately + * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval + * + * + * @param errorCode response body error code + * @param intervalMs current push interval in milliseconds + * + * @return Optional of push interval in milliseconds + */ +public static Optional maybeFetchErrorIntervalMs(short errorCode, int intervalMs) { +if (errorCode == Errors.NONE.code()) +return Optional.empty(); + +int pushIntervalMs; +String reason; + +switch (Errors.forCode(errorCode)) { +case INVALID_REQUEST: +case INVALID_RECORD: { +pushIntervalMs = Integer.MAX_VALUE; +reason = "The broker response indicates the client sent an request that cannot be resolved" ++ " by re-trying, hence disable 
telemetry"; +break; +} +case UNKNOWN_SUBSCRIPTION_ID: +case UNSUPPORTED_COMPRESSION_TYPE: { +pushIntervalMs = 0; +reason = Errors.forCode(errorCode).message(); +break; +} +case TELEMETRY_TOO_LARGE: +case THROTTLING_QUOTA_EXCEEDED: { +reason = Errors.forCode(errorCode).message(); +pushIntervalMs = (intervalMs != -1) ? intervalMs : DEFAULT_PUSH_INTERVAL_MS; +break; +} +default: { +reason = "Unwrapped error code"; +log.error("Error code: {}, reason: {}. Unmapped error for telemetry, disable telemetry.", Review Comment: Thanks for pointing this out; I have tried to improve it. Please check.
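The mapping described in the `maybeFetchErrorIntervalMs` Javadoc can be sketched without Kafka dependencies. `TelemetryError` is a hypothetical local enum standing in for Kafka's `Errors`. The 300,000 ms fallback is an assumed placeholder for `DEFAULT_PUSH_INTERVAL_MS`, whose real value is not shown in this diff. Unmapped errors disable telemetry, as the log message in the `default` branch states.

```java
import java.util.Optional;

// Hypothetical stand-in for the subset of Kafka's Errors enum used here.
enum TelemetryError {
    NONE, INVALID_REQUEST, INVALID_RECORD, UNKNOWN_SUBSCRIPTION_ID,
    UNSUPPORTED_COMPRESSION_TYPE, TELEMETRY_TOO_LARGE, THROTTLING_QUOTA_EXCEEDED, UNKNOWN_SERVER_ERROR
}

final class TelemetryRetryPolicy {
    // Assumed placeholder; the real DEFAULT_PUSH_INTERVAL_MS is defined in ClientTelemetryReporter.
    static final int DEFAULT_PUSH_INTERVAL_MS = 300_000;

    // Returns the next push interval in ms, or empty if the response carried no error.
    static Optional<Integer> nextPushIntervalMs(TelemetryError error, int intervalMs) {
        switch (error) {
            case NONE:
                return Optional.empty();
            case UNKNOWN_SUBSCRIPTION_ID:
            case UNSUPPORTED_COMPRESSION_TYPE:
                return Optional.of(0); // retry immediately
            case TELEMETRY_TOO_LARGE:
            case THROTTLING_QUOTA_EXCEEDED:
                // Retry at the next interval, falling back to the default when none is known.
                return Optional.of(intervalMs != -1 ? intervalMs : DEFAULT_PUSH_INTERVAL_MS);
            case INVALID_REQUEST:
            case INVALID_RECORD:
            default:
                return Optional.of(Integer.MAX_VALUE); // effectively disables telemetry
        }
    }
}
```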
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391699676 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.MetricsData; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS; + +public class ClientTelemetryUtils { + +private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class); + +public final static Predicate SELECTOR_NO_METRICS = k -> false; + +public final static Predicate SELECTOR_ALL_METRICS = k -> true; + +/** + * Examine the response data and handle different error code accordingly: + * + * + * Invalid Request: Disable Telemetry + * Invalid Record: Disable Telemetry + * UnknownSubscription or Unsupported Compression: Retry immediately + * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval + * + * + * @param errorCode response body error code + * @param intervalMs current push interval in milliseconds + * + * @return Optional of push interval in milliseconds + */ +public static Optional maybeFetchErrorIntervalMs(short errorCode, int intervalMs) { +if (errorCode == Errors.NONE.code()) +return Optional.empty(); + +int pushIntervalMs; +String reason; + +switch (Errors.forCode(errorCode)) { Review Comment: Make sense, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
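The javadoc of maybeFetchErrorIntervalMs above lists how each broker error code on a telemetry response changes the client's next push attempt. The Java sketch below restates that table so it can be exercised in isolation. It is not the Kafka implementation: TelemetryError is a hypothetical stand-in for org.apache.kafka.common.protocol.Errors, the 5-minute DEFAULT_PUSH_INTERVAL_MS is an assumed value (the real constant is not shown in this diff), and the fallback used when the current interval is -1 follows the fuller switch body quoted later in this thread.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors, limited to
// the codes named in the maybeFetchErrorIntervalMs javadoc plus a catch-all.
enum TelemetryError {
    NONE, INVALID_REQUEST, INVALID_RECORD, UNKNOWN_SUBSCRIPTION_ID,
    UNSUPPORTED_COMPRESSION_TYPE, TELEMETRY_TOO_LARGE, THROTTLING_QUOTA_EXCEEDED, OTHER
}

final class ErrorIntervalSketch {
    // Assumed default; the real DEFAULT_PUSH_INTERVAL_MS value is not shown in the diff.
    static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000;
    // Integer.MAX_VALUE is used as a "never retry" sentinel, i.e. telemetry disabled.
    static final int DISABLED = Integer.MAX_VALUE;

    private ErrorIntervalSketch() { }

    // Empty when there was no error; otherwise the delay in ms before the next attempt.
    static Optional<Integer> intervalFor(TelemetryError error, int currentIntervalMs) {
        switch (error) {
            case NONE:
                return Optional.empty();
            case INVALID_REQUEST:
            case INVALID_RECORD:
                // Retrying cannot fix a malformed request: disable telemetry.
                return Optional.of(DISABLED);
            case UNKNOWN_SUBSCRIPTION_ID:
            case UNSUPPORTED_COMPRESSION_TYPE:
                // Retry immediately, e.g. after re-fetching the subscription.
                return Optional.of(0);
            case TELEMETRY_TOO_LARGE:
            case THROTTLING_QUOTA_EXCEEDED:
                // Wait one push interval; fall back to the default when it is unknown (-1).
                return Optional.of(currentIntervalMs != -1 ? currentIntervalMs : DEFAULT_PUSH_INTERVAL_MS);
            default:
                // Unmapped errors are handled conservatively: disable telemetry.
                return Optional.of(DISABLED);
        }
    }
}
```

Keeping Optional.empty() for NONE separates "no error" from "retry after 0 ms", which would otherwise be easy to confuse.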
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391699120 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,922 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.MetricsProvider; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import 
org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. + * + * + * In an unlikely
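The ClientTelemetryReporter javadoc above describes the collection life-cycle as a state machine: the reporter fetches a subscription while in SUBSCRIPTION_NEEDED and falls back to that state when a push fails. A minimal Java sketch of that idea follows, assuming a simplified, hypothetical SketchTelemetryState enum. Only SUBSCRIPTION_NEEDED comes from the javadoc; the other states and the transition methods are illustrative assumptions, not Kafka's ClientTelemetryState.

```java
// Hypothetical, simplified states. Only SUBSCRIPTION_NEEDED is taken from the javadoc;
// Kafka's ClientTelemetryState defines its own states and transitions.
enum SketchTelemetryState {
    SUBSCRIPTION_NEEDED,
    SUBSCRIPTION_IN_PROGRESS,
    PUSH_NEEDED,
    PUSH_IN_PROGRESS
}

// Hypothetical reporter state holder used only for this sketch.
final class TelemetryStateMachineSketch {
    private SketchTelemetryState state = SketchTelemetryState.SUBSCRIPTION_NEEDED;

    SketchTelemetryState state() {
        return state;
    }

    // A subscription request is only sent when a subscription is needed.
    void startSubscriptionFetch() {
        transition(SketchTelemetryState.SUBSCRIPTION_NEEDED, SketchTelemetryState.SUBSCRIPTION_IN_PROGRESS);
    }

    // A successful subscription response makes a push due; a failed one retries the fetch.
    void subscriptionCompleted(boolean success) {
        transition(SketchTelemetryState.SUBSCRIPTION_IN_PROGRESS,
            success ? SketchTelemetryState.PUSH_NEEDED : SketchTelemetryState.SUBSCRIPTION_NEEDED);
    }

    void startPush() {
        transition(SketchTelemetryState.PUSH_NEEDED, SketchTelemetryState.PUSH_IN_PROGRESS);
    }

    // As the javadoc describes, a failed push sends the reporter back to SUBSCRIPTION_NEEDED.
    void pushCompleted(boolean success) {
        transition(SketchTelemetryState.PUSH_IN_PROGRESS,
            success ? SketchTelemetryState.PUSH_NEEDED : SketchTelemetryState.SUBSCRIPTION_NEEDED);
    }

    // Reject out-of-order calls instead of silently corrupting the life-cycle.
    private void transition(SketchTelemetryState expected, SketchTelemetryState next) {
        if (state != expected) {
            throw new IllegalStateException("Cannot leave " + state + "; expected " + expected);
        }
        state = next;
    }
}
```

Failing fast with IllegalStateException makes ordering bugs visible early; a real reporter also has to handle shutdown, which this sketch leaves out.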
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391698780 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,922 @@
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391698419 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ## @@ -0,0 +1,922 @@
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1391698213 ## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.MetricsData; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS; + +public class ClientTelemetryUtils { + +private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class); + +public final static Predicate<? super MetricKeyable> SELECTOR_NO_METRICS = k -> false; + +public final static Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true; + +/** + * Examine the response data and handle different error code accordingly: + * + * + * Invalid Request: Disable Telemetry + * Invalid Record: Disable Telemetry + * UnknownSubscription or Unsupported Compression: Retry immediately + * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval + * + * + * @param errorCode response body error code + * @param intervalMs current push interval in milliseconds + * + * @return Optional of push interval in milliseconds + */ +public static Optional<Integer> maybeFetchErrorIntervalMs(short errorCode, int intervalMs) { +if (errorCode == Errors.NONE.code()) +return Optional.empty(); + +int pushIntervalMs; +String reason; + +switch (Errors.forCode(errorCode)) { +case INVALID_REQUEST: +case INVALID_RECORD: { +pushIntervalMs = Integer.MAX_VALUE; +reason = "The broker response indicates the client sent a request that cannot be resolved" ++ " by re-trying, hence disable
 telemetry"; +break; +} +case UNKNOWN_SUBSCRIPTION_ID: +case UNSUPPORTED_COMPRESSION_TYPE: { +pushIntervalMs = 0; +reason = Errors.forCode(errorCode).message(); +break; +} +case TELEMETRY_TOO_LARGE: +case THROTTLING_QUOTA_EXCEEDED: { +reason = Errors.forCode(errorCode).message(); +pushIntervalMs = (intervalMs != -1) ? intervalMs : DEFAULT_PUSH_INTERVAL_MS; +break; +} +default: { +reason = "Unmapped error code"; +log.error("Error code: {}, reason: {}. Unmapped error for telemetry, disable telemetry.", +errorCode, Errors.forCode(errorCode).message()); +pushIntervalMs = Integer.MAX_VALUE; +} +} + +log.debug("Error code: {}, reason: {}. Retry automatically in {} ms.", errorCode, reason, pushIntervalMs); +return Optional.of(pushIntervalMs); +} + +public static Predicate<? super MetricKeyable> getSelectorFromRequestedMetrics(List<String> requestedMetrics) { +if (requestedMetrics == null || requestedMetrics.isEmpty()) { +log.debug("Telemetry subscription has specified no metric names; telemetry will record no metrics"); +return SELECTOR_NO_METRICS; +} else if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).isEmpty()) { +log.debug("Telemetry subscription has specified a single empty metric name; using all metrics"); +return SELECTOR_ALL_METRICS; +} else { +log.debug("Telemetry subscription has specified
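getSelectorFromRequestedMetrics above turns the subscription's requested metric names into a filter: no names selects nothing, and a single empty name selects everything. The sketch below reproduces those two branches on plain String metric names, a simplification of MetricKeyable. The third branch is cut off in the quoted diff; treating each requested name as a prefix is an assumption made here for illustration, and MetricSelectorSketch is a hypothetical name.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper mirroring the visible branches of getSelectorFromRequestedMetrics.
final class MetricSelectorSketch {
    private MetricSelectorSketch() { }

    static Predicate<String> selectorFor(List<String> requestedMetrics) {
        if (requestedMetrics == null || requestedMetrics.isEmpty()) {
            // No metric names requested: record no metrics.
            return name -> false;
        }
        if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).isEmpty()) {
            // A single empty name means "all metrics".
            return name -> true;
        }
        // Assumption: each remaining entry is matched as a metric-name prefix
        // (this branch is elided in the quoted diff).
        return name -> requestedMetrics.stream()
            .anyMatch(prefix -> prefix != null && name.startsWith(prefix));
    }
}
```

Returning shared constant predicates for the two special cases, as the quoted code does with SELECTOR_NO_METRICS and SELECTOR_ALL_METRICS, avoids scanning the list on every metric.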
[jira] [Resolved] (KAFKA-15532) ZkWriteBehindLag should not be reported by inactive controllers
[ https://issues.apache.org/jira/browse/KAFKA-15532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-15532. -- Resolution: Fixed > ZkWriteBehindLag should not be reported by inactive controllers > --- > > Key: KAFKA-15532 > URL: https://issues.apache.org/jira/browse/KAFKA-15532 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Minor > > Since only the active controller is performing the dual-write to ZK during a > migration, it should be the only controller to report the ZkWriteBehindLag > metric. > > Currently, if the controller fails over during a migration, the previous > active controller will incorrectly report its last value for ZkWriteBehindLag > forever. Instead, it should report zero. -- This message was sent by Atlassian Jira (v8.20.10#820010)
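KAFKA-15532 asks that ZkWriteBehindLag be reported only by the active controller, and that a controller which loses leadership report zero instead of its last value forever. One way to express that rule is a gauge whose value is gated on an "active" flag and cleared on failover. The sketch below is a minimal illustration under that assumption; the class and method names are hypothetical and this is not Kafka's controller metrics code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical gauge holder illustrating the rule described in KAFKA-15532.
final class ZkWriteBehindLagGaugeSketch {
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final AtomicLong lastLag = new AtomicLong(0L);

    // Called when this controller becomes active; start from a clean value.
    void onBecomeActive() {
        lastLag.set(0L);
        active.set(true);
    }

    // Called on failover: forget the stale lag so it is not reported forever.
    void onBecomeInactive() {
        active.set(false);
        lastLag.set(0L);
    }

    // Only the active controller performs the ZK dual-write, so only it records lag.
    void recordLag(long lag) {
        if (active.get()) {
            lastLag.set(lag);
        }
    }

    // Value exposed to the metrics system: zero on inactive controllers.
    long value() {
        return active.get() ? lastLag.get() : 0L;
    }
}
```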
[jira] [Assigned] (KAFKA-15532) ZkWriteBehindLag should not be reported by inactive controllers
[ https://issues.apache.org/jira/browse/KAFKA-15532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-15532: Assignee: David Arthur > ZkWriteBehindLag should not be reported by inactive controllers > --- > > Key: KAFKA-15532 > URL: https://issues.apache.org/jira/browse/KAFKA-15532 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Minor > > Since only the active controller is performing the dual-write to ZK during a > migration, it should be the only controller to report the ZkWriteBehindLag > metric. > > Currently, if the controller fails over during a migration, the previous > active controller will incorrectly report its last value for ZkWriteBehindLag > forever. Instead, it should report zero. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1809013650 > @apoorvmittal10 : Thanks for the PR. Made a pass of non-testing files. Left a few comments. Thanks a lot for the review, @junrao. I have addressed the comments and left a question in them about throttleTimeMs for errors. Could you please re-review?
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391660088 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final List RECEIVERS = new ArrayList<>(); Review Comment: Done. 
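ClientMetricsReceiverPlugin above keeps registered client telemetry receivers in a static list and is used by the server to export client metrics to them. A minimal registry along those lines is sketched below. SketchReceiver is a hypothetical interface (Kafka's ClientTelemetryReceiver has its own signature), and CopyOnWriteArrayList is a suggested choice for a list that is read on every export but written rarely, not something the diff shows.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical receiver interface for this sketch only; Kafka's ClientTelemetryReceiver differs.
interface SketchReceiver {
    void export(String clientInstanceId, byte[] payload);
}

// Hypothetical registry modelled on the static RECEIVERS list in ClientMetricsReceiverPlugin.
final class ReceiverRegistrySketch {
    // Copy-on-write: iterating during export needs no lock; each add copies the backing array.
    private static final List<SketchReceiver> RECEIVERS = new CopyOnWriteArrayList<>();

    private ReceiverRegistrySketch() { }

    static void add(SketchReceiver receiver) {
        RECEIVERS.add(receiver);
    }

    static boolean isEmpty() {
        return RECEIVERS.isEmpty();
    }

    // Forward one payload to every registered receiver, in registration order.
    static void exportMetrics(String clientInstanceId, byte[] payload) {
        for (SketchReceiver receiver : RECEIVERS) {
            receiver.export(clientInstanceId, payload);
        }
    }
}
```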
## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate
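ClientMetricsManager bounds its client-instance cache at CM_CACHE_MAX_SIZE (16384) entries by wrapping an LRUCache in a SynchronizedCache. The sketch below shows the same eviction policy with the JDK alone: a LinkedHashMap in access order whose removeEldestEntry hook drops the least-recently-used entry, with synchronized methods standing in for the wrapper. BoundedLruCacheSketch is a hypothetical name, and this is not Kafka's org.apache.kafka.common.cache implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded LRU cache illustrating the policy behind LRUCache + SynchronizedCache.
final class BoundedLruCacheSketch<K, V> {
    private final LinkedHashMap<K, V> map;

    BoundedLruCacheSketch(int maxSize) {
        // accessOrder = true: get() moves an entry to the most-recently-used end.
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least-recently-used entry once the bound is exceeded.
                return size() > maxSize;
            }
        };
    }

    // synchronized plays the role of the SynchronizedCache wrapper in the diff.
    synchronized V get(K key) {
        return map.get(key);
    }

    synchronized void put(K key, V value) {
        map.put(key, value);
    }

    synchronized int size() {
        return map.size();
    }
}
```

Note that get() must also be synchronized here: in access-order mode a LinkedHashMap lookup reorders the internal list, so reads are structural modifications.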
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659830 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
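The two request handlers above treat the client instance id differently: the get path replaces a missing or zero id with a newly generated one, while the push path rejects it as an invalid request. The push check uses `clientInstanceId == Uuid.ZERO_UUID`, a reference comparison, whereas the get path uses `equals()`; unless deserialization hands back the ZERO_UUID singleton (the diff does not show this), the `==` check would likely miss a zero id sent over the wire. The sketch below uses java.util.UUID as an assumed stand-in for Kafka's Uuid, with hypothetical method names.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical helpers; java.util.UUID stands in for org.apache.kafka.common.Uuid.
final class ClientInstanceIdSketch {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private ClientInstanceIdSketch() { }

    // Get path: reuse the client's id, or assign a fresh one when it is absent or zero.
    static UUID resolveForGet(UUID requested) {
        return Optional.ofNullable(requested)
            .filter(id -> !id.equals(ZERO_UUID))
            .orElseGet(UUID::randomUUID);
    }

    // Push path: the client must already hold a non-zero id.
    // equals() compares by value; == would only match the very same ZERO_UUID instance.
    static boolean isValidForPush(UUID id) {
        return id != null && !id.equals(ZERO_UUID);
    }
}
```

The sketch also uses orElseGet rather than orElse: `Optional.orElse(generateNewClientId())` in the quoted code evaluates its argument even when the client already supplied a valid id, while orElseGet only calls the supplier when it is needed.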
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659127 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
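The `processGetTelemetrySubscriptionRequest` excerpt above selects the client instance id with `Optional.ofNullable(...).filter(...).orElse(generateNewClientId())`. Below is a minimal standalone sketch of that selection idiom. It is not the PR's code: `java.util.UUID` is assumed as a stand-in for Kafka's `Uuid`, and `ClientIdResolver` and `resolve` are hypothetical names. The sketch uses `orElseGet` because `Optional.orElse` evaluates its argument eagerly, which means a new id would be generated even when the client sent a valid one. It also compares ids with `equals`, since `==` only tests reference identity. That distinction is presumably relevant to the `clientInstanceId == Uuid.ZERO_UUID` check in the push path as well, where an id decoded from a request is likely a distinct object.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;

// Sketch only: java.util.UUID stands in for Kafka's Uuid; ClientIdResolver is a hypothetical name.
final class ClientIdResolver {
    // All-zero id, analogous to Uuid.ZERO_UUID in the quoted code.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Returns the client-supplied id when it is present and non-zero; otherwise asks the
    // supplier for a new one. orElseGet invokes the supplier only on the fallback path,
    // whereas orElse(generate()) would generate an id on every call.
    static UUID resolve(UUID requested, Supplier<UUID> generator) {
        return Optional.ofNullable(requested)
                .filter(id -> !id.equals(ZERO_UUID)) // compare by value, not with ==
                .orElseGet(generator);
    }
}
```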
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658586 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
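`updateSubscription` treats an empty `Properties` from IncrementalAlterConfigs as a deletion. It also bumps the last-update time whenever the subscription map changes. The following is a simplified sketch of that flow. It assumes subscriptions are stored as raw `Properties`, whereas the PR parses them into `ClientMetricsConfigs`. The names `SubscriptionRegistry` and `update` are hypothetical. The sketch folds the `containsKey`/`remove` pair into a single `remove` call, whose return value indicates whether anything was actually removed.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry; not the PR's ClientMetricsManager.
final class SubscriptionRegistry {
    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    // volatile so request-handler threads observe the latest update time without locking.
    private volatile long lastUpdateEpoch;

    void update(String name, Properties props, long nowMs) {
        if (props.isEmpty()) {
            // All configs deleted: drop the subscription. remove() returns null when the
            // name was absent, so the epoch moves only if something actually changed.
            if (subscriptions.remove(name) != null) {
                lastUpdateEpoch = nowMs;
            }
            return;
        }
        subscriptions.put(name, props);
        lastUpdateEpoch = nowMs;
    }

    boolean contains(String name) {
        return subscriptions.containsKey(name);
    }

    long lastUpdateEpoch() {
        return lastUpdateEpoch;
    }
}
```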
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658438 ## core/src/main/java/kafka/metrics/ClientMetricsInstance.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.protocol.Errors; + +import java.util.Objects; +import java.util.Set; + +/** + * Contains the metrics instance metadata and the state of the client instance. + */ +public class ClientMetricsInstance { + +private final Uuid clientInstanceId; +private final ClientMetricsInstanceMetadata instanceMetadata; +private final int subscriptionId; +private final long subscriptionUpdateEpoch; +private final Set metrics; +private final int pushIntervalMs; + +private boolean terminating; +private long lastGetRequestEpoch; +private long lastPushRequestEpoch; +private Errors lastKnownError; Review Comment: I have marked two of them `volatile`, and the other two are now protected with `synchronized`. I have also added concurrency tests to validate that when multiple requests are received for the same client instance, only one is processed and the others are rejected with a throttling error.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
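The reply above describes three changes: some `ClientMetricsInstance` fields were made `volatile`, others are now guarded with `synchronized`, and tests check that only one of several concurrent requests for the same client instance is processed. The message does not say which fields ended up where. The sketch below is an assumed illustration of the `synchronized` part only. A hypothetical `ClientInstanceState.tryAcceptPush` performs two operations as a single atomic step: checking `lastPushRequestEpoch` against the push interval and updating that field. As a result, racing requests cannot both pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-client state; the field name mirrors ClientMetricsInstance, the logic is assumed.
final class ClientInstanceState {
    private final long pushIntervalMs;
    private long lastPushRequestEpoch = -1L; // -1 means "no push accepted yet"

    ClientInstanceState(long pushIntervalMs) {
        this.pushIntervalMs = pushIntervalMs;
    }

    // synchronized turns read-check-write into one atomic step: of several racing pushes,
    // only the first passes; the rest see the updated timestamp and are rejected
    // (a broker would answer those with a throttling error).
    synchronized boolean tryAcceptPush(long nowMs) {
        if (lastPushRequestEpoch >= 0 && nowMs - lastPushRequestEpoch < pushIntervalMs) {
            return false;
        }
        lastPushRequestEpoch = nowMs;
        return true;
    }

    // Starts `threads` threads that all push at the same timestamp; returns how many were accepted.
    static int countAcceptedConcurrently(ClientInstanceState state, int threads, long nowMs) {
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    start.await(); // line all threads up before any of them pushes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (state.tryAcceptPush(nowMs)) {
                    accepted.incrementAndGet();
                }
            });
            workers.add(worker);
            worker.start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return accepted.get();
    }
}
```

`countAcceptedConcurrently` releases all threads at once through a `CountDownLatch`. With identical timestamps, exactly one push is accepted regardless of the thread count, because the check and the update happen under the same lock.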
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656533 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; Review Comment: Makes sense, thanks. Done.
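The quoted field comment says `lastSubscriptionUpdateEpoch` decides whether the next telemetry request has to re-evaluate a client's subscription. The reviewer's original suggestion is not included in this message, so the following is only an assumed sketch of that check. Each cached entry records the epoch it was built against; the `ClientMetricsInstance` excerpt in this thread does have a `subscriptionUpdateEpoch` field. The manager-level value is `volatile` because the config-update path writes it while request threads read it. `InstanceCache`, `Entry`, and `getOrRebuild` are hypothetical names. For brevity, a `ConcurrentHashMap` replaces the PR's `SynchronizedCache`/`LRUCache`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache that rebuilds entries whose subscription snapshot is out of date.
final class InstanceCache<K> {
    // Cached entry: remembers the subscription epoch it was evaluated against.
    static final class Entry {
        final long subscriptionUpdateEpoch;

        Entry(long subscriptionUpdateEpoch) {
            this.subscriptionUpdateEpoch = subscriptionUpdateEpoch;
        }
    }

    private final Map<K, Entry> cache = new ConcurrentHashMap<>();
    // Written by the config-update path, read by request threads.
    private volatile long lastSubscriptionUpdateEpoch;

    void subscriptionsChanged(long nowMs) {
        lastSubscriptionUpdateEpoch = nowMs;
    }

    // Returns the cached entry, rebuilding it if subscriptions changed after it was created.
    Entry getOrRebuild(K clientId) {
        long epoch = lastSubscriptionUpdateEpoch; // read once so the check and the rebuild agree
        return cache.compute(clientId, (id, existing) ->
                existing == null || existing.subscriptionUpdateEpoch < epoch ? new Entry(epoch) : existing);
    }
}
```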
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656238 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391654569 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -16,12 +16,48 @@ */ package kafka.server; +import java.util.Collections; Review Comment: I kept this in the `kafka.server` package because all the managers (in Scala) that process API calls from KafkaApis.scala reside in the `kafka.server` package.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391652781 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -71,6 +73,31 @@ public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +responseData.setThrottleTimeMs(throttleTimeMs); +return new PushTelemetryResponse(responseData); +} + +public String getMetricsContentType() { Review Comment: Missed these, thanks. Done.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391651232 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception));
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391650483 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); Review Comment: I asked the same question in another comment, but shouldn't we set that, given that the API call goes through `sendMaybeThrottle` in KafkaApis, which throttles the request?
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391648051 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. 
+ */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: > Do we need to make this an LRU cache? If a client is terminated or idle, we should remove them from the cache I started with an LRU cache and am planning to improve it with a cache that time-bounds the connection. The KIP says: `client instance specific state is maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds`. I'll improve the cache to respect that: https://issues.apache.org/jira/browse/KAFKA-15813
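The reply explains that the first version bounds the client instance cache by size (16384 entries) rather than by idle time. For readers unfamiliar with the pattern, here is a generic size-bounded LRU sketch built on a `LinkedHashMap` in access order. It is not Kafka's `org.apache.kafka.common.cache.LRUCache`, and `LruMap` is a hypothetical name. Like any `LinkedHashMap`, it is not thread-safe on its own; in the quoted constructor, `SynchronizedCache` provides that safety.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Generic LRU sketch (hypothetical LruMap, not Kafka's LRUCache): an access-ordered
// LinkedHashMap that evicts the least recently used entry once maxSize is exceeded.
final class LruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    LruMap(int maxSize) {
        super(16, 0.75f, true); // accessOrder = true: get() moves an entry to the tail
        this.maxSize = maxSize;
    }

    // Called by put(); returning true removes the head, i.e. the least recently used entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }
}
```

Eviction here depends only on size. An idle client's entry therefore stays in the cache until enough newer clients push it out, which is the gap KAFKA-15813 is meant to close.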
[jira] [Updated] (KAFKA-15813) Improve implementation of client instance cache
[ https://issues.apache.org/jira/browse/KAFKA-15813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal updated KAFKA-15813: -- Summary: Improve implementation of client instance cache (was: Improve implementation of client instnce cache) > Improve implementation of client instance cache > --- > > Key: KAFKA-15813 > URL: https://issues.apache.org/jira/browse/KAFKA-15813 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > > In the current implementation the ClientMetricsManager uses an LRU cache, but we > should also support expiring stale clients, i.e. clients which haven't reported > metrics for a while. > > The KIP mentions: This client instance specific state is maintained in broker > memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used to > enforce the push interval rate-limiting. There is no persistence of client > instance metrics state across broker restarts or between brokers -- This message was sent by Atlassian Jira (v8.20.10#820010)
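KAFKA-15813 quotes the KIP rule that per-client state is kept for MAX(60*1000, PushIntervalMs * 3) milliseconds. A small sketch of that arithmetic follows. `ClientInstanceExpiry` and its methods are hypothetical. Treating the boundary as exclusive (a client counts as expired only when it has been silent strictly longer than the window) is an assumption; the quoted KIP text does not settle it.

```java
// Hypothetical helper for the retention rule quoted from KIP-714:
// client instance state is kept for MAX(60 * 1000, PushIntervalMs * 3) milliseconds.
final class ClientInstanceExpiry {
    static final long MIN_RETENTION_MS = 60 * 1000L;

    static long retentionMs(long pushIntervalMs) {
        return Math.max(MIN_RETENTION_MS, pushIntervalMs * 3);
    }

    // True once the client has been silent for longer than its retention window.
    static boolean isExpired(long lastRequestEpochMs, long pushIntervalMs, long nowMs) {
        return nowMs - lastRequestEpochMs > retentionMs(pushIntervalMs);
    }
}
```

For example, a 5000 ms push interval gives max(60000, 15000) = 60000 ms of retention, while a 30000 ms interval gives 90000 ms.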
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391640685 ## core/src/main/java/kafka/metrics/ClientMetricsInstanceMetadata.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import java.util.regex.Pattern; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.requests.RequestContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Information from the client's metadata is gathered from the client's request. + */ +public class ClientMetricsInstanceMetadata { + +private final Map attributesMap; + +public ClientMetricsInstanceMetadata(Uuid clientInstanceId, RequestContext requestContext) { +Objects.requireNonNull(clientInstanceId); +Objects.requireNonNull(requestContext); + +attributesMap = new HashMap<>(); + +attributesMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); +attributesMap.put(ClientMetricsConfigs.CLIENT_ID, requestContext.clientId()); +attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, requestContext.clientInformation != null ? 
+requestContext.clientInformation.softwareName() : null); +attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, requestContext.clientInformation != null ? +requestContext.clientInformation.softwareVersion() : null); +attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, requestContext.clientAddress != null ? +requestContext.clientAddress.getHostAddress() : null); +// KIP-714 mentions client source port should be the client connection's source port from the +// broker's point of view. But the broker does not have this information rather the port could be +// the broker's port where the client connection is established. We might want to consider removing +// the client source port from the KIP or use broker port if that can be helpful. +// TODO: fix port Review Comment: Thanks a lot @junrao, this is helpful. I will make the changes in a subsequent PR to address this. I have created the following JIRA for it: https://issues.apache.org/jira/browse/KAFKA-15811 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391639061 ## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ## @@ -80,6 +82,11 @@ public class ClientMetricsConfigs { public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; public static final String CLIENT_SOURCE_PORT = "client_source_port"; +// Empty string in client-metrics resource configs indicates that all the metrics are subscribed. +public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\""; Review Comment: The KIP specifies that the subscription response should contain just an empty string, i.e. `""`. However, when creating a `client-metrics` resource, ConfigDef parses `""` as no data, so to specify an empty string through `kafka-configs.sh` we need to pass it as an empty string enclosed in quotes, i.e. `"\"\""`.
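The escaping is easy to misread, so here is a minimal Java sketch showing that the constant `"\"\""` is a two-character string made of two double-quote characters. The `normalizeMetricsPrefix` helper is hypothetical (not from the PR); it only illustrates one way the sentinel could be mapped back to the empty string that the KIP expects in the subscription response.

```java
// Hypothetical sketch: the class and helper names are illustrative, not from PR #14699.
class EmptySubscriptionSketch {

    // Two characters: a double quote followed by another double quote.
    static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

    // Maps the quoted-empty-string sentinel to "", which the KIP uses to mean
    // "subscribe to all metrics"; any other value is passed through unchanged.
    static String normalizeMetricsPrefix(String configured) {
        return ALL_SUBSCRIBED_METRICS_CONFIG.equals(configured) ? "" : configured;
    }
}
```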
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391636101 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -71,6 +73,31 @@ public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +responseData.setThrottleTimeMs(throttleTimeMs); +return new PushTelemetryResponse(responseData); +} + +public String getMetricsContentType() { +// Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type +// field to allow for updated OTLP format versions (or additional formats), but this field is currently not +// included since only one format is specified in the current proposal of the kip-714 +return OTLP_CONTENT_TYPE; +} + +public ByteBuffer getMetricsData() { +CompressionType cType = CompressionType.forId(this.data.compressionType()); +return (cType == CompressionType.NONE) ? +ByteBuffer.wrap(this.data.metrics()) : decompressMetricsData(cType, this.data.metrics()); +} + +private static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) { +// TODO: Add support for decompression of metrics data Review Comment: Yes, I have created a JIRA under the parent KIP-714 task to address this: https://issues.apache.org/jira/browse/KAFKA-15807. I am planning to get the end-to-end metrics flow working without compression first.
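For context on what the `decompressMetricsData` TODO involves, here is a minimal sketch that handles only GZIP using the JDK's `java.util.zip` classes. It is an assumption-laden stand-in, not the KAFKA-15807 implementation: the other Kafka compression types (Snappy, LZ4, ZSTD) would need their own codecs, and `compressGzip` exists only to make a round trip easy to demonstrate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper names; only GZIP is handled in this sketch.
class MetricsDecompressionSketch {

    // Decompresses a GZIP-compressed metrics payload into a ByteBuffer.
    // Uses a manual copy loop so it also works on Java 8 (no readAllBytes()).
    static ByteBuffer decompressGzip(byte[] metrics) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(metrics));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to decompress metrics payload", e);
        }
    }

    // Test/demo helper: GZIP-compresses a payload so the round trip can be checked.
    static byte[] compressGzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The GZIP trailer is written on close(), so read the bytes only after the try block.
        return bos.toByteArray();
    }
}
```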
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391634085 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -71,6 +73,31 @@ public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +responseData.setThrottleTimeMs(throttleTimeMs); Review Comment: > Could we redirect getErrorResponse to here and rename it to errorResponse? Done > If error code is not ThrottlingQuotaExceededException, we should ignore throttleTimeMs. It might be naive, but I didn't understand why throttleTimeMs should not be passed in the response for other exceptions. Don't all requests go through the common throttling code, where they might be throttled for some time based on throughput?
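To make the reviewer's suggestion concrete, here is a minimal sketch of an `errorResponse` that keeps `throttleTimeMs` only for a throttling-quota error and zeroes it otherwise. The enum and response class are hypothetical stand-ins, not Kafka's `Errors` or `PushTelemetryResponseData`, and the sketch only illustrates the suggestion under discussion; it does not settle the question raised above.

```java
// Hypothetical stand-ins for Kafka's Errors enum and the push telemetry response data.
class ThrottleResponseSketch {

    enum ErrorCode { NONE, THROTTLING_QUOTA_EXCEEDED, INVALID_REQUEST }

    static final class Response {
        final ErrorCode error;
        final int throttleTimeMs;

        Response(ErrorCode error, int throttleTimeMs) {
            this.error = error;
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    // Builds an error response; throttleTimeMs is propagated only when the error
    // itself is a throttling-quota error, as the review comment proposes.
    static Response errorResponse(int throttleTimeMs, ErrorCode error) {
        int effectiveThrottleMs = error == ErrorCode.THROTTLING_QUOTA_EXCEEDED ? throttleTimeMs : 0;
        return new Response(error, effectiveThrottleMs);
    }
}
```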
Re: [PR] KAFKA-15756: Migrate existing integration tests to run old protocol in new coordinator [kafka]
dongnuo123 commented on code in PR #14675: URL: https://github.com/apache/kafka/pull/14675#discussion_r1391629976 ## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ## @@ -29,7 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource class ListConsumerGroupTest extends ConsumerGroupCommandTest { @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) + @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) def testListConsumerGroups(quorum: String): Unit = { Review Comment: It doesn't work for kraft+kip848. If I change GroupCoordinatorService#ListGroups to add the futures the same way describeGroups does, this test passes. Not sure why.
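As a generic illustration of "adding the futures", here is a sketch that combines one future per coordinator shard into a single future holding all results, using `CompletableFuture.allOf`. It is a hypothetical helper, not the actual `GroupCoordinatorService` code, and it does not explain the test behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: combines per-shard result futures into one future.
class CombineFuturesSketch {

    static <T> CompletableFuture<List<T>> combine(List<CompletableFuture<List<T>>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                List<T> all = new ArrayList<>();
                for (CompletableFuture<List<T>> future : futures) {
                    // Every future is already complete here, so join() does not block.
                    all.addAll(future.join());
                }
                return all;
            });
    }
}
```

If any input future fails, the combined future completes exceptionally, which is usually the desired behavior for a fan-out request.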
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on PR #14670: URL: https://github.com/apache/kafka/pull/14670#issuecomment-1808930992 > re-triggered a build as the last one did not look good. I only just now updated my branch to include the revert commits, so I wouldn't necessarily expect that test run to successfully pass. I have started yet another build with the previous reverts and will keep an eye on the results. Thanks @dajac!
[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets
[ https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15816: Description: There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: https://github.com/apache/kafka/pull/14750 * KafkaConsumerTest * KafkaProducerTest * ConfigResourceTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest Core: * DescribeAuthorizedOperationsTest * SslGssapiSslEndToEndAuthorizationTest * SaslMultiMechanismConsumerTest * SaslPlaintextConsumerTest * SaslSslAdminIntegrationTest * SaslSslConsumerTest * MultipleListenersWithDefaultJaasContextTest * DescribeClusterRequestTest Trogdor: * AgentTest These can be addressed by just fixing the tests. was: There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: * KafkaConsumerTest * KafkaProducerTest * ConfigResourceTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest Core: * DescribeAuthorizedOperationsTest * SslGssapiSslEndToEndAuthorizationTest * SaslMultiMechanismConsumerTest * SaslPlaintextConsumerTest * SaslSslAdminIntegrationTest * SaslSslConsumerTest * MultipleListenersWithDefaultJaasContextTest * DescribeClusterRequestTest Trogdor: * AgentTest These can be addressed by just fixing the tests. > Typos in tests leak network sockets > --- > > Key: KAFKA-15816 > URL: https://issues.apache.org/jira/browse/KAFKA-15816 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > There are a few tests which leak network sockets due to small typos in the > tests themselves. 
> Clients: https://github.com/apache/kafka/pull/14750 > * KafkaConsumerTest > * KafkaProducerTest > * ConfigResourceTest > * SelectorTest > * SslTransportLayerTest > * SslTransportTls12Tls13Test > * SslVersionsTransportLayerTest > Core: > * DescribeAuthorizedOperationsTest > * SslGssapiSslEndToEndAuthorizationTest > * SaslMultiMechanismConsumerTest > * SaslPlaintextConsumerTest > * SaslSslAdminIntegrationTest > * SaslSslConsumerTest > * MultipleListenersWithDefaultJaasContextTest > * DescribeClusterRequestTest > Trogdor: > * AgentTest > These can be addressed by just fixing the tests.
[PR] KAFKA-15816: Fix leaked sockets in clients tests [kafka]
gharris1727 opened a new pull request, #14750: URL: https://github.com/apache/kafka/pull/14750 These tests leak network sockets unnecessarily, and should instead properly close the resources they instantiate. I found these via some tests which are not mergeable, as they rely on reflection and JDK-version-specific mechanisms to operate. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
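The general pattern for not leaking sockets in a test is to open them with try-with-resources, so they are closed even if an assertion in between fails. The sketch below is a generic illustration on the loopback interface, not code taken from PR #14750; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical example of leak-free socket handling in a test.
class SocketCleanupSketch {

    // Opens a loopback server, a client connected to it, and the server-side accepted socket.
    // try-with-resources closes all three (in reverse order), even if the body throws.
    static boolean connectThenVerifyClosed() {
        ServerSocket serverRef = null;
        Socket clientRef = null;
        Socket acceptedRef = null;
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            serverRef = server;
            clientRef = client;
            acceptedRef = accepted;
            // ... test assertions against the connected sockets would go here ...
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // After the try block, nothing should be left open.
        return serverRef.isClosed() && clientRef.isClosed() && acceptedRef.isClosed();
    }
}
```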
[jira] [Created] (KAFKA-15816) Typos in tests leak network sockets
Greg Harris created KAFKA-15816: --- Summary: Typos in tests leak network sockets Key: KAFKA-15816 URL: https://issues.apache.org/jira/browse/KAFKA-15816 Project: Kafka Issue Type: Bug Components: unit tests Affects Versions: 3.6.0 Reporter: Greg Harris Assignee: Greg Harris There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: * KafkaConsumerTest * KafkaProducerTest * ConfigResourceTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest Core: * DescribeAuthorizedOperationsTest * SslGssapiSslEndToEndAuthorizationTest * SaslMultiMechanismConsumerTest * SaslPlaintextConsumerTest * SaslSslAdminIntegrationTest * SaslSslConsumerTest * MultipleListenersWithDefaultJaasContextTest * DescribeClusterRequestTest Trogdor: * AgentTest These can be addressed by just fixing the tests.
[PR] KIP-1001: Add CurrentControllerId metric [kafka]
cmccabe opened a new pull request, #14749: URL: https://github.com/apache/kafka/pull/14749 As discussed on KIP-1001 (not yet approved)
[jira] [Created] (KAFKA-15815) JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled
Greg Harris created KAFKA-15815: --- Summary: JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled Key: KAFKA-15815 URL: https://issues.apache.org/jira/browse/KAFKA-15815 Project: Kafka Issue Type: Bug Affects Versions: 3.6.0 Reporter: Greg Harris By default, HttpURLConnection has keep-alive enabled, which allows the underlying connection to be left open so that it can be reused for later requests. This means that despite JsonRestServer calling `close()` on the relevant InputStream and calling `disconnect()` on the connection itself, the HttpURLConnection does not call `close()` on the underlying socket. This affects the Trogdor AgentTest and CoordinatorTest suites, where most of the methods make HTTP requests using the JsonRestServer. The effect is that ~32 sockets are leaked per test run, all remaining in the CLOSE_WAIT state (half-closed) after the test, because the JettyServer has correctly closed the connections but the HttpURLConnection has not. There does not appear to be a way to locally override the HttpURLConnection's behavior in this case, and only disabling keep-alive globally (via the system property `http.keepAlive=false`) seems to resolve the socket leaks. To prevent the leaks, we can move JsonRestServer to an alternative HTTP implementation, perhaps the jetty-client that Connect uses, or disable keep-alive during tests.
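The second option in the ticket (disable keep-alive during tests) could look roughly like the sketch below. It is a hypothetical test-setup helper, not existing Kafka code. Assumption: the JDK's HTTP client reads `http.keepAlive` once, when its internal classes are first initialized, so the property should be set before the first `HttpURLConnection` is opened (for example in a static block or a `@BeforeAll` method), and restoring it later may not re-enable keep-alive in the same JVM.

```java
// Hypothetical test-setup helper for the "disable keep-alive during tests" option.
class KeepAliveTestSetupSketch {

    private static final String KEEP_ALIVE_PROPERTY = "http.keepAlive";

    // Disables HTTP keep-alive and returns the previous value (possibly null)
    // so that a test can restore it afterwards.
    static String disableHttpKeepAlive() {
        return System.setProperty(KEEP_ALIVE_PROPERTY, "false");
    }

    // Restores the previous value. Assumption: once the JDK has read the property,
    // restoring it does not change keep-alive behavior for the current JVM.
    static void restoreHttpKeepAlive(String previous) {
        if (previous == null) {
            System.clearProperty(KEEP_ALIVE_PROPERTY);
        } else {
            System.setProperty(KEEP_ALIVE_PROPERTY, previous);
        }
    }
}
```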
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391496243 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import java.util.Collections; +import java.util.regex.Pattern; +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsInstanceMetadata; +import kafka.metrics.ClientMetricsReceiverPlugin; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TelemetryTooLargeException; +import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; +import org.apache.kafka.common.errors.UnknownSubscriptionIdException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import 
org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.CRC32; + +/** + * Handles client telemetry metrics requests/responses, subscriptions and instance information. + */ +public class ClientMetricsManager implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); +private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); + +public static ClientMetricsManager instance() { +return INSTANCE; +} +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; Review Comment: The challenge is that the number of connections can vary depending on the type of broker host; larger instances can typically accommodate more connections. Do we need to make this an LRU cache? If a client is terminated or idle, we should remove it from the cache. Otherwise, should we just rely on the existing `max.connections` to control the client connections?
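The alternative raised in the review (evict terminated or idle clients instead of bounding the cache by a fixed LRU size) can be sketched as a map with per-entry last-access times and a periodic idle sweep. Everything here is hypothetical, including the class name and the idea of passing the clock in explicitly, which only serves to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical idle-expiring cache: entries are evicted when not accessed for idleTimeoutMs.
class IdleExpiringCacheSketch<K, V> {

    private static final class Entry<T> {
        final T value;
        volatile long lastAccessMs;

        Entry(T value, long nowMs) {
            this.value = value;
            this.lastAccessMs = nowMs;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long idleTimeoutMs;

    IdleExpiringCacheSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    // Returns the cached value (or null) and refreshes its last-access time.
    V get(K key, long nowMs) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        entry.lastAccessMs = nowMs;
        return entry.value;
    }

    // Removes entries idle for longer than the timeout and returns how many were removed
    // (approximate if other threads modify the map concurrently).
    int evictIdle(long nowMs) {
        int before = entries.size();
        entries.values().removeIf(entry -> nowMs - entry.lastAccessMs > idleTimeoutMs);
        return before - entries.size();
    }
}
```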
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391490591 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable { public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; + +// The last subscription updated time is used to determine if the next telemetry request needs +// to re-evaluate the subscription id as per changes subscriptions. +private long lastSubscriptionUpdateEpoch; + +// Visible for testing +ClientMetricsManager() { +subscriptionMap = new ConcurrentHashMap<>(); +clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. +if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +updateLastSubscriptionUpdateEpoch(); +} +return; +} + +ClientMetricsConfigs configs = new ClientMetricsConfigs(properties); +updateClientSubscription(subscriptionName, configs); +/* + Update last subscription updated time to current time to indicate that there is a change + in the subscription. This will be used to determine if the next telemetry request needs + to re-evaluate the subscription id as per changes subscriptions. 
+*/ +updateLastSubscriptionUpdateEpoch(); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = System.currentTimeMillis(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issued get telemetry + request prior to push interval, then the client should get a throttle error but if the + subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} finally { +clientInstance.lastGetRequestEpoch(now); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = System.currentTimeMillis(); +ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now); + +try { +// Validate the push request parameters for the client instance. +validatePushRequest(request, telemetryMaxBytes, clientInstance); +} catch (ApiException exception) { +clientInstance.lastKnownError(Errors.forException(exception)); +
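The `updateSubscription` change in the diff above (remove the subscription when IncrementalAlterConfigs sends empty configs, otherwise store it, and bump the last-update epoch on any change) can be sketched in isolation as follows. This is a simplified, hypothetical stand-in: it stores raw `Properties` instead of `ClientMetricsConfigs`, takes the clock as a parameter, and uses the return value of `Map.remove` in place of the separate `containsKey`/`remove` pair in the diff.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the subscription bookkeeping in the diff above.
class SubscriptionRegistrySketch {

    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    private volatile long lastSubscriptionUpdateMs;

    void updateSubscription(String name, Properties properties, long nowMs) {
        if (properties.isEmpty()) {
            // Empty configs mean all configs of this subscription were deleted. remove()
            // returns null if the subscription was unknown, and then the update is ignored.
            if (subscriptions.remove(name) != null) {
                lastSubscriptionUpdateMs = nowMs;
            }
            return;
        }
        subscriptions.put(name, properties);
        // Any change moves the epoch forward so later requests re-evaluate subscriptions.
        lastSubscriptionUpdateMs = nowMs;
    }

    boolean contains(String name) {
        return subscriptions.containsKey(name);
    }

    long lastSubscriptionUpdateMs() {
        return lastSubscriptionUpdateMs;
    }
}
```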
[PR] KAFKA-15814 Patched the SASL/GSSAPI server construction [kafka]
piotrsmolinski opened a new pull request, #14748: URL: https://github.com/apache/kafka/pull/14748 The patch fixes how the SASL/GSSAPI server is created in Kafka brokers. As described in the JIRA ticket, Kafka forces the client JAAS configuration on both the client and the server side. This patch removes that constraint and allows the broker to accept authentication addressed to different host names. If the listener is used for inter-broker communication, its configuration must stay in the current form. Assume that the Kafka cluster is accessible via the `CLIENT` listener and that each broker is accessible at `b-N.kafka.sample.home.arpa:9092` (N is the broker id). For convenience, the bootstrap uses the load-balanced endpoint `kafka.sample.home.arpa:9092`, which distributes the traffic to the active brokers. After this patch, it is possible to configure the listener's JAAS configuration to use a wildcard principal: ``` listener.name.client.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/kafka/security/kafka.keytab" principal="*" isInitiator=false ; ``` The goal of this patch is to enable the wildcard principal and acceptor-only mode (isInitiator=false). The keytab must contain entries for SPNs on all acceptable service FQDNs, i.e. on broker 0 we must have entries for both `kafka/kafka.sample.home.arpa@DOMAIN` and `kafka/b-0.kafka.sample.home.arpa@DOMAIN`. The patch does not change the behavior of existing configurations. For this patch, the only reasonable testing strategy is a manual one. That is why, even though the solution was already known 3 years ago, the code had not been submitted. Any ideas for automated testing of this change are warmly welcome.
A comparison of various ways to configure SASL: https://github.com/piotrsmolinski/kerberos-sandbox/blob/main/src/test/java/io/confluent/sandbox/kerberos/KerberosTest.java ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException
[ https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785597#comment-17785597 ] Divij Vaidya commented on KAFKA-15481: -- Yes, it has lots of merge conflicts because of another feature (dynamically changing the cache size) that was added to 3.7 but not to 3.6.1. I plan to manually resolve those merge conflicts in the next 2 days and merge into 3.6. > Concurrency bug in RemoteIndexCache leads to IOException > > > Key: KAFKA-15481 > URL: https://issues.apache.org/jira/browse/KAFKA-15481 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Divij Vaidya >Assignee: Jeel Jotaniya >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > RemoteIndexCache has a concurrency bug which leads to IOException while > fetching data from remote tier. > Below events in order of timeline - > Thread 1 (cache thread): invalidates the entry, removalListener is invoked > async, so the files have not been renamed to "deleted" suffix yet. > Thread 2: (fetch thread): tries to find entry in cache, doesn't find it > because it has been removed by 1, fetches the entry from S3, writes it to > existing file (using replace existing) > Thread 1: async removalListener is invoked, acquires a lock on old entry > (which has been removed from cache), it renames the file to "deleted" and > starts deleting it > Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file > and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM > returns an error as it won't allow creation of 2GB random access file. > *Potential Fix* > Use EvictionListener instead of RemovalListener in Caffeine cache as per the > documentation: > {quote} When the operation must be performed synchronously with eviction, use > {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will > only be notified when {{RemovalCause.wasEvicted()}} is true.
For an explicit > removal, {{Cache.asMap()}} offers compute methods that are performed > atomically.{quote} > This will ensure that removal from cache and marking the file with delete > suffix is synchronously done, hence the above race condition will not occur.
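The quoted Caffeine documentation points at atomic `compute` methods for explicit removal. The same idea can be shown with the JDK's `ConcurrentHashMap`, whose `computeIfPresent`/`computeIfAbsent` calls on the same key are performed atomically with respect to each other. This is a stdlib stand-in with hypothetical names, not Caffeine or `RemoteIndexCache` code: the "mark for deletion" step happens inside the same atomic call that removes the mapping, so a concurrent loader cannot observe the key as absent while the old entry is still being retired.

```java
import java.util.concurrent.ConcurrentHashMap;

// Stdlib stand-in (not Caffeine): removal and "mark as deleted" happen in one atomic call.
class AtomicInvalidationSketch {

    static final class IndexEntry {
        final String file;
        volatile boolean markedForDeletion;

        IndexEntry(String file) {
            this.file = file;
        }
    }

    final ConcurrentHashMap<String, IndexEntry> cache = new ConcurrentHashMap<>();

    void invalidate(String key) {
        cache.computeIfPresent(key, (k, entry) -> {
            // In a real cache this is where files would be renamed with a "deleted" suffix.
            entry.markedForDeletion = true;
            // Returning null removes the mapping as part of the same atomic operation.
            return null;
        });
    }

    IndexEntry getOrLoad(String key) {
        // For the same key, this cannot interleave with the computeIfPresent above.
        return cache.computeIfAbsent(key, k -> new IndexEntry(k + ".index"));
    }
}
```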
Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]
divijvaidya commented on code in PR #14727: URL: https://github.com/apache/kafka/pull/14727#discussion_r1391297279 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -988,30 +997,33 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE return; } RemoteLogSegmentMetadata metadata = segmentsIterator.next(); -if (segmentsToDelete.contains(metadata)) { -continue; -} -// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated -// as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those -// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether -// the epochs present in the segment lies in the checkpoint file. It will always return false -// since the checkpoint file was already truncated. -boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset( + +if (SEGMENT_DELETION_VALID_STATES.contains(metadata.state())) { Review Comment: Since there is no "else" branch, perhaps short-circuiting the if condition with ``` if (!SEGMENT_DELETION_VALID_STATES.contains(metadata.state())) { continue; } ``` would improve readability.
(I don't have a strong opinion on this, so let me know if you wish to keep it as is.) ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; +private static final Set SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of( Review Comment: In this diagram: https://github.com/apache/kafka/blob/49d3122d425171b6a59a2b6f02d3fe63d3ac2397/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L74 copy_started -> delete_started is also a valid transition, although I can't think of a scenario where it would occur, because we always complete the copy before calling expiration. Are we missing something here by not including copy_started?
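The guard-clause style suggested in the review can be sketched as a standalone filter. The enum is a local stand-in whose constant names are assumed to mirror Kafka's `RemoteLogSegmentState`, and the set of valid states is a caller-supplied parameter; the sketch does not reproduce the PR's `SEGMENT_DELETION_VALID_STATES`, whose contents are elided above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative only: local stand-ins, not Kafka's RemoteLogManager types.
class SegmentStateFilterSketch {

    // Assumed to mirror the state names of Kafka's RemoteLogSegmentState.
    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    static final class Segment {
        final String id;
        final SegmentState state;

        Segment(String id, SegmentState state) {
            this.id = id;
            this.state = state;
        }
    }

    // Returns the ids of segments whose state is in validStates. The early `continue`
    // keeps the rest of the loop body from being nested inside an if block.
    static List<String> deletableSegmentIds(List<Segment> segments, Set<SegmentState> validStates) {
        List<String> ids = new ArrayList<>();
        for (Segment segment : segments) {
            if (!validStates.contains(segment.state)) {
                continue;
            }
            ids.add(segment.id);
        }
        return ids;
    }
}
```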
Re: [PR] MINOR: Fix ClusterConnectionStatesTest.testSingleIP [kafka]
jolshan commented on PR #14741: URL: https://github.com/apache/kafka/pull/14741#issuecomment-1808642447 Shall we backport this to 3.6 as well? Maybe not a huge deal, but I see it causing some issues there too.
Re: [PR] cherrypick KAFKA-15780: Wait for consistent KRaft metadata when creating or deleting topics [kafka]
jolshan merged PR #14713: URL: https://github.com/apache/kafka/pull/14713
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
dajac commented on PR #14670: URL: https://github.com/apache/kafka/pull/14670#issuecomment-1808636945 re-triggered a build as the last one did not look good.
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
pprovenzano commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1808630427 LGTM
[jira] [Created] (KAFKA-15814) SASL Kerberos authentication cannot be used with load balanced bootstrap
Piotr Smolinski created KAFKA-15814: --- Summary: SASL Kerberos authentication cannot be used with load balanced bootstrap Key: KAFKA-15814 URL: https://issues.apache.org/jira/browse/KAFKA-15814 Project: Kafka Issue Type: Bug Components: core, security Affects Versions: 3.6.0 Reporter: Piotr Smolinski This is actually a very old problem that is still unresolved. When Kafka is accessed through a load-balanced bootstrap (as in Kubernetes, when there are too many brokers to practically list them all in the bootstrap, or when we want to expose a single access address), each broker endpoint can be reached using at least two addresses: one for the connection bootstrap (load balanced) and another for the broker connection (direct). The problem is that the Kafka Kerberos configuration forces JAAS to use only one SPN (like kafka/b-0.kafka@DOMAIN). With weaker algorithms (like RC4), the same keytab entry can be used for multiple server names. The problem arises with stronger algorithms (like AES128 or AES256): the SPN is used to compute the messages, so the keytab entries for kafka/b-0.kafka@DOMAIN and kafka/kafka@DOMAIN are not compatible. JAAS configuration for Kerberos can be specified in two ways, depending on whether we are using it for a service client or a server: {code:java} com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/kafka/security/kafka.keytab" principal="kafka/node-0.kafka.home.arpa@LOCALDOMAIN" ; {code} {code:java} com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/kafka/security/kafka.keytab" principal="*" isInitiator=false ; {code} While the former can be used on both sides, it forces a single principal to be selected from the keytab. The latter cannot be used on the client side, but it dynamically selects the correct SPN based on the client request. The Kafka Kerberos implementation does not distinguish between client and server properties.
In particular, the same JAAS configuration entry is used when the broker uses Kerberos for inter-broker communication. Even if the listener property in the broker is known not to be used, the code currently does not allow specifying a wildcard principal. Some time ago I created a patch that solves the problem while preserving the current semantics, but I did not have time to describe the submission. This ticket is a tracker for the pull request. -- This message was sent by Atlassian Jira (v8.20.10#820010)
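To make the SPN mismatch concrete, here is a minimal Java sketch that models which service principal a client requests for each address, and whether a fixed-principal acceptor or a wildcard acceptor (principal="*" isInitiator=false) would accept it. It is not Kafka or JDK code: the class and method names are hypothetical, the keytab is modeled as a plain set of principal names, and the host and realm values are taken from the examples above.

```java
import java.util.Set;

// Hypothetical model of Kerberos service-principal matching; not Kafka or JDK API.
final class KerberosSpnSketch {

    // A client asks the KDC for a ticket to <service>/<host it connected to>@<REALM>.
    static String requestedSpn(String serviceName, String host, String realm) {
        return serviceName + "/" + host + "@" + realm;
    }

    // Fixed acceptor (principal="kafka/b-0.kafka@DOMAIN"): only that exact SPN is accepted.
    static boolean fixedAcceptorAccepts(String configuredPrincipal, String spn) {
        return configuredPrincipal.equals(spn);
    }

    // Wildcard acceptor (principal="*" isInitiator=false): any SPN that has a key in the
    // keytab is accepted; the keytab is modeled here as a set of principal names.
    static boolean wildcardAcceptorAccepts(Set<String> keytabPrincipals, String spn) {
        return keytabPrincipals.contains(spn);
    }

    public static void main(String[] args) {
        String direct = requestedSpn("kafka", "b-0.kafka", "DOMAIN"); // direct broker address
        String bootstrap = requestedSpn("kafka", "kafka", "DOMAIN");  // load-balanced address
        String fixed = "kafka/b-0.kafka@DOMAIN";
        Set<String> keytab = Set.of("kafka/b-0.kafka@DOMAIN", "kafka/kafka@DOMAIN");

        System.out.println("fixed, direct:       " + fixedAcceptorAccepts(fixed, direct));
        System.out.println("fixed, bootstrap:    " + fixedAcceptorAccepts(fixed, bootstrap));
        System.out.println("wildcard, bootstrap: " + wildcardAcceptorAccepts(keytab, bootstrap));
    }
}
```

Running main prints true, false, true: with a single fixed principal, the connection through the load-balanced address is rejected even though the keytab holds a key for kafka/kafka@DOMAIN.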
[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException
[ https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785578#comment-17785578 ] Mickael Maison commented on KAFKA-15481: [~divijvaidya] Is there an issue that's preventing the backport to 3.6? > Concurrency bug in RemoteIndexCache leads to IOException > > > Key: KAFKA-15481 > URL: https://issues.apache.org/jira/browse/KAFKA-15481 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Divij Vaidya >Assignee: Jeel Jotaniya >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > RemoteIndexCache has a concurrency bug which leads to IOException while > fetching data from remote tier. > Below events in order of timeline - > Thread 1 (cache thread): invalidates the entry, removalListener is invoked > async, so the files have not been renamed to "deleted" suffix yet. > Thread 2: (fetch thread): tries to find entry in cache, doesn't find it > because it has been removed by 1, fetches the entry from S3, writes it to > existing file (using replace existing) > Thread 1: async removalListener is invoked, acquires a lock on old entry > (which has been removed from cache), it renames the file to "deleted" and > starts deleting it > Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file > and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM > returns an error as it won't allow creation of 2GB random access file. > *Potential Fix* > Use EvictionListener instead of RemovalListener in Caffeine cache as per the > documentation: > {quote} When the operation must be performed synchronously with eviction, use > {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will > only be notified when {{RemovalCause.wasEvicted()}} is true. 
For an explicit > removal, {{Cache.asMap()}} offers compute methods that are performed > atomically.{quote} > This will ensure that removal from cache and marking the file with delete > suffix is synchronously done, hence the above race condition will not occur.
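The quoted Caffeine guidance relies on atomic compute operations on the cache's map view. As a stand-in, the minimal sketch below uses java.util.concurrent.ConcurrentHashMap (not Caffeine, and not Kafka's RemoteIndexCache) to illustrate the idea: the step that marks an entry as deleted runs inside the same computeIfPresent call that removes the mapping, so a fetch that finds the key missing can only do so after the old entry has been marked. The Entry type and its markForDeletion() method are hypothetical placeholders for renaming the index files with the "deleted" suffix.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for the proposed fix using ConcurrentHashMap's atomic compute methods.
// Entry and markForDeletion() are hypothetical; they are not Kafka's RemoteIndexCache types.
final class AtomicIndexCacheSketch {

    static final class Entry {
        final String segmentId;
        private final AtomicBoolean markedForDeletion = new AtomicBoolean(false);

        Entry(String segmentId) {
            this.segmentId = segmentId;
        }

        // Placeholder for renaming the entry's index files with the "deleted" suffix.
        void markForDeletion() {
            markedForDeletion.set(true);
        }

        boolean isMarkedForDeletion() {
            return markedForDeletion.get();
        }
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();

    // Fetch path: returns the cached entry or atomically loads a fresh one.
    Entry getOrLoad(String segmentId) {
        return entries.computeIfAbsent(segmentId, Entry::new);
    }

    // Invalidation path: marking and removal run inside one atomic computeIfPresent call,
    // so a concurrent getOrLoad either still sees the old entry or, once the key is gone,
    // loads a replacement only after the old entry has already been marked.
    Entry invalidate(String segmentId) {
        Entry[] removed = new Entry[1];
        entries.computeIfPresent(segmentId, (key, old) -> {
            old.markForDeletion();
            removed[0] = old;
            return null; // returning null removes the mapping
        });
        return removed[0];
    }

    public static void main(String[] args) {
        AtomicIndexCacheSketch cache = new AtomicIndexCacheSketch();
        Entry first = cache.getOrLoad("segment-0");
        Entry removed = cache.invalidate("segment-0");
        Entry reloaded = cache.getOrLoad("segment-0");
        System.out.println(removed == first && first.isMarkedForDeletion());
        System.out.println(reloaded != first && !reloaded.isMarkedForDeletion());
    }
}
```

The design choice mirrors the ticket: instead of an asynchronous listener that renames files some time after removal, the rename step is folded into the removal itself.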
Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]
agavra commented on PR #14708: URL: https://github.com/apache/kafka/pull/14708#issuecomment-1808460098 Ah same! Apologies, I made the same mistake. Will remember next time to look over the individual JDKs.
[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785540#comment-17785540 ] Proven Provenzano commented on KAFKA-15513: --- SCRAM between controllers will need work, as we need to solve the problem of how a controller can access the metadata topic before it is part of the controller quorum. SASL doesn't have this problem because all the authentication info is in the config and not part of the metadata. > KRaft cluster fails with SCRAM authentication enabled for control-plane > --- > > Key: KAFKA-15513 > URL: https://issues.apache.org/jira/browse/KAFKA-15513 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0, 3.5.1 >Reporter: migruiz4 >Priority: Major > > We have observed a scenario where a KRaft cluster fails to bootstrap when > using SCRAM authentication for controller-to-controller communications. > The steps to reproduce are simple: > * Deploy (at least) 2 Kafka servers using the latest version, 3.5.1. > * Configure a KRaft cluster, where the controller listener uses > SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the > recommended in-line jaas config > '{{listener.name..scram-sha-512.sasl.jaas.config}}' > * Run 'kafka-storage.sh' on both nodes using option '--add-scram' to create > the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an > authentication error: > > {code:java} > [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: > Failed to send the following request due to authentication error: > ClientRequest(expectResponse=true, > callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075, > destination=0, correlationId=129, clientId=raft-client-1, > createdTimeMs=1690888364960, > requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', > topics=[TopicData(topicName='__cluster_metadata', > partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, > lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code} > Some additional details about the scenario that we tested out: > * Controller listener does work when configured with SASL+PLAIN > * The issue only affects the Controller listener, SCRAM users created using > the same method work for data-plane listeners and inter-broker listeners. 
> Below you can find the exact configuration and command used to deploy:
> * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="controller_user" password="controller_password";
> {code}
> * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram SCRAM-SHA-512=[name=controller_user,password=controller_password]
> {code}
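The in-line JAAS setting in the configuration above follows Kafka's listener-prefixed naming, listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config, with the listener name and mechanism written in lower case. A small Java sketch with hypothetical helper names that assembles the key and value used in this report:

```java
import java.util.Locale;

// Hypothetical helpers that assemble the listener-prefixed JAAS property used in the report.
final class ListenerJaasConfigSketch {

    // listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config, both parts lower-cased.
    static String jaasConfigKey(String listenerName, String saslMechanism) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + "."
                + saslMechanism.toLowerCase(Locale.ROOT) + ".sasl.jaas.config";
    }

    // In-line JAAS value for Kafka's SCRAM login module with the given credentials.
    static String scramJaasValue(String username, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required"
                + " username=\"" + username + "\" password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        System.out.println(jaasConfigKey("CONTROLLER", "SCRAM-SHA-512") + "="
                + scramJaasValue("controller_user", "controller_password"));
    }
}
```

Running main prints the same listener.name.controller.scram-sha-512.sasl.jaas.config line as the server.properties above. Note that this only builds the client-side setting; it does not address how a controller obtains SCRAM credentials before it can read the metadata.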
Re: [PR] KIP-988 [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391293607 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -70,6 +71,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; +@SuppressWarnings("ClassFanOutComplexity") Review Comment: We are open to suggestions on how to solve this :)
[jira] [Updated] (KAFKA-15420) Kafka Tiered Storage V1
[ https://issues.apache.org/jira/browse/KAFKA-15420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15420: - Affects Version/s: 3.6.0 > Kafka Tiered Storage V1 > --- > > Key: KAFKA-15420 > URL: https://issues.apache.org/jira/browse/KAFKA-15420 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.0 >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > >
[jira] [Commented] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785526#comment-17785526 ] Ismael Juma commented on KAFKA-15605: - The PR was merged, can this be closed [~davidarthur] ? > Topics marked for deletion in ZK are incorrectly migrated to KRaft > -- > > Key: KAFKA-15605 > URL: https://issues.apache.org/jira/browse/KAFKA-15605 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.6.1 > > > When migrating topics from ZooKeeper, the KRaft controller reads all the > topic and partition metadata from ZK directly. This includes topics which > have been marked for deletion by the ZK controller. After being migrated to > KRaft, the pending topic deletions are never completed, so it is as if the > delete topic request never happened. > Since the client request to delete these topics has already been returned as > successful, it would be confusing to the client that the topic still existed. > An operator or application would need to issue another topic deletion to > remove these topics once the controller had moved to KRaft. If they tried to > create a new topic with the same name, they would receive a > TOPIC_ALREADY_EXISTS error. > The migration logic should carry over pending topic deletions and resolve > them either as part of the migration or shortly after. > *Note to operators:* > To determine if a migration was affected by this, an operator can check the > contents of {{/admin/delete_topics}} after the KRaft controller has migrated > the metadata. If any topics are listed under this ZNode, they were not > deleted and will still be present in KRaft. At this point the operator can > make a determination if the topics should be re-deleted (using > "kafka-topics.sh --delete") or left in place. 
In either case, the topics > should be removed from {{/admin/delete_topics}} to prevent unexpected topic > deletion in the event of a fallback to ZK.
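The operator check described above amounts to comparing two sets of topic names. A minimal Java sketch, assuming the names have already been collected by other means (for example, the child ZNodes of /admin/delete_topics and the topic list reported by the migrated KRaft cluster); it makes no ZooKeeper or Admin client calls, and the class name and example topics are hypothetical:

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper for the operator check in KAFKA-15605; makes no ZooKeeper or Admin calls.
final class PendingDeletionCheckSketch {

    // Topics still listed under /admin/delete_topics that also exist after the migration.
    static SortedSet<String> survivedMigration(Set<String> zkPendingDeletes, Set<String> kraftTopics) {
        SortedSet<String> survivors = new TreeSet<>(zkPendingDeletes);
        survivors.retainAll(kraftTopics);
        return survivors;
    }

    public static void main(String[] args) {
        // Example inputs (hypothetical topic names).
        Set<String> pending = Set.of("old-orders", "tmp-debug");
        Set<String> kraft = Set.of("orders", "old-orders", "tmp-debug");
        SortedSet<String> survivors = survivedMigration(pending, kraft);
        if (survivors.isEmpty()) {
            System.out.println("No pending ZK deletions were carried into KRaft.");
        } else {
            // Per topic: re-delete with kafka-topics.sh --delete or keep it; either way,
            // remove its entry under /admin/delete_topics before any fallback to ZK.
            System.out.println("Still present in KRaft: " + survivors);
        }
    }
}
```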
[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785522#comment-17785522 ] Ismael Juma commented on KAFKA-15552: - [~showuon] Can this be closed then? > Duplicate Producer ID blocks during ZK migration > > > Key: KAFKA-15552 > URL: https://issues.apache.org/jira/browse/KAFKA-15552 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.5.2, 3.6.1 > > > When migrating producer ID blocks from ZK to KRaft, we are taking the current > producer ID block from ZK and writing its "firstProducerId" into the > producer IDs KRaft record. However, in KRaft we store the _next_ producer ID > block in the log rather than storing the current block like ZK does. The end > result is that the first block given to a caller of AllocateProducerIds is a > duplicate of the last block allocated in ZK mode. > > This can result in duplicate producer IDs being given to transactional or > idempotent producers. In the case of transactional producers, this can cause > long-term problems since the producer IDs are persisted and reused for a long > time. > This bug can occur in the window between the last producer ID block being > allocated by the ZK controller and all the brokers being restarted following > the metadata migration. > > Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException > and possibly some producer epoch validation errors. To see if a cluster is > affected by this bug, search for the offending producer ID and see if it is > being used by more than one producer. > > For example, the following error was observed: > {code} > Out of order sequence number for producer 376000 at offset 381338 in > partition REDACTED: 0 (incoming seq.
number), 21 (current end sequence > number) > {code} > Then, searching for "376000" in the > org.apache.kafka.clients.producer.internals.TransactionManager logs, two > brokers both show the same producer ID being provisioned: > {code} > Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1 > Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1 > {code}
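The off-by-one is easiest to see with concrete numbers. This Java sketch models producer ID blocks as inclusive ranges. The block size of 1000 is an assumption for illustration, 376000 from the log excerpt is assumed to be the start of ZK's last block, and the method names are hypothetical rather than Kafka's actual migration code.

```java
// Hypothetical model of the ZK-to-KRaft producer ID block hand-off; not Kafka's migration code.
final class ProducerIdBlockSketch {

    // Assumed block size, for illustration only.
    static final long BLOCK_SIZE = 1000L;

    // Last ID of the inclusive block [firstId, firstId + BLOCK_SIZE - 1].
    static long lastId(long firstId) {
        return firstId + BLOCK_SIZE - 1;
    }

    // Buggy mapping: ZK's *current* block start is written as KRaft's *next* block start.
    static long buggyNextBlockStart(long zkCurrentBlockFirstId) {
        return zkCurrentBlockFirstId;
    }

    // Consistent mapping: KRaft's next block starts right after ZK's current block.
    static long consistentNextBlockStart(long zkCurrentBlockFirstId) {
        return lastId(zkCurrentBlockFirstId) + 1;
    }

    // True if two blocks of BLOCK_SIZE share at least one producer ID.
    static boolean overlaps(long firstA, long firstB) {
        return firstA <= lastId(firstB) && firstB <= lastId(firstA);
    }

    public static void main(String[] args) {
        long zkCurrent = 376000L; // block most recently handed out by the ZK controller
        long buggy = buggyNextBlockStart(zkCurrent);
        long consistent = consistentNextBlockStart(zkCurrent);
        System.out.println("buggy next block " + buggy
                + " overlaps ZK block: " + overlaps(buggy, zkCurrent));
        System.out.println("consistent next block " + consistent
                + " overlaps ZK block: " + overlaps(consistent, zkCurrent));
    }
}
```

With the buggy mapping, the first block KRaft hands out starts at 376000 and overlaps the block ZK already issued, which matches two brokers provisioning producer ID 376000; starting at 377000 avoids the overlap.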
[PR] Revert "Revert "KAFKA-15661: KIP-951: Server side changes (#14444)" [kafka]
chb2ab opened a new pull request, #14747: URL: https://github.com/apache/kafka/pull/14747 Investigating `org.apache.kafka.tiered.storage.integration.ReassignReplicaShrinkTest` failure. This reverts commit a98bd7d65fb5a3a188ff524db7619dc7fe4257fa. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13627) Topology changes shouldn't require a full reset of local state
[ https://issues.apache.org/jira/browse/KAFKA-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785518#comment-17785518 ] Nicholas Telford commented on KAFKA-13627: -- Hi, I've temporarily parked KIP-816 to focus on KIP-892, but if you'd like to take it over, I'm happy for you to do so. My team have an implementation of "Option B" in production, which appears to work well and has proved very reliable. It's written in Kotlin, but should be trivial to translate to Java if you would like to use it as the basis of an implementation. You can see it here: [https://gist.github.com/nicktelford/15cc596a25de33a673bb5bd4c81edd0f] When I explored Options A and C, I found many difficulties, owing to places that either depended on the TaskId being present in the state directory path, or that depended on the format of the TaskId, so I would highly recommend pursuing Option B, since it's easy to implement, reliable, and the logic can be isolated from the rest of Kafka Streams, making it easy to maintain. > Topology changes shouldn't require a full reset of local state > -- > > Key: KAFKA-13627 > URL: https://issues.apache.org/jira/browse/KAFKA-13627 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0 >Reporter: Nicholas Telford >Priority: Major > > [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset] > When changes are made to a Topology that modifies its structure, users must > use the Application Reset tool to reset the local state of their application > prior to deploying the change. Consequently, these changes require rebuilding > all local state stores from their changelog topics in Kafka. > The time and cost of rebuilding state stores is determined by the size of the > state stores, and their recent write history, as rebuilding a store entails > replaying all recent writes to the store. 
For applications that have very > large stores, or stores with extremely high write-rates, the time and cost of > rebuilding all state in the application can be prohibitively expensive. This > is a significant barrier to building highly scalable applications with good > availability. > Changes to the Topology that do not directly affect a state store should not > require the local state of that store to be reset/deleted. This would allow > applications to scale to very large data sets, whilst permitting the > application behaviour to evolve over time. > h1. Background > Tasks in a Kafka Streams Topology are logically grouped by "Topic Group" > (a.k.a. Subtopology). Topic Groups are assigned an ordinal (number), based on > their position in the Topology. This Topic Group ordinal is used as the > prefix for all Task IDs: {{{}_{}}}, > e.g. {{2_14}} > If new Topic Groups are added, old Topic Groups are removed, or existing > Topic Groups are re-arranged, this can cause the assignment of ordinals to > change _even for Topic Groups that have not been modified_. > When the assignment of ordinals to Topic Groups changes, existing Tasks are > invalidated, as they no longer correspond to the correct Topic Groups. Local > state is located in directories that include the Task ID (e.g. > {{/state/dir/2_14/mystore/rocksdb/…}}), and since the Tasks have all been > invalidated, all existing local state directories are also invalid. > Attempting to start an application that has undergone these ordinal changes, > without first clearing the local state, will cause Kafka Streams to attempt > to use the existing local state for the wrong Tasks. Kafka Streams detects > this discrepancy and prevents the application from starting.
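A minimal Java sketch of why an unrelated topology change can invalidate local state: ordinals are assigned by position, and the state directory embeds the task ID (ordinal, underscore, partition, as in 2_14 above). The subtopology names, the root directory, the assumption that the new subtopology lands first, and the helper methods are all hypothetical; this models the description above, not Kafka Streams' internal code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of position-based subtopology ordinals; not Kafka Streams code.
final class SubtopologyOrdinalSketch {

    // Assigns ordinals 0..n-1 in the order the subtopologies appear in the topology.
    static Map<String, Integer> assignOrdinals(List<String> subtopologiesInOrder) {
        Map<String, Integer> ordinals = new LinkedHashMap<>();
        for (int i = 0; i < subtopologiesInOrder.size(); i++) {
            ordinals.put(subtopologiesInOrder.get(i), i);
        }
        return ordinals;
    }

    // State directory keyed by task ID "<ordinal>_<partition>", mirroring /state/dir/2_14/mystore.
    static String stateDir(String root, int ordinal, int partition, String store) {
        return root + "/" + ordinal + "_" + partition + "/" + store;
    }

    public static void main(String[] args) {
        List<String> before = List.of("ingest", "enrich", "count-store");
        // Assumption: a new, unrelated subtopology ends up first in the topology.
        List<String> after = List.of("audit", "ingest", "enrich", "count-store");

        int oldOrdinal = assignOrdinals(before).get("count-store");
        int newOrdinal = assignOrdinals(after).get("count-store");

        System.out.println(stateDir("/state/dir", oldOrdinal, 14, "mystore"));
        System.out.println(stateDir("/state/dir", newOrdinal, 14, "mystore"));
    }
}
```

The store itself is unchanged, yet its task ID moves from 2_14 to 3_14, so the existing directory no longer matches the task that now owns it.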