[jira] [Created] (KAFKA-15821) Active topics for deleted connectors are not reset in standalone mode

2023-11-13 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15821:
-

 Summary: Active topics for deleted connectors are not reset in 
standalone mode
 Key: KAFKA-15821
 URL: https://issues.apache.org/jira/browse/KAFKA-15821
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 
3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 
2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.5.0, 
3.7.0
Reporter: Chris Egerton


In 
[KIP-558|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect],
 a new REST endpoint was added to report the set of active topics for a 
connector. The KIP specified that "Deleting a connector will reset this 
connector's set of active topics", and this logic was successfully implemented 
in distributed mode. However, in standalone mode, active topics for deleted 
connectors are not deleted, and if a connector is re-created, it will inherit 
the active topics of its predecessor(s) unless they were manually reset.
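
A minimal sketch of the behavior the KIP describes, assuming a hypothetical in-memory `ActiveTopicsTracker` (this is not the Connect implementation, and every name in it is illustrative): deleting a connector clears its recorded topics, so a connector re-created under the same name starts with an empty set.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for whatever store tracks active topics; not Kafka Connect code.
class ActiveTopicsTracker {
    private final Map<String, Set<String>> topicsByConnector = new HashMap<>();

    // Record that a connector has used a topic.
    void recordTopic(String connector, String topic) {
        topicsByConnector.computeIfAbsent(connector, k -> new HashSet<>()).add(topic);
    }

    // Return an unmodifiable snapshot of the connector's active topics.
    Set<String> activeTopics(String connector) {
        Set<String> topics = topicsByConnector.get(connector);
        if (topics == null) {
            return Collections.<String>emptySet();
        }
        return Collections.unmodifiableSet(new HashSet<>(topics));
    }

    // Deleting a connector resets its active topics, as the KIP text quoted above requires.
    void deleteConnector(String connector) {
        topicsByConnector.remove(connector);
    }
}
```

With this shape, the bug described above would correspond to `deleteConnector` never being reached (or never clearing the entry) on the standalone code path.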



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-13 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15802:
---
Fix Version/s: 3.7.0
   3.6.1

> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> We have a tiered storage cluster running with Aiven s3 plugin. 
> On our cluster, we have a process doing regular listOffsets requests. 
> This triggers the following exception:
> {code:java}
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> Requested remote resource was not found
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$6(RemoteIndexCache.java:355)
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:318)
> Nov 09, 2023 1:42:01 PM com.github.benmanes.caffeine.cache.LocalAsyncCache 
> lambda$handleCompletion$7
> WARNING: Exception thrown during asynchronous load
> java.util.concurrent.CompletionException: 
> io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:107)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at io.aiven.kafka.tieredstorage.storage.s3.S3Storage.fetch(S3Storage.java:80)
> at 
> io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.lambda$new$1(SegmentManifestProvider.java:59)
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:103)
> ... 7 more
> Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The 
> specified key does not exist. (Service: S3, Status Code: 404, Request ID: 
> CFMP27PVC9V2NNEM, Extended Request ID: 
> F5qqlV06qQJ5qCuWl91oueBaha0QLMBURJudnOnFDQk+YbgFcAg70JBATcARDxN44DGo+PpfZHAsum+ioYMoOw==)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
> at 
> 

Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-13 Thread via GitHub


satishd commented on code in PR #14727:
URL: https://github.com/apache/kafka/pull/14727#discussion_r1392031838


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.class);
 private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = 
"remote-log-reader";
+private static final Set 
SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of(

Review Comment:
   COPY_SEGMENT_STARTED segments are eligible for deletion when the leader 
was not able to copy them, because it went through an ungraceful 
shutdown or for any other reason. The new leader may pick up the respective segments 
for the targeted offsets that need to be copied, while the earlier failed segment 
remains in the COPY_SEGMENT_STARTED state and is eventually deleted 
by the retention cleanup logic.
   
   So, COPY_SEGMENT_STARTED is a valid transition even now when copy and 
deletion are happening sequentially.
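
   A rough sketch of the distinction being discussed, using a local `SegmentState` enum declared only to keep the example self-contained. The exact membership of the deletion set is an assumption (the comment above only confirms that COPY_SEGMENT_STARTED belongs in it), and the `isReadable` rule is likewise assumed from the PR title rather than taken from the patch.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Local mirror of the remote segment lifecycle states, for illustration only.
enum SegmentState {
    COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
}

class SegmentStateChecks {
    // Assumed membership: a segment stuck in COPY_SEGMENT_STARTED must still be deletable.
    static final Set<SegmentState> DELETION_VALID_STATES = Collections.unmodifiableSet(
            EnumSet.of(SegmentState.COPY_SEGMENT_STARTED,
                       SegmentState.COPY_SEGMENT_FINISHED,
                       SegmentState.DELETE_SEGMENT_STARTED));

    // Assumed rule: only fully copied segments are safe to read indexes or offsets from.
    static boolean isReadable(SegmentState state) {
        return state == SegmentState.COPY_SEGMENT_FINISHED;
    }

    static boolean isEligibleForDeletion(SegmentState state) {
        return DELETION_VALID_STATES.contains(state);
    }
}
```

   The point of keeping two separate checks is that a half-copied segment can be cleaned up by retention without ever being served to a reader.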



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1027,8 +1027,32 @@ void testFindOffsetByTimestampWithInvalidEpochSegments() 
throws IOException, Rem
 assertEquals(Optional.empty(), maybeTimestampAndOffset3);
 }
 
+@Test
+void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, 
RemoteStorageException {
+TopicPartition tp = leaderTopicIdPartition.topicPartition();
+
+long ts = time.milliseconds();
+long startOffset = 120;
+int targetLeaderEpoch = 10;
+
+TreeMap validSegmentEpochs = new TreeMap<>();
+validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+LeaderEpochFileCache leaderEpochFileCache = new 
LeaderEpochFileCache(tp, checkpoint);
+leaderEpochFileCache.assign(4, 99L);
+leaderEpochFileCache.assign(5, 99L);
+leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+leaderEpochFileCache.assign(12, 500L);
+
+doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, 
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+
+Optional maybeTimestampAndOffset1 = 
remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, 
leaderEpochFileCache);

Review Comment:
   Can you rename `maybeTimestampAndOffset1` as `maybeTimestampAndOffset`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-13 Thread via GitHub


philipnee commented on PR #14680:
URL: https://github.com/apache/kafka/pull/14680#issuecomment-1809546161

   Hi @lucasbru - Thanks for your feedback, the PR was updated.  Would you have 
time to go over the comments again? Thanks!





Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1809524348

   Different tests are failing compared with previous runs, and none of them 
are related to the PR.





[jira] [Created] (KAFKA-15820) Add a metric to track the number of partitions under min ISR

2023-11-13 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15820:
--

 Summary: Add a metric to track the number of partitions under min 
ISR
 Key: KAFKA-15820
 URL: https://issues.apache.org/jira/browse/KAFKA-15820
 Project: Kafka
  Issue Type: Sub-task
Reporter: Calvin Liu
Assignee: Calvin Liu








Re: [PR] MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits [kafka]

2023-11-13 Thread via GitHub


github-actions[bot] commented on PR #13911:
URL: https://github.com/apache/kafka/pull/13911#issuecomment-1809490953

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]

2023-11-13 Thread via GitHub


kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1391862343


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection 
topics, Optional(topics), listener))
 metadata.requestUpdateForNewTopics();
+
+// Trigger subscribe event to effectively join the group if not 
already part of it,
+// or just send the new subscription to the broker.
+applicationEventHandler.add(new 
SubscriptionChangeApplicationEvent());

Review Comment:
   There's another `subscribeInternal()` for the topic pattern path. We want 
this there too, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() {
 
 @Override
 public void enforceRebalance() {
-throw new KafkaException("method not implemented");

Review Comment:
   Good call!



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * Application event triggered when a user calls the unsubscribe API. This 
will make the consumer
+ * release all its assignments and send a heartbeat request to leave the 
consumer group.
+ * This event holds a future that will complete when the invocation of 
callbacks to release
+ * complete and the heartbeat to leave the group is sent out (minimal effort 
to send the
+ * leave group heartbeat, without waiting for any response or considering 
timeouts).
+ */
+public class UnsubscribeApplicationEvent extends 
CompletableApplicationEvent {

Review Comment:
   The intention of the `CompletableApplicationEvent` was to have a way for 
the consumer to block on the results of operations performed in the background 
thread. Since the `Consumer.unsubscribe()` API call is non-blocking, I'm 
thinking this should be a subclass of `ApplicationEvent`.
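
   To illustrate the distinction, here is a simplified sketch with hypothetical `AppEvent` and `CompletableAppEvent` classes (these are not the real consumer internals): the plain event is fire-and-forget, while the completable variant carries a future that the calling thread can block on.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical base type for events handed to a background thread; fire-and-forget.
abstract class AppEvent {
    abstract void process();
}

// Hypothetical variant that carries a future so the caller can wait for the outcome.
abstract class CompletableAppEvent<T> extends AppEvent {
    private final CompletableFuture<T> future = new CompletableFuture<>();

    CompletableFuture<T> future() {
        return future;
    }

    // Work performed on the background thread.
    abstract T compute();

    @Override
    void process() {
        try {
            future.complete(compute());
        } catch (RuntimeException e) {
            // Surface background failures to whoever is waiting on the future.
            future.completeExceptionally(e);
        }
    }
}
```

   A caller that never waits on `future()` gains nothing from the second type, which is the argument for the plain base class when the API call is non-blocking.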



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
 
 public enum Type {
 COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
-LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,

Review Comment:
   ```suggestion
   LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIBED,
   ```
   
   `SUBSCRIPTION_CHANGE` is a bit vague. Does it encompass more than the event 
of the user calling `Consumer.subscribe()`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +347,537 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
 public void transitionToFenced() {
 resetEpoch();
 transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+"after member got fenced. Member will rejoin the group 
anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
+
+clearPendingAssignmentsAndLocalNamesCache();
 }
 
 /**
  * {@inheritDoc}
  */
 @Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", 

Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391891296


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -70,6 +71,7 @@
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
   Well actually after reading through the whole PR, I do think it's 
unnecessary complexity for the `TaskManager` after all -- see [this 
comment](https://github.com/apache/kafka/pull/14735#discussion_r1391855192). 
But that's more because of the complexity of determining when to invoke 
`#onUpdateSuspended` and to keep everything in the StoreChangelogReader class. 
Hopefully that all makes sense to you too






Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2023-11-13 Thread via GitHub


sean-rossignol commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-1809404175

   Hey, is there any way I could help unblock this PR, @vamossagar12?
   
   We have an internal kstreams app with a wildcard consumer, and one bottleneck 
we're hitting is that as the number of source topics grows, so does our initial total 
heap size when we need to rehydrate the components downstream of our cooker.  
The problem is that we set the max heap size across all instances of the 
kstreams app to handle the initial expected load when we first deploy the 
application.  Over time the number of source topics grows, and come rehydrate 
time we have to increase the resources dedicated to the instance, futz 
with the buffered.records.per.partition config, or reset the offsets of the 
source topic partitions in batches; otherwise all instances of our application 
will go OOM.  Being able to define and enforce a maximum input buffer size at 
the instance/thread level would allow us to handle these rehydration events 
without needing to change any other elements of our deployments.





Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]

2023-11-13 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391877914


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -70,6 +71,7 @@
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
   Oh, I see. I was thinking that this might be unnecessary complexity for 
the `TaskManager` class, and that we could instead pass the standby callback to the 
`ActiveTaskCreator`. But I'm not sure whether the standby callback API is compatible with 
the terminology used in the `ActiveTaskCreator` class.






[PR] KAFKA-15816: Fix leaked sockets in core tests [kafka]

2023-11-13 Thread via GitHub


gharris1727 opened a new pull request, #14754:
URL: https://github.com/apache/kafka/pull/14754

   These tests leak sockets due to various typos. Additionally, many tests 
leaked sockets because MiniKdc stops asynchronously, leaving sockets open at 
the end of the test which are eventually cleaned up. This patch uses the 
alternative dispose method which awaits termination of the internal resources 
of the KDC before proceeding with the normal shutdown.
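   
   As a generic illustration of the difference between an asynchronous stop and one that awaits termination, here is a small sketch built on `ExecutorService`. `BackgroundService` and its method names are hypothetical and are not the MiniKdc API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical test fixture that owns a background thread (standing in for an embedded server).
class BackgroundService {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void start(Runnable work) {
        executor.submit(work);
    }

    // Asynchronous stop: returns immediately, so resources may still be open when the test ends.
    void stopAsync() {
        executor.shutdown();
    }

    // Blocking stop: requests shutdown, then waits until the worker terminates or the timeout elapses.
    boolean stopAndAwait(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

   A leak checker that runs right after `stopAsync()` can still observe live resources, whereas `stopAndAwait` only returns `true` once the worker has finished.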
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391815838


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -580,6 +583,17 @@ public void setGlobalStateRestoreListener(final 
StateRestoreListener globalState
 }
 }
 
+public void setStandbyUpdateListener(final StandbyUpdateListener 
globalStandbyListener) {

Review Comment:
   Can you add javadocs? (You can pretty much copy/paste from the above API for 
reference.)






[jira] [Updated] (KAFKA-15819) KafkaServer leaks KafkaRaftManager when ZK migration enabled

2023-11-13 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15819:

Priority: Minor  (was: Major)

> KafkaServer leaks KafkaRaftManager when ZK migration enabled
> 
>
> Key: KAFKA-15819
> URL: https://issues.apache.org/jira/browse/KAFKA-15819
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> In SharedServer, TestRaftServer, and MetadataShell, the KafkaRaftManager is 
> maintained as an instance variable, and shut down when the outer instance is 
> shut down. However, in the KafkaServer, the KafkaRaftManager is instantiated 
> and started, but then the reference is lost.
> [https://github.com/apache/kafka/blob/49d3122d425171b6a59a2b6f02d3fe63d3ac2397/core/src/main/scala/kafka/server/KafkaServer.scala#L416-L442]
> Instead, the KafkaServer should behave like the other call-sites of 
> KafkaRaftManager, and shut down the KafkaRaftManager during shutdown.
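
The ownership pattern described above can be sketched as follows. `ManagedComponent` and `OuterServer` are hypothetical names, and the sketch is in Java rather than the Scala of the linked code; it only illustrates retaining a reference to every started component so that shutdown can release it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical component with a lifecycle, standing in for a manager that owns threads and sockets.
class ManagedComponent implements AutoCloseable {
    private boolean running;

    void startup() {
        running = true;
    }

    boolean isRunning() {
        return running;
    }

    @Override
    public void close() {
        running = false;
    }
}

// Hypothetical outer server: everything it starts is retained so shutdown can release it.
class OuterServer {
    private final List<ManagedComponent> started = new ArrayList<>();

    ManagedComponent startComponent() {
        ManagedComponent component = new ManagedComponent();
        component.startup();
        started.add(component); // keep the reference instead of dropping it
        return component;
    }

    void shutdown() {
        // Close in reverse start order.
        for (int i = started.size() - 1; i >= 0; i--) {
            started.get(i).close();
        }
        started.clear();
    }
}
```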





Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391848915


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map 
tasks,
 // no records to restore; in this case we just initialize the 
sensor to zero
 final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
 task.recordRestoration(time, recordsToRestore, true);
+}  else if (changelogMetadata.stateManager.taskType() == 
TaskType.STANDBY && storeMetadata.endOffset() != null) {

Review Comment:
   Why do we only invoke the standby listener when we have the `endOffset` 
filled out? I don't know off the top of my head when it would be empty or not, 
but I would think the listener callbacks are still useful even without this one 
field filled in. Also, users might rely on the standby listener callbacks being 
invoked. I think we should guarantee that the listener is always called (at 
least in the absence of app-wide errors that cause a shutdown).
   
   If you're wondering what to do in the case of it being null, I would suggest 
just passing in `-1L` as a sentinel value. As long as we mention that this is 
a possibility in the javadocs for the `endOffset` argument, I don't see any 
problem with leaving it up to the user to decide how to react in this case.
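   
   Something along these lines, as a sketch only. The `EndOffsets` helper and the `UNKNOWN_END_OFFSET` constant are hypothetical names, not part of the PR:

```java
// Hypothetical helper; the class and constant names are illustrative.
class EndOffsets {
    static final long UNKNOWN_END_OFFSET = -1L;

    // Map a possibly-null end offset to a primitive, using -1 when it is unknown.
    static long orSentinel(Long endOffset) {
        return endOffset == null ? UNKNOWN_END_OFFSET : endOffset;
    }
}
```

   The listener can then be invoked unconditionally with `EndOffsets.orSentinel(storeMetadata.endOffset())`, and the javadocs for the argument would document the `-1` case.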



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final 
StreamTask activeTask, final Se
 }
 
 private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set partitions) {
-return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
+final StreamTask streamTask = 
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, 
mainConsumer);
+final ProcessorStateManager stateManager = standbyTask.stateManager();
+for (final TopicPartition partition : partitions) {
+final ProcessorStateManager.StateStoreMetadata storeMetadata = 
stateManager.storeMetadata(partition);
+if (storeMetadata != null && storeMetadata.endOffset() != null) {
+standbyTaskUpdateListener.onUpdateSuspended(partition, 
storeMetadata.store().name(), storeMetadata.offset(), 
storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   Seems like this is the only place where we invoke the `onUpdateSuspended` 
callback, i.e. we're missing the `SuspendReason.MIGRATED` case, right?
   
   We can just look to the active restore listener and follow that same example 
-- looks like it invokes the analogous `#onRestoreSuspended` callback in the 
StoreChangelogReader, specifically in the `#unregister` method. Personally, I 
think that would be the best place to invoke the standby listener's 
`#onUpdateSuspended`, not just in the `MIGRATED` case but also for `PROMOTED`, 
so we can keep the logic in one place. And `#unregister` is the perfect place 
to do so, since it always gets invoked whether the task is being closed or 
recycled. You can just add a parameter to the `#unregister` method to pass in 
which of those two options it was.
   
   Keeping everything in the StoreChangelogReader also helps us avoid some 
gnarly questions about special case handling, because the question of when a 
task is closed can actually be pretty complicated when you look at it within 
the TaskManager: for example, corrupted tasks may be closed and revived, opening 
up questions about whether and when to invoke `#onUpdateSuspended` if it's going 
to be revived again. But within the StoreChangelogReader we know that each task 
should have a 1:1 ratio of calls to `prepareChangelogs` and `unregister`, so 
it's much easier to reason about.
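   
   Very roughly, the shape I have in mind looks like the sketch below. `ChangelogTracker` is a hypothetical, heavily simplified stand-in for the reader, partitions are plain strings, and mapping "closed" to `MIGRATED` and "recycled" to `PROMOTED` is my assumption about how the caller would choose the reason.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified reader: one place that pairs every registration with one suspension callback.
class ChangelogTracker {
    enum SuspendReason { MIGRATED, PROMOTED }

    interface Listener {
        void onUpdateSuspended(String partition, SuspendReason reason);
    }

    private final Set<String> registered = new HashSet<>();
    private final Listener listener;

    ChangelogTracker(Listener listener) {
        this.listener = listener;
    }

    void register(String partition) {
        registered.add(partition);
    }

    // The caller passes the reason: assumed MIGRATED for a closed task, PROMOTED for a recycled one.
    void unregister(String partition, SuspendReason reason) {
        // Only notify for partitions that were actually registered, keeping the calls 1:1.
        if (registered.remove(partition)) {
            listener.onUpdateSuspended(partition, reason);
        }
    }
}
```

   Because the callback fires only from `unregister`, a task that is closed and later revived produces exactly one callback per registration, with no special casing in the caller.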






Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391818555


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.

Review Comment:
   ```suggestion
* Method called upon the initialization of the standby task, just 
before it begins to load from the changelog.
   ```
   nit: I won't comment this everywhere, and frankly it's not a big deal and 
I'm only pointing it out because it should be a quick fix with find-and-replace, 
but we usually don't capitalize terms like "Standby Task". Mainly to help 
differentiate between the common name and the class name. So for example 
"TopicPartition" is fine, but "Topic Partition" would be weird. Similarly, I 
would use "standby task" here, although "StandbyTask" is also correct.





Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391822284


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeNamethe name of the store being watched by this 
Standby Task.
+ * @param startingOffset   the offset from which the Standby Task starts 
watching.
+ * @param currentEndOffset the current latest offset on the associated 
changelog partition.
+ */
+void onUpdateStart(final TopicPartition topicPartition,
+   final String storeName,
+   final long startingOffset,
+   final long currentEndOffset);
+
+/**
+ * Method called after restoring a batch of records.  In this case the 
maximum size of the batch is whatever
+ * the value of the MAX_POLL_RECORDS is set to.
+ *

Review Comment:
   formatting nit: add a single `<p>` between paragraphs in the javadocs 
(although you don't need a closing `</p>` tag, nor do you need a `<p>` between 
the last paragraph and the `@param` section)
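
   A rough illustration of the javadoc paragraph convention discussed here. 
This is only a sketch: the interface `ExampleListener`, its method, and the 
javadoc wording are invented for the example and are not part of the PR.

```java
// Hypothetical interface, used only to illustrate the javadoc paragraph convention.
interface ExampleListener {

    /**
     * Method called after a batch of records has been loaded.
     * <p>
     * A single paragraph tag separates this paragraph from the one above.
     * No closing tag is written, and no tag is placed before the param section.
     *
     * @param batchEndOffset the offset of the last record in the batch (illustrative only).
     * @return the offset that was passed in, so the example has observable behavior.
     */
    default long onBatchLoaded(final long batchEndOffset) {
        return batchEndOffset;
    }
}
```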






Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391818555


##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.

Review Comment:
   ```suggestion
* Method called upon the initialization of the standby task, just 
before it begins to load from the changelog.
   ```
   nit: I won't comment this everywhere, and frankly it's not a big deal and 
I'm only pointing it out because it should be a quick fix with find, 
but we usually don't capitalize terms like "Standby Task". Mainly to help 
differentiate between the common name and the class name. So for example 
"TopicPartition" is fine, but "Topic Partition" would be weird. Similarly, I 
would use "standby task" here, although "StandbyTask" is also correct 



##
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+enum SuspendReason {
+MIGRATED,
+PROMOTED
+}
+
+/**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition   the TopicPartition of the Standby Task.
+ * @param storeNamethe name of the store being watched by this 
Standby Task.
+ * @param startingOffset   the offset from which the Standby Task starts 
watching.
+ * @param currentEndOffset the current latest offset on the associated 
changelog partition.

Review Comment:
   ```suggestion
* @param currentEndOffset the current end offset on the associated 
changelog partition.
   ```
   I guess you could also say "highest offset", but "latest" feels a bit 
ambiguous, at least to me personally



##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -742,6 +756,30 @@ public void onRestoreSuspended(final TopicPartition 
topicPartition, final String
 }
 }
 
+final class DelegatingStandbyUpdateListener implements 
StandbyUpdateListener {
+
+@Override
+public void onUpdateStart(final TopicPartition topicPartition, final 
String storeName, final long startingOffset, final long currentEndOffset) {

Review Comment:
   nit: these method signatures are all really long, can you break up the 
parameters? This is the format we use:
   ```suggestion
   public void onUpdateStart(final TopicPartition topicPartition, 
 final String storeName, 
 final long startingOffset, 
 final long currentEndOffset) {
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -70,6 +71,7 @@
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 

Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]

2023-11-13 Thread via GitHub


cmccabe merged PR #14682:
URL: https://github.com/apache/kafka/pull/14682





[PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]

2023-11-13 Thread via GitHub


jolshan opened a new pull request, #14753:
URL: https://github.com/apache/kafka/pull/14753

   In 91fa196930ece7342f38a5404…dfc09cf607916eb, I accidentally removed the 
action queue parameter that was added in 
7d147cf2413e5d361422728e5c9306574658c78d
   
   I don't think this broke anything since we don't use verification for group 
coordinator commits, but I should fix it to be as it was before.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-13 Thread via GitHub


kirktrue commented on PR #14752:
URL: https://github.com/apache/kafka/pull/14752#issuecomment-1809303495

   @philipnee Would you tag this with `ctr` and review?
   
   This is a low priority clean up.





Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


ableegoldman commented on PR #14735:
URL: https://github.com/apache/kafka/pull/14735#issuecomment-1809299774

   I'll take a look! In the meantime, can you format the PR title with the 
ticket number? I thought we had a guide on this somewhere but I can't find it 
mentioned anywhere. Anyways, it should be of the form `KAFKA-12345: title of 
the PR` ideally with the KIP number included in the "title of the PR" after the 
JIRA ticket number.
   
   (I'm not just giving you a hard time for no reason, there are a bunch of 
integrations and tools that rely on/assume this formatting to do things like 
automatically link PRs to JIRA tickets)
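
   A minimal sketch of how a tool might check the title form described above. 
The class `PrTitleCheck` and the exact regular expression are assumptions made 
for this example, inferred only from the quoted `KAFKA-12345: title of the PR` 
form; they are not taken from any real Kafka tooling.

```java
import java.util.regex.Pattern;

// Hypothetical helper; the pattern is an assumption, not an official rule.
final class PrTitleCheck {
    // "KAFKA-" followed by digits, a colon, a space, then a non-empty title.
    private static final Pattern TITLE = Pattern.compile("^KAFKA-\\d+: \\S.*$");

    static boolean looksLikeJiraTitle(final String title) {
        return TITLE.matcher(title).matches();
    }
}
```

   With this pattern, `KAFKA-12345: title of the PR` is accepted and a bare 
`KIP-988` is rejected. Titles that start with `MINOR:` would need their own 
rule, which this sketch does not attempt.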





[PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-13 Thread via GitHub


kirktrue opened a new pull request, #14752:
URL: https://github.com/apache/kafka/pull/14752

   Added comments, made package-visible, and removed inner interface and 
classes.





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-13 Thread via GitHub


kirktrue closed pull request #14670: KAFKA-15277: Design & implement support 
for internal Consumer delegates
URL: https://github.com/apache/kafka/pull/14670





Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-13 Thread via GitHub


jeqo commented on code in PR #14727:
URL: https://github.com/apache/kafka/pull/14727#discussion_r1391798949


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -988,30 +997,33 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
 return;
 }
 RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
-if (segmentsToDelete.contains(metadata)) {
-continue;
-}
-// When the log-start-offset is moved by the user, the 
leader-epoch-checkpoint file gets truncated
-// as per the log-start-offset. Until the 
rlm-cleaner-thread runs in the next iteration, those
-// remote log segments won't be removed. The 
`isRemoteSegmentWithinLeaderEpoch` validates whether
-// the epochs present in the segment lies in the 
checkpoint file. It will always return false
-// since the checkpoint file was already truncated.
-boolean shouldDeleteSegment = 
remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
+
+if 
(SEGMENT_DELETION_VALID_STATES.contains(metadata.state())) {

Review Comment:
   Agree, applying this fix. 






Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-13 Thread via GitHub


jeqo commented on code in PR #14727:
URL: https://github.com/apache/kafka/pull/14727#discussion_r1391797997


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.class);
 private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = 
"remote-log-reader";
+private static final Set 
SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of(

Review Comment:
   Great catch. I guess the transition hasn't considered the current 
implementation where copying and deletion are sequential. If the implementation 
changes, it may be possible to have this scenario.
   I'd say it should be considered a valid transition. I will include it in the 
valid states and adapt the test case.
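
   For readers unfamiliar with the pattern in the quoted diff, here is a small 
sketch of an unmodifiable `EnumSet` used as a "valid states" check. The enum 
`SegmentState`, the class `DeletionStates`, and above all the membership of the 
set are assumptions chosen to make the example concrete; the real state type 
and the real set are whatever the PR ends up defining.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Stand-in enum for illustration only; not the type used in the PR.
enum SegmentState { COPY_STARTED, COPY_FINISHED, DELETE_STARTED, DELETE_FINISHED }

final class DeletionStates {
    // Assumed membership, picked only so the example has something to check.
    static final Set<SegmentState> VALID_FOR_DELETION = Collections.unmodifiableSet(
        EnumSet.of(SegmentState.COPY_FINISHED, SegmentState.DELETE_STARTED));

    // A segment is considered for deletion only if its state is in the set.
    static boolean canDelete(final SegmentState state) {
        return VALID_FOR_DELETION.contains(state);
    }
}
```

   Adding another state to the valid set is then a one-line change to the 
`EnumSet.of(...)` call, and callers cannot mutate the set by accident.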






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391783416


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -277,6 +279,24 @@ class BrokerServer(
 time
   )
 
+  if (config.logDirs.size > 1) {

Review Comment:
   Should we log something here, to indicate that JBOD mode is on?






[PR] KAFKA-15819: Fix leaked KafkaRaftManager in ZK mode during migration [kafka]

2023-11-13 Thread via GitHub


gharris1727 opened a new pull request, #14751:
URL: https://github.com/apache/kafka/pull/14751

   The other call sites for KafkaRaftManager (SharedServer, TestRaftServer, 
MetadataShell) appear to shut down the KafkaRaftManager when shutting down 
themselves. The call-site in ZK-mode KafkaServer should behave the same way.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391782832


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);
+}
+}
+
+

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391781726


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);
+
+private static final long MAX_BACKOFF_INTERVAL_MS = 
TimeUnit.SECONDS.toNanos(10);
+
+private final Time time;
+private final NodeToControllerChannelManager channelManager;
+private final int brokerId;
+private final Supplier brokerEpochSupplier;
+private final KafkaEventQueue eventQueue;
+
+// These variables should only be mutated from the event loop thread
+private Map inflight = null;
+private Map pending = new HashMap<>();
+private final ExponentialBackoff resendExponentialBackoff =
+new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+private int failedAttempts = 0;
+
+public AssignmentsManager(Time time,
+  NodeToControllerChannelManager channelManager,
+  int brokerId,
+  Supplier brokerEpochSupplier) {
+this.time = time;
+this.channelManager = channelManager;
+this.brokerId = brokerId;
+this.brokerEpochSupplier = brokerEpochSupplier;
+this.eventQueue = new KafkaEventQueue(time,
+new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+"broker-" + brokerId + "-directory-assignments-manager-");
+}
+
+public void close() throws InterruptedException {
+eventQueue.close();
+channelManager.shutdown();
+}
+
+public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+eventQueue.append(new AssignmentEvent(time.nanoseconds(), 
topicPartition, dirId));
+}
+
+// only for testing
+void wakeup() {
+eventQueue.wakeup();
+}
+
+/**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ */
+private abstract static class Event implements EventQueue.Event {
+/**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ */
+@Override
+public void handleException(Throwable e) {
+throw new RuntimeException(e);

Review Comment:
   

Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391780510


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);

Review Comment:
   Also a lot of CPU clocks don't go down to an individual nanosecond. I don't 
know the exact details (varies a lot by platform) but basically you probably 
can't schedule an actual 1 ns delay.
   
   Something like 1/4 of a ms would be more reasonable? Though it is still likely to 
be padded out to a longer length by the actual platform.
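
   For reference, a small sketch of how `java.util.concurrent.TimeUnit` 
conversions work out in nanoseconds. The class and method names are invented 
for this example. `TimeUnit.SECONDS.toNanos(1)` is one second expressed in 
nanoseconds, and a quarter of a millisecond is 250 microseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; only the TimeUnit calls are real JDK API.
final class IntervalUnits {
    // One second expressed in nanoseconds.
    static long oneSecondInNanos() {
        return TimeUnit.SECONDS.toNanos(1);
    }

    // A quarter of a millisecond (250 microseconds) expressed in nanoseconds.
    static long quarterMillisInNanos() {
        return TimeUnit.MICROSECONDS.toNanos(250);
    }
}
```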






[jira] [Created] (KAFKA-15819) KafkaServer leaks KafkaRaftManager when ZK migration enabled

2023-11-13 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15819:
---

 Summary: KafkaServer leaks KafkaRaftManager when ZK migration 
enabled
 Key: KAFKA-15819
 URL: https://issues.apache.org/jira/browse/KAFKA-15819
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.6.0
Reporter: Greg Harris
Assignee: Greg Harris


In SharedServer, TestRaftServer, and MetadataShell, the KafkaRaftManager is 
maintained as an instance variable, and shut down when the outer instance is 
shut down. However, in the KafkaServer, the KafkaRaftManager is instantiated and 
started, but then the reference is lost.

[https://github.com/apache/kafka/blob/49d3122d425171b6a59a2b6f02d3fe63d3ac2397/core/src/main/scala/kafka/server/KafkaServer.scala#L416-L442]

Instead, the KafkaServer should behave like the other call-sites of 
KafkaRaftManager, and shut down the KafkaRaftManager during shutdown.
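
A minimal sketch of the lifecycle pattern described above, assuming nothing 
about the real classes: `RaftManagerLike` and `ServerLike` are hypothetical 
stand-ins, not Kafka code. The point is only that the manager is kept in a 
field so that the owner can stop it later.

```java
// Hypothetical stand-in for a component that must be stopped explicitly.
final class RaftManagerLike {
    private boolean running;

    void startup() { running = true; }
    void shutdown() { running = false; }
    boolean isRunning() { return running; }
}

// Hypothetical stand-in for the owning server.
final class ServerLike {
    // Keeping the reference in a field is what makes shutdown possible later.
    private RaftManagerLike raftManager;

    void startup() {
        raftManager = new RaftManagerLike();
        raftManager.startup();
    }

    void shutdown() {
        // Stop the manager if startup got far enough to create it.
        if (raftManager != null) {
            raftManager.shutdown();
        }
    }

    // Exposed only so the example can observe the manager's state.
    RaftManagerLike raftManager() { return raftManager; }
}
```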



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391779074


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(AssignmentsManager.class);
+
+/**
+ * Assignments are dispatched to the controller this long after
+ * being submitted to {@link AssignmentsManager}, if there
+ * is no request in flight already.
+ * The interval is reset when a new assignment is submitted.
+ */
+private static final long DISPATCH_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(1);

Review Comment:
   I don't think 1 nanosecond is going to give you much batching!
   
   At 4 GHz, 1 nanosecond is like 4 CPU cycles
   
   Obviously you have pipelining, instruction reordering, yadda yadda, but you 
get the idea: this is NOT a reasonable amount of time to get anything done in.
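
   The cycle estimate above is just clock frequency multiplied by duration. A 
small sketch of that arithmetic follows; the class `CycleEstimate` is invented 
for the example, and it ignores everything a real CPU does beyond the nominal 
clock rate.

```java
// Hypothetical helper illustrating cycles = frequency * duration.
final class CycleEstimate {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    // clockHz is cycles per second; durationNanos is converted to seconds by
    // dividing by 1e9 after the multiplication, to stay in integer arithmetic.
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    static long cycles(final long clockHz, final long durationNanos) {
        return Math.multiplyExact(clockHz, durationNanos) / NANOS_PER_SECOND;
    }
}
```

   For a 4 GHz clock, `cycles(4_000_000_000L, 1)` gives 4, matching the figure 
in the comment.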



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15818) Implement max poll internval

2023-11-13 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15818:
---
Description: 
The consumer needs to be polled at intervals shorter than MAX_POLL_INTERVAL_MAX; 
otherwise the consumer should try to leave the group.  Currently, we send an 
acknowledgment event to the network thread per poll.  The event only triggers an 
update of the autocommit state; we need to implement updating the poll timer so 
that the consumer can leave the group when the timer expires. 

 

The current logic looks like this:
{code:java}
if (heartbeat.pollTimeoutExpired(now)) {
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll().
    log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
        "was longer than the configured max.poll.interval.ms, which typically implies that " +
        "the poll loop is spending too much time processing messages. You can address this " +
        "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
        "returned in poll() with max.poll.records.");

    maybeLeaveGroup("consumer poll timeout has expired.");
}
{code}

  was:
In the network thread, we need a timer configured to take MAX_POLL_INTERVAL_MAX. 
 The reason is that if the user doesn't poll the consumer within the interval, the 
member needs to leave the group.

 

Currently, we send an acknowledgement event to the network thread per poll.  It 
needs to do two things: 1. update autocommit state 2. update max poll interval 
timer 

 

The current logic looks like this:
{code:java}
if (heartbeat.pollTimeoutExpired(now)) {
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll().
    log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
        "was longer than the configured max.poll.interval.ms, which typically implies that " +
        "the poll loop is spending too much time processing messages. You can address this " +
        "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
        "returned in poll() with max.poll.records.");

    maybeLeaveGroup("consumer poll timeout has expired.");
}
{code}


> Implement max poll internval
> 
>
> Key: KAFKA-15818
> URL: https://issues.apache.org/jira/browse/KAFKA-15818
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Priority: Blocker
>
> The consumer needs to be polled at intervals shorter than 
> MAX_POLL_INTERVAL_MAX; otherwise the consumer should try to leave the group.  
> Currently, we send an acknowledgment event to the network thread per poll.  
> The event only triggers an update of the autocommit state; we need to implement 
> updating the poll timer so that the consumer can leave the group when the 
> timer expires. 
>  
> The current logic looks like this:
> {code:java}
> if (heartbeat.pollTimeoutExpired(now)) {
>     // the poll timeout has expired, which means that the foreground thread has stalled
>     // in between calls to poll().
>     log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
>         "was longer than the configured max.poll.interval.ms, which typically implies that " +
>         "the poll loop is spending too much time processing messages. You can address this " +
>         "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
>         "returned in poll() with max.poll.records.");
>     maybeLeaveGroup("consumer poll timeout has expired.");
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809242988

   I don't know if we want to do this in this PR, but one thing I would suggest 
for ReplicaManager is to use the `FaultHandler` paradigm we have in the 
`QuorumController` code.
   
   Specifically, when `QuorumController` hits an unrecoverable condition it 
will invoke the `FaultHandler`. In normal operation this maps to logging an 
error message and exiting. In unit tests, it maps to an exception, and also 
setting a flag that will cause any integration test to always fail.
   
   This accomplishes two things:
   1. avoids calling `exit(1)` in junit tests, which will kill Jenkins dead 
(even after 3 decades of Java, we don't have the technology to intercept 
`exit()` in unit tests >:( )
   2. allows us to always know if something is going wrong in the unit / 
integration test.
   
   There can also be non-fatal fault handlers, which tend to make point 2 even 
more important (since many times throwing an exception or logging an ERROR will 
not prevent the test from succeeding!)
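
To make the paradigm described above concrete, here is a minimal sketch of the idea. It is not the actual `FaultHandler` code from the `QuorumController`; every name in it (`SketchFaultHandler`, `ExitingFaultHandler`, `RecordingFaultHandler`) is hypothetical and chosen only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the fault-handler idea; not Kafka's real interface.
interface SketchFaultHandler {
    // Handles the fault and returns an exception the caller may throw.
    RuntimeException handleFault(String message, Throwable cause);
}

// Production flavour (assumed behaviour): log an error, then terminate the JVM.
final class ExitingFaultHandler implements SketchFaultHandler {
    @Override
    public RuntimeException handleFault(String message, Throwable cause) {
        System.err.println("FATAL: " + message);
        Runtime.getRuntime().halt(1);
        return null; // never reached once the JVM halts
    }
}

// Test flavour: never exits; remembers the first fault so a test can fail on it later.
final class RecordingFaultHandler implements SketchFaultHandler {
    private final AtomicReference<RuntimeException> firstFault = new AtomicReference<>();

    @Override
    public RuntimeException handleFault(String message, Throwable cause) {
        RuntimeException e = new RuntimeException(message, cause);
        firstFault.compareAndSet(null, e); // keep only the first fault seen
        return e;
    }

    RuntimeException firstFault() {
        return firstFault.get();
    }

    // Intended to be called at the end of a test so that a swallowed fault still fails it.
    void maybeRethrowFirstFault() {
        RuntimeException e = firstFault.get();
        if (e != null) {
            throw e;
        }
    }
}
```

The recorded first fault plays the role of the "flag" mentioned above: a test harness that calls `maybeRethrowFirstFault()` during teardown fails even when the code under test caught or merely logged the exception.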





[jira] [Updated] (KAFKA-15818) Implement max poll internval

2023-11-13 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15818:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Task)

> Implement max poll internval
> 
>
> Key: KAFKA-15818
> URL: https://issues.apache.org/jira/browse/KAFKA-15818
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Priority: Blocker
>
> In the network thread, we need a timer configured to take 
> MAX_POLL_INTERVAL_MAX.  The reason is that if the user doesn't poll the consumer 
> within the interval, the member needs to leave the group.
>  
> Currently, we send an acknowledgement event to the network thread per poll.  
> It needs to do two things: 1. update autocommit state 2. update max poll 
> interval timer 
>  
> The current logic looks like this:
> {code:java}
> if (heartbeat.pollTimeoutExpired(now)) {
>     // the poll timeout has expired, which means that the foreground thread has stalled
>     // in between calls to poll().
>     log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
>         "was longer than the configured max.poll.interval.ms, which typically implies that " +
>         "the poll loop is spending too much time processing messages. You can address this " +
>         "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
>         "returned in poll() with max.poll.records.");
>     maybeLeaveGroup("consumer poll timeout has expired.");
> }
> {code}





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1809239029

   I don't have a good place to put this comment (github only lets me comment 
on changed lines) but there is a problem with this code:
   
   ```
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
     override def doWork(): Unit = {
       val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
       if (haltBrokerOnDirFailure) {
         fatal(s"Halting broker because dir $newOfflineLogDir is offline")
         Exit.halt(1)
       }
       handleLogDirFailure(newOfflineLogDir)
     }
   }
   ```
   
   If the directory that failed is the metadata directory, we need to exit 
unconditionally. This is because we have not implemented any way of failing 
over to a different directory for metadata.
   
   I suppose we should have a post-3.7 follow-up JIRA for this.
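
As a rough sketch of the rule described above (halt unconditionally when the failed directory is the metadata directory), the decision could be isolated as below. This is an assumption about one possible shape, written in Java rather than Scala; `LogDirFailurePolicy`, `shouldHalt` and the plain string comparison of paths are all hypothetical.

```java
// Hypothetical helper isolating the halt decision; not taken from ReplicaManager.
final class LogDirFailurePolicy {
    private LogDirFailurePolicy() {
    }

    // Returns true when the broker should halt instead of continuing without the failed dir.
    // A failed metadata directory always halts, because there is no other directory to fail over to.
    static boolean shouldHalt(String failedDir, String metadataLogDir, boolean haltBrokerOnDirFailure) {
        boolean metadataDirFailed = failedDir.equals(metadataLogDir);
        return haltBrokerOnDirFailure || metadataDirFailed;
    }
}
```

In `doWork()` the `if (haltBrokerOnDirFailure)` check would then become a check on this combined condition; how the metadata directory path is obtained and normalized is left open here.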





[jira] [Created] (KAFKA-15818) Implement max poll internval

2023-11-13 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15818:
--

 Summary: Implement max poll internval
 Key: KAFKA-15818
 URL: https://issues.apache.org/jira/browse/KAFKA-15818
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee


In the network thread, we need a timer configured to take MAX_POLL_INTERVAL_MAX. 
 The reason is that if the user doesn't poll the consumer within the interval, the 
member needs to leave the group.

 

Currently, we send an acknowledgement event to the network thread per poll.  It 
needs to do two things: 1. update autocommit state 2. update max poll interval 
timer 

 

The current logic looks like this:
{code:java}
if (heartbeat.pollTimeoutExpired(now)) {
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll().
    log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
        "was longer than the configured max.poll.interval.ms, which typically implies that " +
        "the poll loop is spending too much time processing messages. You can address this " +
        "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
        "returned in poll() with max.poll.records.");

    maybeLeaveGroup("consumer poll timeout has expired.");
}
{code}
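
The timer described above could be sketched roughly as follows: each poll acknowledgement resets a deadline, and the network thread checks whether the deadline has passed. This is only an illustration under stated assumptions; `PollTimerSketch` and its methods are hypothetical names, and time is passed in explicitly as milliseconds so the behaviour is easy to test.

```java
// Hypothetical poll timer; not the consumer's actual implementation.
final class PollTimerSketch {
    private final long maxPollIntervalMs;
    private long deadlineMs;

    PollTimerSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    // Called when the network thread receives the per-poll acknowledgement event.
    void onPoll(long nowMs) {
        deadlineMs = nowMs + maxPollIntervalMs;
    }

    // True once the application has gone a full interval without polling.
    boolean isExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}
```

When `isExpired(now)` returns true, the network thread would take the same path as the existing logic quoted above: log the warning and try to leave the group.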





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391771537


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2323,15 +2326,56 @@ class ReplicaManager(val config: KafkaConfig,
 }
 logManager.handleLogDirFailure(dir)
 
-if (sendZkNotification)
+if (notifyController) {
+  if (config.migrationEnabled) {
+fatal(s"Shutdown broker because some log directory has failed during migration mode: $dir")
+Exit.halt(1)

Review Comment:
   This seems wrong as our long-term solution, but I guess it's OK for now. (We 
can discuss more later I guess)






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391770284


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -269,7 +270,9 @@ class ReplicaManager(val config: KafkaConfig,
  delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
  threadNamePrefix: Option[String] = None,
  val brokerEpochSupplier: () => Long = () => -1,
- addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None
+ addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
+ assignmentManager: Option[AssignmentsManager] = None,

Review Comment:
   I know this is a common pattern in `ReplicaManager` but it just seems so 
bad. We don't really care about the details of `AssignmentsManager` or 
`BrokerLifecycleManager` here. Shouldn't we just be passing a reference to an 
interface like `AssignmentHandler`?
   
   ```
   interface AssignmentHandler {
     void onAssignment(TopicIdPartition topicIdPartition, Uuid directoryId);
     void propagateDirectoryFailure(Uuid directoryId);
   }
   ```
   
   etc.
   
   Then we can initialize a dummy version by default, to keep all the unit 
tests working without changes (if desired).
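
A small self-contained sketch of that suggestion follows, including the dummy default. To keep it runnable without Kafka on the classpath it uses stand-in types (`String` for the topic partition, `java.util.UUID` for the directory ID); `AssignmentHandlerSketch`, `NO_OP` and `ReplicaManagerSketch` are hypothetical names, not code from the PR.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical narrow interface; String and UUID stand in for TopicIdPartition and Uuid.
interface AssignmentHandlerSketch {
    void onAssignment(String topicIdPartition, UUID directoryId);

    void propagateDirectoryFailure(UUID directoryId);

    // Dummy implementation used when the caller supplies nothing, e.g. in existing unit tests.
    AssignmentHandlerSketch NO_OP = new AssignmentHandlerSketch() {
        @Override
        public void onAssignment(String topicIdPartition, UUID directoryId) {
            // intentionally does nothing
        }

        @Override
        public void propagateDirectoryFailure(UUID directoryId) {
            // intentionally does nothing
        }
    };
}

// Stand-in for the class that receives the handler instead of two concrete managers.
final class ReplicaManagerSketch {
    private final AssignmentHandlerSketch handler;

    ReplicaManagerSketch() {
        this(AssignmentHandlerSketch.NO_OP);
    }

    ReplicaManagerSketch(AssignmentHandlerSketch handler) {
        this.handler = Objects.requireNonNull(handler);
    }

    void handleLogDirFailure(UUID directoryId) {
        handler.propagateDirectoryFailure(directoryId);
    }
}
```

The no-argument constructor is what keeps existing tests compiling unchanged, while production wiring passes a real handler.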






Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


cmccabe commented on code in PR #14369:
URL: https://github.com/apache/kafka/pull/14369#discussion_r1391763417


##
clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java:
##
@@ -27,6 +27,8 @@
 
 public class AssignReplicasToDirsRequest extends AbstractRequest {
 
+public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

Review Comment:
   Can you add JavaDoc about this? Including the part where we want to keep it 
under 64kb.
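
For illustration, one way a sender could respect such a cap is to split pending assignments into batches of at most `MAX_ASSIGNMENTS_PER_REQUEST` entries. The sketch below is an assumption, not the PR's logic; `AssignmentBatcher` and `partition` are hypothetical names, and it does not claim how the value 2250 was derived.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batching helper; only the constant's value comes from the diff above.
final class AssignmentBatcher {
    static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;

    private AssignmentBatcher() {
    }

    // Splits pending items into consecutive batches of at most maxPerRequest elements.
    static <T> List<List<T>> partition(List<T> pending, int maxPerRequest) {
        if (maxPerRequest <= 0) {
            throw new IllegalArgumentException("maxPerRequest must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < pending.size(); start += maxPerRequest) {
            int end = Math.min(start + maxPerRequest, pending.size());
            batches.add(new ArrayList<>(pending.subList(start, end)));
        }
        return batches;
    }
}
```

As plain arithmetic, 64 KiB divided by 2250 leaves a budget of roughly 29 bytes per assignment (65536 / 2250 ≈ 29.1), which is the kind of reasoning the requested JavaDoc could spell out.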






Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


AndrewJSchofield commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391738283


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if a bad state transition is detected, an
+ * {@link 

[jira] [Created] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available

2023-11-13 Thread Bob Barrett (Jira)
Bob Barrett created KAFKA-15817:
---

 Summary: Avoid reconnecting to the same IP address if multiple 
addresses are available
 Key: KAFKA-15817
 URL: https://issues.apache.org/jira/browse/KAFKA-15817
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.3.2
Reporter: Bob Barrett


In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS 
resolution behavior for clients to re-resolve DNS after disconnecting from a 
broker, rather than wait until we iterated over all addresses from a given 
resolution. This is useful when the IP addresses have changed between the 
connection and disconnection.

However, with the behavior change, this does mean that clients could 
potentially reconnect immediately to the same IP they just disconnected from, 
if the IPs have not changed. In cases where the disconnection happened because 
that IP was unhealthy (such as a case where a load balancer has instances in 
multiple availability zones and one zone is unhealthy, or a case where an 
intermediate component in the network path is going through a rolling restart), 
this will delay the client successfully reconnecting. To address this, clients 
should remember the IP they just disconnected from and skip that IP when 
reconnecting, as long as the address resolved to multiple addresses.
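
The proposed selection rule can be sketched as below. This is a minimal illustration, not the client's actual connection code: `AddressPicker` and `pick` are hypothetical names, and the method is generic so that any address type (for example `InetAddress`) can be used.

```java
import java.util.List;

// Hypothetical helper illustrating "skip the address we just disconnected from".
final class AddressPicker {
    private AddressPicker() {
    }

    // Picks the first resolved address that differs from the one just disconnected from.
    // Falls back to the first address when there is only one choice or nothing to avoid.
    static <A> A pick(List<A> resolved, A lastDisconnected) {
        if (resolved.isEmpty()) {
            throw new IllegalArgumentException("no resolved addresses");
        }
        if (resolved.size() > 1 && lastDisconnected != null) {
            for (A candidate : resolved) {
                if (!candidate.equals(lastDisconnected)) {
                    return candidate;
                }
            }
        }
        return resolved.get(0);
    }
}
```

With a single resolved address the helper still returns it, which matches the condition that skipping applies only when the name resolved to multiple addresses.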





[jira] [Assigned] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available

2023-11-13 Thread Bob Barrett (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett reassigned KAFKA-15817:
---

Assignee: Bob Barrett

> Avoid reconnecting to the same IP address if multiple addresses are available
> -
>
> Key: KAFKA-15817
> URL: https://issues.apache.org/jira/browse/KAFKA-15817
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS 
> resolution behavior for clients to re-resolve DNS after disconnecting from a 
> broker, rather than wait until we iterated over all addresses from a given 
> resolution. This is useful when the IP addresses have changed between the 
> connection and disconnection.
> However, with the behavior change, this does mean that clients could 
> potentially reconnect immediately to the same IP they just disconnected from, 
> if the IPs have not changed. In cases where the disconnection happened 
> because that IP was unhealthy (such as a case where a load balancer has 
> instances in multiple availability zones and one zone is unhealthy, or a case 
> where an intermediate component in the network path is going through a 
> rolling restart), this will delay the client successfully reconnecting. To 
> address this, clients should remember the IP they just disconnected from and 
> skip that IP when reconnecting, as long as the address resolved to multiple 
> addresses.





Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub


philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809158196

   Another point I want to make here is that the wakeup call also wakes up the 
blocking client. I wonder if we also need to do that to the network thread - 
@kirktrue 





Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub


philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809152957

   Hi @cadonna - Thank you for putting time into this PR.  Based on my 
understanding, this PR does 2 things: if wakeup() is invoked before calling 
poll(), the consumer will return immediately.  If wakeup() is invoked during 
poll(), we should get a WakeupException and return.  Overall I think it looks 
right.  
   
   *while writing this I think @kirktrue has asked the questions I wanted to 
ask.





Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1809112163

   > Hi @apoorvmittal10 - I left some questions while making the first pass of 
the PR. They are all aesthetics. Will follow up with more questions. Thanks!
   
   Thanks @philipnee, I have addressed the comments. Can you please re-review?





Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391701379


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391700892


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate<? super MetricKeyable> SELECTOR_NO_METRICS = k -> false;
+
+public final static Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
+
+/**
+ * Examine the response data and handle each error code accordingly:
+ *
+ * <ul>
+ * <li>Invalid Request: Disable Telemetry</li>
+ * <li>Invalid Record: Disable Telemetry</li>
+ * <li>UnknownSubscription or Unsupported Compression: Retry immediately</li>
+ * <li>TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval</li>
+ * </ul>
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional<Integer> maybeFetchErrorIntervalMs(short errorCode, int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {
+case INVALID_REQUEST:
+case INVALID_RECORD: {
+pushIntervalMs = Integer.MAX_VALUE;
+reason = "The broker response indicates the client sent a request that cannot be resolved"
++ " by re-trying, hence disable telemetry";
+break;
+}
+case UNKNOWN_SUBSCRIPTION_ID:
+case UNSUPPORTED_COMPRESSION_TYPE: {
+pushIntervalMs = 0;
+reason = Errors.forCode(errorCode).message();
+break;
+}
+case TELEMETRY_TOO_LARGE:
+case THROTTLING_QUOTA_EXCEEDED: {
+reason = Errors.forCode(errorCode).message();
+pushIntervalMs = (intervalMs != -1) ? intervalMs : DEFAULT_PUSH_INTERVAL_MS;
+break;
+}
+default: {
+reason = "Unmapped error code";
+log.error("Error code: {}, reason: {}. Unmapped error for telemetry, disable telemetry.",

Review Comment:
   Thanks for pointing this out. I have tried to improve this; please check.
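
For readers following the thread, the mapping in the quoted hunk can be sketched in isolation. This is a minimal sketch, not the PR's code: `ErrorIntervalSketch`, the `TelemetryError` enum (a stand-in for Kafka's `Errors`), and the value chosen for `DEFAULT_PUSH_INTERVAL_MS` are assumptions made for illustration.

```java
import java.util.Optional;

final class ErrorIntervalSketch {

    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
    enum TelemetryError {
        NONE, INVALID_REQUEST, INVALID_RECORD, UNKNOWN_SUBSCRIPTION_ID,
        UNSUPPORTED_COMPRESSION_TYPE, TELEMETRY_TOO_LARGE, THROTTLING_QUOTA_EXCEEDED, OTHER
    }

    // Assumed value; the real constant lives in ClientTelemetryReporter.
    static final int DEFAULT_PUSH_INTERVAL_MS = 300_000;

    /** Returns empty on success, otherwise the delay before the next attempt. */
    static Optional<Integer> errorIntervalMs(TelemetryError error, int intervalMs) {
        switch (error) {
            case NONE:
                return Optional.empty();
            case INVALID_REQUEST:
            case INVALID_RECORD:
                // Retrying cannot help, so telemetry is effectively disabled.
                return Optional.of(Integer.MAX_VALUE);
            case UNKNOWN_SUBSCRIPTION_ID:
            case UNSUPPORTED_COMPRESSION_TYPE:
                // Retry immediately.
                return Optional.of(0);
            case TELEMETRY_TOO_LARGE:
            case THROTTLING_QUOTA_EXCEEDED:
                // Retry at the next interval, falling back to the default when unset (-1).
                return Optional.of(intervalMs != -1 ? intervalMs : DEFAULT_PUSH_INTERVAL_MS);
            default:
                // Unmapped error codes also disable telemetry.
                return Optional.of(Integer.MAX_VALUE);
        }
    }

    public static void main(String[] args) {
        System.out.println(errorIntervalMs(TelemetryError.THROTTLING_QUOTA_EXCEEDED, -1));
        System.out.println(errorIntervalMs(TelemetryError.OTHER, 1_000));
    }
}
```

Returning from each case directly keeps the default branch from falling through to the shared "retry automatically" debug message, which is one way to avoid logging a retry for an error that disables telemetry.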



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391699676


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate<? super MetricKeyable> SELECTOR_NO_METRICS = k -> false;
+
+public final static Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
+
+/**
+ * Examine the response data and handle each error code accordingly:
+ *
+ * <ul>
+ * <li>Invalid Request: Disable Telemetry</li>
+ * <li>Invalid Record: Disable Telemetry</li>
+ * <li>UnknownSubscription or Unsupported Compression: Retry immediately</li>
+ * <li>TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval</li>
+ * </ul>
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional<Integer> maybeFetchErrorIntervalMs(short errorCode, int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {

Review Comment:
   Makes sense, done.






Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391699120


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely 

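The life-cycle described in the quoted Javadoc can be pictured as a small transition table. The sketch below is illustrative only: the Javadoc names just `SUBSCRIPTION_NEEDED`, so the other states, the allowed transitions, and the class name `TelemetryStateSketch` are assumptions, not the contents of `ClientTelemetryState`.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class TelemetryStateSketch {

    // Only SUBSCRIPTION_NEEDED appears in the quoted Javadoc; the rest are assumed for illustration.
    enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.SUBSCRIPTION_NEEDED, EnumSet.of(State.SUBSCRIPTION_IN_PROGRESS));
        ALLOWED.put(State.SUBSCRIPTION_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED));
        ALLOWED.put(State.PUSH_NEEDED, EnumSet.of(State.PUSH_IN_PROGRESS, State.SUBSCRIPTION_NEEDED));
        // A failed push sends the reporter back to SUBSCRIPTION_NEEDED to re-fetch the subscription.
        ALLOWED.put(State.PUSH_IN_PROGRESS, EnumSet.of(State.PUSH_NEEDED, State.SUBSCRIPTION_NEEDED));
    }

    private State state = State.SUBSCRIPTION_NEEDED;

    State state() {
        return state;
    }

    /** Applies the transition if it is allowed; returns false and leaves the state unchanged otherwise. */
    boolean transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            return false;
        }
        state = next;
        return true;
    }

    public static void main(String[] args) {
        TelemetryStateSketch reporter = new TelemetryStateSketch();
        reporter.transitionTo(State.SUBSCRIPTION_IN_PROGRESS);
        reporter.transitionTo(State.PUSH_NEEDED);
        reporter.transitionTo(State.PUSH_IN_PROGRESS);
        // Simulate a failed push: fall back to re-fetching the subscription.
        reporter.transitionTo(State.SUBSCRIPTION_NEEDED);
        System.out.println(reporter.state());
    }
}
```
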
Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391698780


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391698419


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1391698213


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate<? super MetricKeyable> SELECTOR_NO_METRICS = k -> false;
+
+public final static Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
+
+/**
+ * Examine the response data and handle each error code accordingly:
+ *
+ * <ul>
+ * <li>Invalid Request: Disable Telemetry</li>
+ * <li>Invalid Record: Disable Telemetry</li>
+ * <li>UnknownSubscription or Unsupported Compression: Retry immediately</li>
+ * <li>TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval</li>
+ * </ul>
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional<Integer> maybeFetchErrorIntervalMs(short errorCode, int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {
+case INVALID_REQUEST:
+case INVALID_RECORD: {
+pushIntervalMs = Integer.MAX_VALUE;
+reason = "The broker response indicates the client sent a request that cannot be resolved"
++ " by re-trying, hence disable telemetry";
+break;
+}
+case UNKNOWN_SUBSCRIPTION_ID:
+case UNSUPPORTED_COMPRESSION_TYPE: {
+pushIntervalMs = 0;
+reason = Errors.forCode(errorCode).message();
+break;
+}
+case TELEMETRY_TOO_LARGE:
+case THROTTLING_QUOTA_EXCEEDED: {
+reason = Errors.forCode(errorCode).message();
+pushIntervalMs = (intervalMs != -1) ? intervalMs : DEFAULT_PUSH_INTERVAL_MS;
+break;
+}
+default: {
+reason = "Unmapped error code";
+log.error("Error code: {}, reason: {}. Unmapped error for telemetry, disable telemetry.",
+errorCode, Errors.forCode(errorCode).message());
+pushIntervalMs = Integer.MAX_VALUE;
+}
+}
+
+log.debug("Error code: {}, reason: {}. Retry automatically in {} ms.", errorCode, reason, pushIntervalMs);
+return Optional.of(pushIntervalMs);
+}
+
+public static Predicate<? super MetricKeyable> getSelectorFromRequestedMetrics(List<String> requestedMetrics) {
+if (requestedMetrics == null || requestedMetrics.isEmpty()) {
+log.debug("Telemetry subscription has specified no metric names; telemetry will record no metrics");
+return SELECTOR_NO_METRICS;
+} else if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).isEmpty()) {
+log.debug("Telemetry subscription has specified a single empty metric name; using all metrics");
+return SELECTOR_ALL_METRICS;
+} else {
+log.debug("Telemetry subscription has specified 

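The selector logic in the quoted hunk can be sketched on plain strings. The first two branches follow the quoted code; the third branch is cut off in the quote, so the prefix matching used here is an assumption based on how KIP-714 describes requested metrics. `MetricSelectorSketch` and `selectorFor` are hypothetical names, and metric names are modelled as `String` rather than `MetricKeyable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class MetricSelectorSketch {

    static Predicate<String> selectorFor(List<String> requestedMetrics) {
        // No requested names: select nothing.
        if (requestedMetrics == null || requestedMetrics.isEmpty()) {
            return name -> false;
        }
        // A single empty string: select everything.
        if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).isEmpty()) {
            return name -> true;
        }
        // Assumed behaviour for the truncated branch: match any requested prefix.
        List<String> prefixes = new ArrayList<>(requestedMetrics);
        return name -> prefixes.stream().anyMatch(prefix -> prefix != null && name.startsWith(prefix));
    }

    public static void main(String[] args) {
        Predicate<String> selector = selectorFor(Arrays.asList("org.apache.kafka.producer"));
        System.out.println(selector.test("org.apache.kafka.producer.record.send.total"));
        System.out.println(selector.test("org.apache.kafka.consumer.fetch.latency"));
    }
}
```
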
[jira] [Resolved] (KAFKA-15532) ZkWriteBehindLag should not be reported by inactive controllers

2023-11-13 Thread Colin McCabe (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-15532.
--
Resolution: Fixed

> ZkWriteBehindLag should not be reported by inactive controllers
> ---
>
> Key: KAFKA-15532
> URL: https://issues.apache.org/jira/browse/KAFKA-15532
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.6.0
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Minor
>
> Since only the active controller is performing the dual-write to ZK during a 
> migration, it should be the only controller to report the ZkWriteBehindLag 
> metric. 
>  
> Currently, if the controller fails over during a migration, the previous 
> active controller will incorrectly report its last value for ZkWriteBehindLag 
> forever. Instead, it should report zero.
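
The behaviour requested above can be sketched as a gauge that depends on the controller's role. This is a minimal sketch under stated assumptions, not the actual fix: `WriteBehindLagSketch` and its methods are hypothetical names, and the real metric is wired through Kafka's controller metrics classes.

```java
import java.util.concurrent.atomic.AtomicLong;

final class WriteBehindLagSketch {

    private volatile boolean active;
    private final AtomicLong lastLag = new AtomicLong();

    /** Called when this controller gains or loses the active role. */
    void setActive(boolean active) {
        this.active = active;
        if (!active) {
            // Drop the stale value so it cannot resurface after a later re-election.
            lastLag.set(0L);
        }
    }

    /** Records the lag observed while dual-writing to ZK. */
    void recordLag(long lag) {
        lastLag.set(lag);
    }

    /** Value exposed by the gauge: zero unless this controller is active. */
    long reportedLag() {
        return active ? lastLag.get() : 0L;
    }

    public static void main(String[] args) {
        WriteBehindLagSketch gauge = new WriteBehindLagSketch();
        gauge.setActive(true);
        gauge.recordLag(42L);
        System.out.println(gauge.reportedLag());
        gauge.setActive(false); // failover to another controller
        System.out.println(gauge.reportedLag());
    }
}
```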





[jira] [Assigned] (KAFKA-15532) ZkWriteBehindLag should not be reported by inactive controllers

2023-11-13 Thread Colin McCabe (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe reassigned KAFKA-15532:


Assignee: David Arthur

> ZkWriteBehindLag should not be reported by inactive controllers
> ---
>
> Key: KAFKA-15532
> URL: https://issues.apache.org/jira/browse/KAFKA-15532
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.6.0
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Minor
>
> Since only the active controller is performing the dual-write to ZK during a 
> migration, it should be the only controller to report the ZkWriteBehindLag 
> metric. 
>  
> Currently, if the controller fails over during a migration, the previous 
> active controller will incorrectly report its last value for ZkWriteBehindLag 
> forever. Instead, it should report zero.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1809013650

   > @apoorvmittal10 : Thanks for the PR. Made a pass of non-testing files. Left a few comments.
   
   Thanks a lot for the review @junrao . I have addressed the comments and have a question related to throttleTimeMs for errors in the comments. Please re-review if you can.





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391660088


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final List<ClientTelemetryReceiver> RECEIVERS = new ArrayList<>();

Review Comment:
   Done.



##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription update time is used to determine whether the next telemetry request needs
+// to re-evaluate the subscription id following subscription changes.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+// for the respective subscription. In that case, we need to remove the subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update the last subscription update time to the current time to indicate that there is a change
+ in the subscription. This will be used to determine whether the next telemetry request needs
+ to re-evaluate the subscription id following subscription changes.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If the subscription has changed
+ since the last request, then the client instance will be re-evaluated. Validation of the
+ request will be done after the client instance is created. If the client issued a get telemetry
+ request prior to the push interval, then the client should get a throttle error, but if the
+ subscription has changed since the last request then the client should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659830


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391659127


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658586


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391658438


##
core/src/main/java/kafka/metrics/ClientMetricsInstance.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Contains the metrics instance metadata and the state of the client instance.
+ */
+public class ClientMetricsInstance {
+
+private final Uuid clientInstanceId;
+private final ClientMetricsInstanceMetadata instanceMetadata;
+private final int subscriptionId;
+private final long subscriptionUpdateEpoch;
+private final Set metrics;
+private final int pushIntervalMs;
+
+private boolean terminating;
+private long lastGetRequestEpoch;
+private long lastPushRequestEpoch;
+private Errors lastKnownError;

Review Comment:
   I have now marked two of them `volatile`, and the other two are protected 
with `synchronized`. I have also added concurrency tests to validate that, when 
multiple requests arrive for the same client instance, only one is processed 
and the others are rejected with a throttling error.
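
A minimal sketch of the behaviour described above, not the code in this PR: 
`ClientStateSketch`, `tryAcceptPush` and `ConcurrencySketch` are hypothetical 
names, and the interval check is an assumed simplification of the real 
validation. It shows one `volatile` flag, one field guarded by `synchronized`, 
and a small concurrency check in which several threads race for the same 
client state.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for per-client state; not the PR's ClientMetricsInstance.
final class ClientStateSketch {
    private final long pushIntervalMs;
    // Simple flag read and written without a lock, hence volatile.
    private volatile boolean terminating;
    // Guarded by this object's monitor; -1 means no request seen yet.
    private long lastPushRequestEpoch = -1L;

    ClientStateSketch(long pushIntervalMs) {
        this.pushIntervalMs = pushIntervalMs;
    }

    boolean terminating() {
        return terminating;
    }

    void terminating(boolean value) {
        terminating = value;
    }

    // Check-then-update must be atomic, otherwise two threads could both pass the check.
    synchronized boolean tryAcceptPush(long nowMs) {
        if (lastPushRequestEpoch >= 0 && nowMs - lastPushRequestEpoch < pushIntervalMs) {
            return false; // caller would answer with a throttling error
        }
        lastPushRequestEpoch = nowMs;
        return true;
    }
}

final class ConcurrencySketch {
    // Releases all workers at once with the same timestamp and counts acceptances.
    static int countAccepted(int threads) {
        ClientStateSketch state = new ClientStateSketch(5_000L);
        AtomicInteger accepted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                pool.submit(() -> {
                    try {
                        start.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (state.tryAcceptPush(1_000L)) {
                        accepted.incrementAndGet();
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return accepted.get();
    }

    public static void main(String[] args) {
        System.out.println("accepted = " + countAccepted(8));
    }
}
```

Because the check and the update happen under one lock, exactly one of the 
racing callers is accepted in this sketch; the rest see the fresh timestamp 
and are rejected.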



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656533


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;

Review Comment:
   Makes sense, thanks. Done.
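
The field comment in the hunk above says the last subscription update time 
decides whether the next telemetry request must re-evaluate the subscription 
id. A minimal sketch of that idea follows. It is not the change made in this 
PR: `SubscriptionEpochSketch`, `update` and `isStale` are hypothetical names, 
and using `AtomicLong` for the timestamp is an assumption.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not the PR's ClientMetricsManager.
final class SubscriptionEpochSketch {
    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    // Time of the last effective subscription change, safe to read from any thread.
    private final AtomicLong lastUpdateEpoch = new AtomicLong();

    void update(String name, Properties props, long nowMs) {
        if (props.isEmpty()) {
            // Empty configs mean "delete"; only bump the epoch if something was removed.
            if (subscriptions.remove(name) != null) {
                lastUpdateEpoch.set(nowMs);
            }
            return;
        }
        subscriptions.put(name, props);
        lastUpdateEpoch.set(nowMs);
    }

    // A cached client instance built at instanceEpoch is stale if subscriptions changed later.
    boolean isStale(long instanceEpoch) {
        return instanceEpoch < lastUpdateEpoch.get();
    }

    int size() {
        return subscriptions.size();
    }

    public static void main(String[] args) {
        SubscriptionEpochSketch manager = new SubscriptionEpochSketch();
        Properties props = new Properties();
        props.setProperty("interval.ms", "5000"); // illustrative key only
        manager.update("sub-1", props, 100L);
        System.out.println("stale(50) = " + manager.isStale(50L));
        System.out.println("stale(100) = " + manager.isStale(100L));
    }
}
```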






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391656238


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391654569


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,12 +16,48 @@
  */
 package kafka.server;
 
+import java.util.Collections;

Review Comment:
   I kept this in the `kafka.server` package because all the managers (in 
Scala) that process API calls from KafkaApis.scala reside in the `kafka.server` 
package.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391652781


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors 
errors) {
+PushTelemetryResponseData responseData = new 
PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);
+return new PushTelemetryResponse(responseData);
+}
+
+public String getMetricsContentType() {

Review Comment:
   Missed these, thanks. Done.






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391651232


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391650483


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));

Review Comment:
   I asked the same question in another comment, but shouldn't we set that, since 
the API call goes through `sendMaybeThrottle` in KafkaApis, which throttles the 
request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391648051


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   > Do we need to make this an LRU cache? If a client is terminated or idle, 
we should remove them from the cache
   
   I have started with an LRU cache and plan to replace it with a cache that 
time-bounds the client instance state. The KIP says: `client instance specific state is 
maintained in broker memory up to MAX(60*1000, PushIntervalMs * 3) 
milliseconds`. I'll improve the cache to respect that: 
https://issues.apache.org/jira/browse/KAFKA-15813
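   
As a rough illustration of that KIP rule (a sketch only, not code from this PR; 
the class `ClientInstanceRetention` and its method names are hypothetical), the 
retention window and a staleness check could look like:

```java
final class ClientInstanceRetention {
    // Floor of 60 seconds, taken from the KIP text quoted above.
    static final long MIN_RETENTION_MS = 60 * 1000L;

    // Returns MAX(60*1000, pushIntervalMs * 3) in milliseconds.
    static long retentionMs(long pushIntervalMs) {
        return Math.max(MIN_RETENTION_MS, pushIntervalMs * 3);
    }

    // True when the client has been silent for longer than the retention window.
    static boolean isExpired(long lastRequestMs, long nowMs, long pushIntervalMs) {
        return nowMs - lastRequestMs > retentionMs(pushIntervalMs);
    }
}
```

With a 5 second push interval the 60 second floor applies, while a 30 second 
push interval gives a 90 second window.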






[jira] [Updated] (KAFKA-15813) Improve implementation of client instance cache

2023-11-13 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-15813:
--
Summary: Improve implementation of client instance cache  (was: Improve 
implementation of client instnce cache)

> Improve implementation of client instance cache
> ---
>
> Key: KAFKA-15813
> URL: https://issues.apache.org/jira/browse/KAFKA-15813
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> In the current implementation the ClientMetricsManager uses an LRU cache, but we 
> should also support expiring stale clients, i.e. clients which haven't reported 
> metrics for a while.
>  
> The KIP mentions: This client instance specific state is maintained in broker 
> memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used to 
> enforce the push interval rate-limiting. There is no persistence of client 
> instance metrics state across broker restarts or between brokers 
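
One possible shape for such an expiring cache is sketched below. This is an 
assumption-laden illustration, not the planned implementation: 
`ExpiringClientCache` is a hypothetical name, the caller supplies the retention 
window (`ttlMs`) and the current time, and stale entries are only removed on 
lookup or when `expireStale` is invoked.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExpiringClientCache<K, V> {
    // Value plus the last time the entry was written or read.
    private static final class Entry<V> {
        final V value;
        volatile long lastSeenMs;

        Entry(V value, long lastSeenMs) {
            this.value = value;
            this.lastSeenMs = lastSeenMs;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMs;

    ExpiringClientCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    // Returns the value and refreshes its timestamp, or null if absent or stale.
    V get(K key, long nowMs) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMs - entry.lastSeenMs > ttlMs) {
            entries.remove(key, entry);
            return null;
        }
        entry.lastSeenMs = nowMs;
        return entry.value;
    }

    // Removes every entry not seen within ttlMs; returns how many were removed.
    int expireStale(long nowMs) {
        int removed = 0;
        Iterator<Entry<V>> it = entries.values().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().lastSeenMs > ttlMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

Such a sweep could be driven by a periodic task; whether that or lazy expiry on 
access is preferable is left open here.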



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391640685


##
core/src/main/java/kafka/metrics/ClientMetricsInstanceMetadata.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+public class ClientMetricsInstanceMetadata {
+
+private final Map attributesMap;
+
+public ClientMetricsInstanceMetadata(Uuid clientInstanceId, RequestContext 
requestContext) {
+Objects.requireNonNull(clientInstanceId);
+Objects.requireNonNull(requestContext);
+
+attributesMap = new HashMap<>();
+
+attributesMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+attributesMap.put(ClientMetricsConfigs.CLIENT_ID, 
requestContext.clientId());
+attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, 
requestContext.clientInformation != null ?
+requestContext.clientInformation.softwareName() : null);
+attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, 
requestContext.clientInformation != null ?
+requestContext.clientInformation.softwareVersion() : null);
+attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, 
requestContext.clientAddress != null ?
+requestContext.clientAddress.getHostAddress() : null);
+// KIP-714 mentions client source port should be the client connection's source port from the
+// broker's point of view. But the broker does not have this information rather the port could be
+// the broker's port where the client connection is established. We might want to consider removing
+// the client source port from the KIP or use broker port if that can be helpful.
+// TODO: fix port

Review Comment:
   Thanks a lot @junrao, this is helpful. I will make the changes in a subsequent 
PR to address this. I have created the following Jira for it: 
https://issues.apache.org/jira/browse/KAFKA-15811






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391639061


##
core/src/main/java/kafka/metrics/ClientMetricsConfigs.java:
##
@@ -80,6 +82,11 @@ public class ClientMetricsConfigs {
 public static final String CLIENT_SOURCE_ADDRESS = "client_source_address";
 public static final String CLIENT_SOURCE_PORT = "client_source_port";
 
+// Empty string in client-metrics resource configs indicates that all the metrics are subscribed.
+public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "\"\"";

Review Comment:
   The KIP mentions that the response in subscriptions should be just an empty 
string, i.e. `""`. But when creating a `client-metrics` resource, ConfigDef parses 
`""` as no data, so to specify an empty string through `kafka-configs.sh` we 
need to pass it as an empty string enclosed in quotes, i.e. `"\"\""`
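   
To make the distinction concrete, here is a minimal sketch of how a raw config 
value might be interpreted. It is not the parsing code in this PR: 
`MetricsConfigValue`, `subscribesToAll` and `parse` are hypothetical names, and 
the comma-separated format is an assumption for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class MetricsConfigValue {
    // Same literal as ALL_SUBSCRIBED_METRICS_CONFIG in the diff: the two characters "".
    static final String ALL_SUBSCRIBED = "\"\"";

    static boolean subscribesToAll(String rawValue) {
        return rawValue != null && ALL_SUBSCRIBED.equals(rawValue.trim());
    }

    // No data -> empty list; quoted empty string -> one empty prefix (matches everything);
    // anything else -> comma-separated metric name prefixes (assumed format).
    static List<String> parse(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return Collections.emptyList();
        }
        if (subscribesToAll(rawValue)) {
            return Collections.singletonList("");
        }
        return Arrays.stream(rawValue.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
    }
}
```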






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391636101


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
+PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);
+return new PushTelemetryResponse(responseData);
+}
+
+public String getMetricsContentType() {
+// Future versions of PushTelemetryRequest and GetTelemetrySubscriptionsRequest may include a content-type
+// field to allow for updated OTLP format versions (or additional formats), but this field is currently not
+// included since only one format is specified in the current proposal of the kip-714
+return OTLP_CONTENT_TYPE;
+}
+
+public ByteBuffer getMetricsData() {
+CompressionType cType = CompressionType.forId(this.data.compressionType());
+return (cType == CompressionType.NONE) ?
+ByteBuffer.wrap(this.data.metrics()) : decompressMetricsData(cType, this.data.metrics());
+}
+
+private static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) {
+// TODO: Add support for decompression of metrics data

Review Comment:
   Yes, I have created a Jira under the parent KIP-714 task to address this: 
https://issues.apache.org/jira/browse/KAFKA-15807. I am planning to get the 
end-to-end metrics flow working without compression first.
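   
For context, decompressing a payload into a `ByteBuffer` could look roughly like 
the sketch below for the gzip case. This is an illustration using only the JDK's 
`java.util.zip`, not the approach planned for KAFKA-15807; `GzipMetricsCodec` is 
a hypothetical name, and the other compression types are not covered.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipMetricsCodec {
    // Inflates a gzip-compressed byte array into a heap ByteBuffer.
    static ByteBuffer decompress(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper used only to produce test input for decompress().
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```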






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391634085


##
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java:
##
@@ -71,6 +73,31 @@ public PushTelemetryRequestData data() {
 return data;
 }
 
+public PushTelemetryResponse createResponse(int throttleTimeMs, Errors errors) {
+PushTelemetryResponseData responseData = new PushTelemetryResponseData();
+responseData.setErrorCode(errors.code());
+responseData.setThrottleTimeMs(throttleTimeMs);

Review Comment:
   > Could we redirect getErrorResponse to here and rename it to errorResponse?
   
   Done
   
   > If error code is not ThrottlingQuotaExceededException, we should ignore 
throttleTimeMs.
   
   It might be naive, but sorry, I didn't understand why throttleTimeMs should 
not be passed in the response for other exceptions. Don't all requests go through 
common throttling code, where requests might be throttled for some time based on 
the throughput?






Re: [PR] KAFKA-15756: Migrate existing integration tests to run old protocol in new coordinator [kafka]

2023-11-13 Thread via GitHub


dongnuo123 commented on code in PR #14675:
URL: https://github.com/apache/kafka/pull/14675#discussion_r1391629976


##
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##
@@ -29,7 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
+  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
   def testListConsumerGroups(quorum: String): Unit = {

Review Comment:
   It doesn't work for kraft+kip848. If GroupCoordinatorService#ListGroups is 
changed to add the futures the same way describeGroups does, this test passes. 
Not sure why.






Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-13 Thread via GitHub


kirktrue commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1808930992

   > re-triggered a build as the last one did not look good.
   
   I only just now updated my branch to include the revert commits, so I 
wouldn't necessarily expect that test run to successfully pass. I have started 
yet another build with the previous reverts and will keep an eye on the results.
   
   Thanks @dajac!





[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets

2023-11-13 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15816:

Description: 
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: https://github.com/apache/kafka/pull/14750
 * KafkaConsumerTest
 * KafkaProducerTest
 * ConfigResourceTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest

Core:
 * DescribeAuthorizedOperationsTest
 * SslGssapiSslEndToEndAuthorizationTest
 * SaslMultiMechanismConsumerTest
 * SaslPlaintextConsumerTest
 * SaslSslAdminIntegrationTest
 * SaslSslConsumerTest
 * MultipleListenersWithDefaultJaasContextTest
 * DescribeClusterRequestTest

Trogdor:
 * AgentTest

These can be addressed by just fixing the tests.

  was:
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients:
 * KafkaConsumerTest
 * KafkaProducerTest
 * ConfigResourceTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest

Core:
 * DescribeAuthorizedOperationsTest
 * SslGssapiSslEndToEndAuthorizationTest
 * SaslMultiMechanismConsumerTest
 * SaslPlaintextConsumerTest
 * SaslSslAdminIntegrationTest
 * SaslSslConsumerTest
 * MultipleListenersWithDefaultJaasContextTest
 * DescribeClusterRequestTest

Trogdor:
 * AgentTest

These can be addressed by just fixing the tests.


> Typos in tests leak network sockets
> ---
>
> Key: KAFKA-15816
> URL: https://issues.apache.org/jira/browse/KAFKA-15816
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> There are a few tests which leak network sockets due to small typos in the 
> tests themselves.
> Clients: https://github.com/apache/kafka/pull/14750
>  * KafkaConsumerTest
>  * KafkaProducerTest
>  * ConfigResourceTest
>  * SelectorTest
>  * SslTransportLayerTest
>  * SslTransportTls12Tls13Test
>  * SslVersionsTransportLayerTest
> Core:
>  * DescribeAuthorizedOperationsTest
>  * SslGssapiSslEndToEndAuthorizationTest
>  * SaslMultiMechanismConsumerTest
>  * SaslPlaintextConsumerTest
>  * SaslSslAdminIntegrationTest
>  * SaslSslConsumerTest
>  * MultipleListenersWithDefaultJaasContextTest
>  * DescribeClusterRequestTest
> Trogdor:
>  * AgentTest
> These can be addressed by just fixing the tests.





[PR] KAFKA-15816: Fix leaked sockets in clients tests [kafka]

2023-11-13 Thread via GitHub


gharris1727 opened a new pull request, #14750:
URL: https://github.com/apache/kafka/pull/14750

   These tests leak network sockets unnecessarily, and should instead properly 
close the resources they instantiate.
   
   I found these via some tests which are not merge-able, as they rely on 
reflection and JDK-version-specific mechanisms to operate.
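   
As a generic illustration of the pattern (not taken from the tests changed in 
this PR; `SocketCleanupExample` is a hypothetical class), try-with-resources 
ensures that sockets opened in a test are closed even when an assertion or 
exception interrupts the test body:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class SocketCleanupExample {
    // Opens a loopback server and a client connection, then lets
    // try-with-resources close both. Returns true if both ended up closed.
    static boolean openAndClose() {
        ServerSocket serverRef = null;
        Socket clientRef = null;
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            serverRef = server;
            clientRef = client;
            // Test logic that uses the sockets would go here.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return serverRef.isClosed() && clientRef.isClosed();
    }
}
```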
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15816) Typos in tests leak network sockets

2023-11-13 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15816:
---

 Summary: Typos in tests leak network sockets
 Key: KAFKA-15816
 URL: https://issues.apache.org/jira/browse/KAFKA-15816
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 3.6.0
Reporter: Greg Harris
Assignee: Greg Harris


There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients:
 * KafkaConsumerTest
 * KafkaProducerTest
 * ConfigResourceTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest

Core:
 * DescribeAuthorizedOperationsTest
 * SslGssapiSslEndToEndAuthorizationTest
 * SaslMultiMechanismConsumerTest
 * SaslPlaintextConsumerTest
 * SaslSslAdminIntegrationTest
 * SaslSslConsumerTest
 * MultipleListenersWithDefaultJaasContextTest
 * DescribeClusterRequestTest

Trogdor:
 * AgentTest

These can be addressed by just fixing the tests.





[PR] KIP-1001: Add CurrentControllerId metric [kafka]

2023-11-13 Thread via GitHub


cmccabe opened a new pull request, #14749:
URL: https://github.com/apache/kafka/pull/14749

   As discussed on KIP-1001 (not yet approved)





[jira] [Created] (KAFKA-15815) JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled

2023-11-13 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15815:
---

 Summary: JsonRestServer leaks sockets via HttpURLConnection when 
keep-alive enabled
 Key: KAFKA-15815
 URL: https://issues.apache.org/jira/browse/KAFKA-15815
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.0
Reporter: Greg Harris


By default HttpURLConnection has keep-alive enabled, which allows a single 
HttpURLConnection to be left open in order to be re-used for later requests. 
This means that despite JsonRestServer calling `close()` on the relevant 
InputStream, and calling `disconnect()` on the connection itself, the 
HttpURLConnection does not call `close()` on the underlying socket.

This affects the Trogdor AgentTest and CoordinatorTest suites, where most of 
the methods make HTTP requests using the JsonRestServer. The effect is that ~32 
sockets are leaked per test run, all remaining in the CLOSE_WAIT state (half 
closed) after the test. This is because the JettyServer has correctly closed 
the connections, but the HttpURLConnection has not.

There does not appear to be a way to locally override the HttpURLConnection's 
behavior in this case, and only disabling keep-alive overall (via the system 
property `http.keepAlive=false`) seems to resolve the socket leaks.

To prevent the leaks, we can move JsonRestServer to an alternative HTTP 
implementation, perhaps the jetty-client that Connect uses, or disable 
keepAlive during tests.
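
The second option could be sketched as follows. This is only an illustration 
with hypothetical names (`KeepAliveSettings`, `disableKeepAlive`, `restore`); it 
assumes the property is set before the JVM makes its first HTTP request, since 
the JDK appears to read `http.keepAlive` when its HTTP client classes are first 
loaded.

```java
final class KeepAliveSettings {
    static final String KEEP_ALIVE_PROPERTY = "http.keepAlive";

    // Disables HTTP keep-alive JVM-wide; returns the previous value (may be null).
    static String disableKeepAlive() {
        return System.setProperty(KEEP_ALIVE_PROPERTY, "false");
    }

    // Puts the property back to what it was before disableKeepAlive().
    static void restore(String previous) {
        if (previous == null) {
            System.clearProperty(KEEP_ALIVE_PROPERTY);
        } else {
            System.setProperty(KEEP_ALIVE_PROPERTY, previous);
        }
    }
}
```

Passing `-Dhttp.keepAlive=false` to the test JVM has the same effect without any 
code in the tests.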





Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391496243


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import java.util.Collections;
+import java.util.regex.Pattern;
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.CRC32;
+
+/**
+ * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
+ */
+public class ClientMetricsManager implements Closeable {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
+private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+
+public static ClientMetricsManager instance() {
+return INSTANCE;
+}
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;

Review Comment:
   The challenge is that the number of connections can vary depending on the type of 
broker host. Larger instances can typically accommodate more connections. Do we 
need to make this an LRU cache? If a client is terminated or idle, we should 
remove them from the cache. Otherwise, we should probably just rely on the 
existing `max.connections` to control the client connections?
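   
To illustrate the concern, a size-bounded LRU map only evicts when a new entry 
pushes it over capacity; idle or terminated clients stay until then. The sketch 
below uses the JDK's `LinkedHashMap` in access order. `SimpleLruCache` is a 
hypothetical stand-in, not the `LRUCache` class used in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxSize;

    SimpleLruCache(int maxSize) {
        // accessOrder = true moves an entry to the end on every get/put.
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    // Called by LinkedHashMap after each insertion; evicts the least
    // recently used entry once the size limit is exceeded.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }
}
```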






Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-13 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1391490591


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));
+

[PR] KAFKA-15814 Patched the SASL/GSSAPI server construction [kafka]

2023-11-13 Thread via GitHub


piotrsmolinski opened a new pull request, #14748:
URL: https://github.com/apache/kafka/pull/14748

   The patch fixes how the SASL/GSSAPI server is created in Kafka brokers. 
   As described in the JIRA ticket, Kafka forces the client JAAS configuration for 
both the client and the server side.
   This patch removes the constraint and allows a listener to accept authentication 
addressed to different hosts.
   If the listener is used for inter-broker communication, it must stay in 
the current form.
   
   Assume that the Kafka cluster is accessible using the `CLIENT` listener and that each 
broker is accessible at `b-N.kafka.sample.home.arpa:9092` (N is the broker id). For 
convenience, the bootstrap uses a load-balanced endpoint, 
`kafka.sample.home.arpa:9092`, that distributes the traffic to the active 
brokers.
   
   After this patch it is possible to configure the listener's JAAS 
configuration to use a wildcard principal:
   
   ```
   
listener.name.client.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule
 required
 useKeyTab=true
 storeKey=true
 keyTab="/etc/kafka/security/kafka.keytab"
 principal="*"
 isInitiator=false
   ; 
   ```
   
   The goal of this patch is to enable a wildcard principal and acceptor-only 
mode (isInitiator=false).
   
   The keytab must contain entries for SPNs on all acceptable service FQDNs, 
i.e. on broker 0 we must have entries for both 
`kafka/kafka.sample.home.arpa@DOMAIN` and 
`kafka/b-0.kafka.sample.home.arpa@DOMAIN`.
   
   The patch does not change the behavior of existing configurations.
   
   For this patch the only reasonable testing strategy is a manual one. That 
is why the code was not submitted earlier, even though the solution has been 
known for 3 years.
   
   Any ideas for automated testing of this change are warmly welcome.
   
   A comparison of the various ways to configure SASL:
   
   
https://github.com/piotrsmolinski/kerberos-sandbox/blob/main/src/test/java/io/confluent/sandbox/kerberos/KerberosTest.java
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException

2023-11-13 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785597#comment-17785597
 ] 

Divij Vaidya commented on KAFKA-15481:
--

Yes, it has lots of merge conflicts because of another piece of functionality 
(dynamically changing the cache size) that was added to 3.7 but not to 3.6.1. 
I plan to resolve those merge conflicts manually in the next 2 days and merge 
into 3.6.

> Concurrency bug in RemoteIndexCache leads to IOException
> 
>
> Key: KAFKA-15481
> URL: https://issues.apache.org/jira/browse/KAFKA-15481
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Assignee: Jeel Jotaniya
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> RemoteIndexCache has a concurrency bug which leads to IOException while 
> fetching data from remote tier.
> Below events in order of timeline -
> Thread 1 (cache thread): invalidates the entry, removalListener is invoked 
> async, so the files have not been renamed to "deleted" suffix yet.
> Thread 2: (fetch thread): tries to find entry in cache, doesn't find it 
> because it has been removed by 1, fetches the entry from S3, writes it to 
> existing file (using replace existing)
> Thread 1: async removalListener is invoked, acquires a lock on old entry 
> (which has been removed from cache), it renames the file to "deleted" and 
> starts deleting it
> Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file 
> and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM 
> returns an error as it won't allow creation of 2GB random access file.
> *Potential Fix*
> Use EvictionListener instead of RemovalListener in Caffeine cache as per the 
> documentation:
> {quote} When the operation must be performed synchronously with eviction, use 
> {{Caffeine.evictionListener(RemovalListener)}} instead. This listener will 
> only be notified when {{RemovalCause.wasEvicted()}} is true. For an explicit 
> removal, {{Cache.asMap()}} offers compute methods that are performed 
> atomically.{quote}
> This will ensure that removal from cache and marking the file with delete 
> suffix is synchronously done, hence the above race condition will not occur.
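
The quoted documentation mentions that, for explicit removals, the map view's compute methods run atomically. As a minimal sketch of that idea, and not the actual RemoteIndexCache fix, the example below uses a plain `java.util.concurrent.ConcurrentHashMap` in place of Caffeine. The `Entry` class, the `markedForDeletion` flag (standing in for the rename to the "deleted" suffix) and all method names are assumptions made for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AtomicRemovalSketch {
    // Hypothetical stand-in for a cached index entry backed by a file on disk.
    static final class Entry {
        final String fileName;
        volatile boolean markedForDeletion = false;

        Entry(String fileName) {
            this.fileName = fileName;
        }
    }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<>();

    // Loads the entry if it is absent; a real cache would fetch the index from remote storage here.
    Entry getOrLoad(String key) {
        return cache.computeIfAbsent(key, k -> new Entry(k + ".index"));
    }

    // Removes the mapping and marks the entry for deletion inside one atomic compute call,
    // so a concurrent getOrLoad for the same key cannot run between the two steps.
    boolean remove(String key) {
        boolean[] removed = {false};
        cache.computeIfPresent(key, (k, entry) -> {
            entry.markedForDeletion = true; // stands in for renaming the file with the "deleted" suffix
            removed[0] = true;
            return null; // returning null removes the mapping
        });
        return removed[0];
    }

    public static void main(String[] args) {
        AtomicRemovalSketch sketch = new AtomicRemovalSketch();
        Entry old = sketch.getOrLoad("segment-1");
        boolean removed = sketch.remove("segment-1");
        Entry fresh = sketch.getOrLoad("segment-1");
        System.out.println("removed=" + removed
                + ", oldMarked=" + old.markedForDeletion
                + ", freshMarked=" + fresh.markedForDeletion);
    }
}
```

Because the marking happens inside the compute function, a reload of the same key only starts after the old entry has been marked, which is the ordering the proposed fix is after.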



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-13 Thread via GitHub


divijvaidya commented on code in PR #14727:
URL: https://github.com/apache/kafka/pull/14727#discussion_r1391297279


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -988,30 +997,33 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
 return;
 }
 RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
-if (segmentsToDelete.contains(metadata)) {
-continue;
-}
-// When the log-start-offset is moved by the user, the 
leader-epoch-checkpoint file gets truncated
-// as per the log-start-offset. Until the 
rlm-cleaner-thread runs in the next iteration, those
-// remote log segments won't be removed. The 
`isRemoteSegmentWithinLeaderEpoch` validates whether
-// the epochs present in the segment lies in the 
checkpoint file. It will always return false
-// since the checkpoint file was already truncated.
-boolean shouldDeleteSegment = 
remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
+
+if 
(SEGMENT_DELETION_VALID_STATES.contains(metadata.state())) {

Review Comment:
   Since there is no "else" branch, perhaps short-circuiting the `if` 
condition with 
   ```
   if (!SEGMENT_DELETION_VALID_STATES.contains(metadata.state())) {
  continue;
   }
   ```
   would be more readable. 
   
   (I don't have a strong opinion on this, so let me know if you wish to 
keep it as is.)



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.class);
 private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = 
"remote-log-reader";
+private static final Set 
SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of(

Review Comment:
   In this diagram: 
https://github.com/apache/kafka/blob/49d3122d425171b6a59a2b6f02d3fe63d3ac2397/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L74
 copy_started -> delete_started is also a valid transition, although I can't 
think of a scenario where it would occur because we always complete the copy 
before calling expiration. Are we missing something here by not including 
copy_started?
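
To make the two review comments above concrete, here is a minimal, self-contained sketch of an unmodifiable `EnumSet` of deletion-eligible states combined with the suggested early `continue`. The `SegmentState` enum and `Segment` class are local stand-ins, and the choice of states in the set is an assumption, since the diff above is truncated before the set's contents.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class DeletionStateFilterSketch {
    // Local stand-in for the segment states discussed in the review thread.
    enum SegmentState {
        COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
    }

    // Hypothetical minimal view of a remote segment's metadata.
    static final class Segment {
        final String id;
        final SegmentState state;

        Segment(String id, SegmentState state) {
            this.id = id;
            this.state = state;
        }
    }

    // Assumption: which states belong in this set is not visible in the truncated diff.
    static final Set<SegmentState> DELETION_VALID_STATES = Collections.unmodifiableSet(
            EnumSet.of(SegmentState.COPY_SEGMENT_FINISHED, SegmentState.DELETE_SEGMENT_STARTED));

    // Returns the ids of segments whose state allows them to be considered for deletion.
    static List<String> deletionCandidates(List<Segment> segments) {
        List<String> candidates = new ArrayList<>();
        for (Segment segment : segments) {
            // Guard clause: skip ineligible segments early instead of nesting the loop body.
            if (!DELETION_VALID_STATES.contains(segment.state)) {
                continue;
            }
            candidates.add(segment.id);
        }
        return candidates;
    }

    public static void main(String[] args) {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment("a", SegmentState.COPY_SEGMENT_STARTED));
        segments.add(new Segment("b", SegmentState.COPY_SEGMENT_FINISHED));
        segments.add(new Segment("c", SegmentState.DELETE_SEGMENT_STARTED));
        segments.add(new Segment("d", SegmentState.DELETE_SEGMENT_FINISHED));
        System.out.println(deletionCandidates(segments));
    }
}
```

Whether `COPY_SEGMENT_STARTED` should also be in the set is exactly the open question raised in the second comment; the sketch leaves it out only to keep the filter visible.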






Re: [PR] MINOR: Fix ClusterConnectionStatesTest.testSingleIP [kafka]

2023-11-13 Thread via GitHub


jolshan commented on PR #14741:
URL: https://github.com/apache/kafka/pull/14741#issuecomment-1808642447

   Shall we backport this to 3.6 as well? Maybe not a huge deal, but I see it 
causing some issues there too. 





Re: [PR] cherrypick KAFKA-15780: Wait for consistent KRaft metadata when creating or deleting topics [kafka]

2023-11-13 Thread via GitHub


jolshan merged PR #14713:
URL: https://github.com/apache/kafka/pull/14713





Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-13 Thread via GitHub


dajac commented on PR #14670:
URL: https://github.com/apache/kafka/pull/14670#issuecomment-1808636945

   re-triggered a build as the last one did not look good.





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-13 Thread via GitHub


pprovenzano commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1808630427

   LGTM





[jira] [Created] (KAFKA-15814) SASL Kerberos authentication cannot be used with load balanced bootstrap

2023-11-13 Thread Piotr Smolinski (Jira)
Piotr Smolinski created KAFKA-15814:
---

 Summary: SASL Kerberos authentication cannot be used with load 
balanced bootstrap
 Key: KAFKA-15814
 URL: https://issues.apache.org/jira/browse/KAFKA-15814
 Project: Kafka
  Issue Type: Bug
  Components: core, security
Affects Versions: 3.6.0
Reporter: Piotr Smolinski


This is a very old problem that is still unresolved. When Kafka is accessed 
through a load-balanced bootstrap (as in Kubernetes, or when there are too 
many brokers to list in the bootstrap, or when we want to expose a single 
access address), the broker endpoint can be reached using at least two 
addresses: one for the connection bootstrap (load balanced) and another for 
the broker connection (direct). The problem is that the Kafka Kerberos 
configuration forces JAAS to use only one SPN (like kafka/b-0.kafka@DOMAIN). 
With weaker algorithms (like RC4) the same keytab entry can be used for 
multiple server names. The problem arises with stronger algorithms (like 
AES128 or AES256): the SPN is used to compute the messages, so the keytab 
entries for kafka/b-0.kafka@DOMAIN and kafka/kafka@DOMAIN are not compatible.

The JAAS configuration for Kerberos can be specified in two ways, depending on 
whether it is used for a service client or a server:
{code:java}
com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/kafka/security/kafka.keytab"
  principal="kafka/node-0.kafka.home.arpa@LOCALDOMAIN"
; {code}
{code:java}
com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/kafka/security/kafka.keytab"
  principal="*"
  isInitiator=false
; {code}
While the former can be used on both sides, it forces a single principal to 
be selected from the keytab. The latter form cannot be used on the client side, 
but it dynamically selects the correct SPN based on the client request.
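
For readers who prefer to see the second (acceptor-only) form built in code, the sketch below constructs the same options programmatically with the standard `javax.security.auth.login.AppConfigurationEntry` class. It only assembles the configuration entry and does not perform a Kerberos login; the class and method names are illustrative assumptions, and this is not how the Kafka patch itself is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

public class AcceptorJaasSketch {
    // Builds a JAAS entry equivalent to the acceptor-only configuration quoted above.
    static AppConfigurationEntry acceptorOnlyEntry(String keyTabPath) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("keyTab", keyTabPath);
        options.put("principal", "*");       // wildcard: the SPN is chosen per client request
        options.put("isInitiator", "false"); // acceptor-only, so unusable on the client side
        return new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options);
    }

    public static void main(String[] args) {
        AppConfigurationEntry entry = acceptorOnlyEntry("/etc/kafka/security/kafka.keytab");
        System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
    }
}
```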

The Kafka Kerberos implementation does not distinguish between client and 
server properties. In particular, the same JAAS configuration entry is used 
when the broker uses Kerberos for inter-broker communication.

Even if the listener property in the broker is known to be unused, the code 
currently does not allow specifying a wildcard principal.

Some time ago I created a patch that solves the problem while preserving the 
current semantics, but I did not have time to write up the submission. This 
ticket is a tracker for the Pull Request.





[jira] [Commented] (KAFKA-15481) Concurrency bug in RemoteIndexCache leads to IOException

2023-11-13 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785578#comment-17785578
 ] 

Mickael Maison commented on KAFKA-15481:


[~divijvaidya] Is there an issue that's preventing the backport to 3.6? 






Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-13 Thread via GitHub


agavra commented on PR #14708:
URL: https://github.com/apache/kafka/pull/14708#issuecomment-1808460098

   Ah same! Apologies, I made the same mistake. Will remember next time to look 
over the individual JDKs.





[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-11-13 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785540#comment-17785540
 ] 

Proven Provenzano commented on KAFKA-15513:
---

SCRAM between controllers will need work, as we need to solve the problem of 
how a controller accesses the metadata topic before it is part of the 
controller quorum. SASL doesn't have this problem because all the 
authentication info is in the config and not part of the metadata.

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}





Re: [PR] KIP-988 [kafka]

2023-11-13 Thread via GitHub


eduwercamacaro commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391293607


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -70,6 +71,7 @@
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
   We are open to suggestions for solving this :) 






[jira] [Updated] (KAFKA-15420) Kafka Tiered Storage V1

2023-11-13 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15420:
-
Affects Version/s: 3.6.0

> Kafka Tiered Storage V1
> ---
>
> Key: KAFKA-15420
> URL: https://issues.apache.org/jira/browse/KAFKA-15420
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>






[jira] [Commented] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft

2023-11-13 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785526#comment-17785526
 ] 

Ismael Juma commented on KAFKA-15605:
-

The PR was merged, can this be closed [~davidarthur] ?

> Topics marked for deletion in ZK are incorrectly migrated to KRaft
> --
>
> Key: KAFKA-15605
> URL: https://issues.apache.org/jira/browse/KAFKA-15605
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.6.1
>
>
> When migrating topics from ZooKeeper, the KRaft controller reads all the 
> topic and partition metadata from ZK directly. This includes topics which 
> have been marked for deletion by the ZK controller. After being migrated to 
> KRaft, the pending topic deletions are never completed, so it is as if the 
> delete topic request never happened.
> Since the client request to delete these topics has already been returned as 
> successful, it would be confusing to the client that the topic still existed. 
> An operator or application would need to issue another topic deletion to 
> remove these topics once the controller had moved to KRaft. If they tried to 
> create a new topic with the same name, they would receive a 
> TOPIC_ALREADY_EXISTS error.
> The migration logic should carry over pending topic deletions and resolve 
> them either as part of the migration or shortly after.
> *Note to operators:*
> To determine if a migration was affected by this, an operator can check the 
> contents of {{/admin/delete_topics}} after the KRaft controller has migrated 
> the metadata. If any topics are listed under this ZNode, they were not 
> deleted and will still be present in KRaft. At this point the operator can 
> make a determination if the topics should be re-deleted (using 
> "kafka-topics.sh --delete") or left in place. In either case, the topics 
> should be removed from {{/admin/delete_topics}} to prevent unexpected topic 
> deletion in the event of a fallback to ZK.





[jira] [Commented] (KAFKA-15552) Duplicate Producer ID blocks during ZK migration

2023-11-13 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785522#comment-17785522
 ] 

Ismael Juma commented on KAFKA-15552:
-

[~showuon] Can this be closed then?

> Duplicate Producer ID blocks during ZK migration
> 
>
> Key: KAFKA-15552
> URL: https://issues.apache.org/jira/browse/KAFKA-15552
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.2, 3.6.1
>
>
> When migrating producer ID blocks from ZK to KRaft, we are taking the current 
> producer ID block from ZK and writing its "firstProducerId" into the 
> producer IDs KRaft record. However, in KRaft we store the _next_ producer ID 
> block in the log rather than storing the current block like ZK does. The end 
> result is that the first block given to a caller of AllocateProducerIds is a 
> duplicate of the last block allocated in ZK mode.
>  
> This can result in duplicate producer IDs being given to transactional or 
> idempotent producers. In the case of transactional producers, this can cause 
> long term problems since the producer IDs are persisted and reused for a long 
> time.
> The time between the last producer ID block being allocated by the ZK 
> controller and all the brokers being restarted following the metadata 
> migration is when this bug is possible.
>  
> Symptoms of this bug will include ReplicaManager OutOfOrderSequenceException 
> and possibly some producer epoch validation errors. To see if a cluster is 
> affected by this bug, search for the offending producer ID and see if it is 
> being used by more than one producer.
>  
> For example, the following error was observed
> {code}
> Out of order sequence number for producer 376000 at offset 381338 in 
> partition REDACTED: 0 (incoming seq. number), 21 (current end sequence 
> number) 
> {code}
> Then searching for "376000" on 
> org.apache.kafka.clients.producer.internals.TransactionManager logs, two 
> brokers both show the same producer ID being provisioned
> {code}
> Broker 0 [Producer clientId=REDACTED-0] ProducerId set to 376000 with epoch 1
> Broker 5 [Producer clientId=REDACTED-1] ProducerId set to 376000 with epoch 1
> {code}
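
The off-by-one-block problem described above can be illustrated with a small sketch. It assumes, for illustration only, a fixed block size of 1000 and uses hypothetical names (`Block`, `buggyNextProducerId`, `correctedNextProducerId`); it is not the migration code and does not claim to show the actual fix. It only shows why writing the current block's first ID into a record that is supposed to hold the next block's first ID hands out a duplicate block.

```java
public class ProducerIdBlockMigrationSketch {
    // Assumption for illustration: every block spans 1000 producer IDs.
    static final long BLOCK_SIZE = 1000L;

    // Hypothetical representation of a producer ID block.
    static final class Block {
        final long firstProducerId;
        final long size;

        Block(long firstProducerId, long size) {
            this.firstProducerId = firstProducerId;
            this.size = size;
        }
    }

    // Behaviour described in the ticket: the ZK block's first ID is copied as-is
    // into the record that is meant to hold the NEXT block's first ID.
    static long buggyNextProducerId(Block currentZkBlock) {
        return currentZkBlock.firstProducerId;
    }

    // One plausible correction: skip past the block that ZK already handed out.
    static long correctedNextProducerId(Block currentZkBlock) {
        return currentZkBlock.firstProducerId + currentZkBlock.size;
    }

    // Models the first allocation after migration, which starts at the stored next ID.
    static Block allocate(long nextProducerId) {
        return new Block(nextProducerId, BLOCK_SIZE);
    }

    public static void main(String[] args) {
        Block lastZkBlock = new Block(376000L, BLOCK_SIZE);
        Block buggy = allocate(buggyNextProducerId(lastZkBlock));
        Block corrected = allocate(correctedNextProducerId(lastZkBlock));
        System.out.println("buggy first ID:     " + buggy.firstProducerId);
        System.out.println("corrected first ID: " + corrected.firstProducerId);
    }
}
```

With the buggy variant the first block allocated after migration starts at the same ID as the last block allocated in ZK mode, which matches the duplicate `376000` seen in the logs above.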





[PR] Revert "Revert "KAFKA-15661: KIP-951: Server side changes (#14444)" [kafka]

2023-11-13 Thread via GitHub


chb2ab opened a new pull request, #14747:
URL: https://github.com/apache/kafka/pull/14747

   Investigating 
`org.apache.kafka.tiered.storage.integration.ReassignReplicaShrinkTest` failure
   
   This reverts commit a98bd7d65fb5a3a188ff524db7619dc7fe4257fa.
   
   





[jira] [Commented] (KAFKA-13627) Topology changes shouldn't require a full reset of local state

2023-11-13 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785518#comment-17785518
 ] 

Nicholas Telford commented on KAFKA-13627:
--

Hi,

I've temporarily parked KIP-816 to focus on KIP-892, but if you'd like to take 
it over, I'm happy for you to do so.

My team have an implementation of "Option B" in production, which appears to 
work well and has proved very reliable. It's written in Kotlin, but should be 
trivial to translate to Java if you would like to use it as the basis of an 
implementation. You can see it here: 
[https://gist.github.com/nicktelford/15cc596a25de33a673bb5bd4c81edd0f]

When I explored Options A and C, I found many difficulties, owing to places 
that either depended on the TaskId being present in the state directory path, 
or that depended on the format of the TaskId, so I would highly recommend 
pursuing Option B, since it's easy to implement, reliable, and the logic can be 
isolated from the rest of Kafka Streams, making it easy to maintain.

> Topology changes shouldn't require a full reset of local state
> --
>
> Key: KAFKA-13627
> URL: https://issues.apache.org/jira/browse/KAFKA-13627
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Priority: Major
>
> [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset]
> When changes are made to a Topology that modifies its structure, users must 
> use the Application Reset tool to reset the local state of their application 
> prior to deploying the change. Consequently, these changes require rebuilding 
> all local state stores from their changelog topics in Kafka.
> The time and cost of rebuilding state stores is determined by the size of the 
> state stores, and their recent write history, as rebuilding a store entails 
> replaying all recent writes to the store. For applications that have very 
> large stores, or stores with extremely high write-rates, the time and cost of 
> rebuilding all state in the application can be prohibitively expensive. This 
> is a significant barrier to building highly scalable applications with good 
> availability.
> Changes to the Topology that do not directly affect a state store should not 
> require the local state of that store to be reset/deleted. This would allow 
> applications to scale to very large data sets, whilst permitting the 
> application behaviour to evolve over time.
> h1. Background
> Tasks in a Kafka Streams Topology are logically grouped by “Topic Group” 
> (aka. Subtopology). Topic Groups are assigned an ordinal (number), based on 
> their position in the Topology. This Topic Group ordinal is used as the 
> prefix for all Task IDs: {{{}_{}}}, 
> e.g. {{2_14}}
> If new Topic Groups are added, old Topic Groups are removed, or existing 
> Topic Groups are re-arranged, this can cause the assignment of ordinals to 
> change {_}even for Topic Groups that have not been modified{_}.
> When the assignment of ordinals to Topic Groups changes, existing Tasks are 
> invalidated, as they no longer correspond to the correct Topic Groups. Local 
> state is located in directories that include the Task ID (e.g. 
> {{/state/dir/2_14/mystore/rocksdb/…}}), and since the Tasks have all been 
> invalidated, all existing local state directories are also invalid.
> Attempting to start an application that has undergone these ordinal changes, 
> without first clearing the local state, will cause Kafka Streams to attempt 
> to use the existing local state for the wrong Tasks. Kafka Streams detects 
> this discrepancy and prevents the application from starting.
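
The effect of shifting ordinals can be sketched in a few lines. The example below is a simplification: it assumes that a Topic Group's ordinal is just its index in an ordered list and that a Task ID is that ordinal joined to a partition number with an underscore (as in `2_14`). The group names and helper methods are hypothetical and are not Kafka Streams APIs.

```java
import java.util.Arrays;
import java.util.List;

public class TaskIdOrdinalSketch {
    // Builds a task ID from a group's position in the topology and a partition number.
    static String taskIdFor(List<String> topicGroups, String group, int partition) {
        return topicGroups.indexOf(group) + "_" + partition;
    }

    // Extracts the Topic Group ordinal (the part before the underscore) from a task ID.
    static int ordinalOf(String taskId) {
        return Integer.parseInt(taskId.substring(0, taskId.indexOf('_')));
    }

    // Resolves which Topic Group a task ID points at in a given topology.
    static String groupFor(List<String> topicGroups, String taskId) {
        return topicGroups.get(ordinalOf(taskId));
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("source", "enrich", "aggregate");
        // A new group is inserted ahead of "enrich"; "aggregate" itself is unchanged.
        List<String> after = Arrays.asList("source", "filter", "enrich", "aggregate");

        String oldTaskId = taskIdFor(before, "aggregate", 14);
        String newTaskId = taskIdFor(after, "aggregate", 14);
        System.out.println("aggregate task before: " + oldTaskId);
        System.out.println("aggregate task after:  " + newTaskId);
        System.out.println("old state dir now maps to: " + groupFor(after, oldTaskId));
    }
}
```

In this sketch the unmodified `aggregate` group moves to a new ordinal, so the state directory named after its old Task ID would now be attributed to a different group, which is the mismatch the description says Kafka Streams detects at startup.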




