[jira] [Commented] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-09 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784705#comment-17784705
 ] 

Luke Chen commented on KAFKA-15802:
---

+1 to go with option 1 first. 

> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>
> We have a tiered storage cluster running with Aiven s3 plugin. 
> On our cluster, we have a process doing regular listOffsets requests. 
> This triggers the following exception:
> {code:java}
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> Requested remote resource was not found
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$6(RemoteIndexCache.java:355)
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:318)
> Nov 09, 2023 1:42:01 PM com.github.benmanes.caffeine.cache.LocalAsyncCache 
> lambda$handleCompletion$7
> WARNING: Exception thrown during asynchronous load
> java.util.concurrent.CompletionException: 
> io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:107)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at io.aiven.kafka.tieredstorage.storage.s3.S3Storage.fetch(S3Storage.java:80)
> at 
> io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.lambda$new$1(SegmentManifestProvider.java:59)
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:103)
> ... 7 more
> Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The 
> specified key does not exist. (Service: S3, Status Code: 404, Request ID: 
> CFMP27PVC9V2NNEM, Extended Request ID: 
> F5qqlV06qQJ5qCuWl91oueBaha0QLMBURJudnOnFDQk+YbgFcAg70JBATcARDxN44DGo+PpfZHAsum+ioYMoOw==)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:52)

[jira] [Commented] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-09 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784698#comment-17784698
 ] 

Satish Duggana commented on KAFKA-15802:


[~jeqo] [~divijvaidya] +1 to go with filtering the targeted segments down to 
those whose state indicates the segment is available, for now. This is the 
existing approach in the 2.8.x internal tiered storage implementation branches. 
We can discuss later whether a general filtering API is required for some of 
the methods or whether a very specific API is needed. 
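
A minimal sketch of that filtering (the helper name and call site are illustrative, not the actual patch; only the RemoteLogSegmentMetadata/RemoteLogSegmentState types are from the public remote-storage API):

{code:java}
import java.util.Iterator;
import java.util.Optional;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

public final class SegmentFiltering {

    // Skip metadata for segments that have not finished copying: only
    // COPY_SEGMENT_FINISHED guarantees the segment, its indexes, and its
    // manifest exist in remote storage, so listOffsets should ignore
    // segments in earlier states.
    public static Optional<RemoteLogSegmentMetadata> firstCopiedSegment(
            final Iterator<RemoteLogSegmentMetadata> segments) {
        while (segments.hasNext()) {
            final RemoteLogSegmentMetadata metadata = segments.next();
            if (metadata.state() == RemoteLogSegmentState.COPY_SEGMENT_FINISHED) {
                return Optional.of(metadata);
            }
        }
        return Optional.empty();
    }
}
{code}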


> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>

[jira] [Updated] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-09 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15802:
---
Attachment: (was: screenshot-1.png)

> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>

[jira] [Updated] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-09 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15802:
---
Attachment: screenshot-1.png

> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
> Attachments: screenshot-1.png
>
>

Re: [PR] Add junit properties to display parameterized test names [kafka]

2023-11-09 Thread via GitHub


alok123t commented on PR #14687:
URL: https://github.com/apache/kafka/pull/14687#issuecomment-1805185143

   @divijvaidya I think it looks reasonable. JUnit 4 test 
[ConsistencyVectorIntegrationTest](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14687/5/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/) 
running on CI.
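   
   For context, this is the stock JUnit 5 mechanism such a PR relies on; a sketch of the kind of `junit-platform.properties` entry involved (the exact values used by the PR may differ):
   
   ```
   # Include parameter values in the display name of parameterized tests,
   # so CI reports show which argument combination a run corresponds to.
   junit.jupiter.params.displayname.default = {displayName}.{argumentsWithNames}
   ```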


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388948636


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   To be honest, if we implement proper concurrency granularity for groups 
(serialize group updates [not the whole partition], keep a read "lock" on 
groups during commit updates), I'm not sure we'd get much extra perf gain from 
piercing the appendRecords abstraction to implement pipelining.  Then we could 
get rid of the timeline snapshot structure and of hooking into the replication 
pipeline to listen for HWM updates; we could just do appendRecords and wait for 
completion.  Then we could completely decouple the group coordinator logic from 
the storage stack and make it simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: [WIP] debugging [kafka]

2023-11-09 Thread via GitHub


showuon opened a new pull request, #14730:
URL: https://github.com/apache/kafka/pull/14730

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-10840) Need way to catch auth issues in poll method of Java Kafka client

2023-11-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-10840:
---
Labels: authentication client  (was: )

> Need way to catch auth issues in poll method of Java Kafka client
> -
>
> Key: KAFKA-10840
> URL: https://issues.apache.org/jira/browse/KAFKA-10840
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Devin G. Bost
>Priority: Blocker
>  Labels: authentication, client
>
> We recently implemented SSL authentication at our company, and when certs 
> expire, the Kafka client poll method silently fails without throwing any kind 
> of exception. This is a problem because the data flow could stop at any time 
> (due to certificate expiration) without us being able to handle it. The auth 
> issue shows up in Kafka broker logs, but we don't see any indication on the 
> client-side that there was an auth issue. As a consequence, the auth failure 
> happens 10 times a second forever. 
> We need a way to know on the client-side if an auth issue is blocking the 
> connection to Kafka so we can handle the exception and refresh the certs 
> (keystore/truststore) when the certs expire. 
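
For reference, newer Java clients can surface this as an AuthenticationException thrown from poll(); a minimal sketch of the client-side handling being asked for (the reload helper is hypothetical):

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.AuthenticationException;

public class AuthAwarePoller {

    // Hypothetical helper: reloads keystore/truststore and rebuilds the consumer.
    interface CertReloader {
        KafkaConsumer<String, String> reload();
    }

    static void pollLoop(KafkaConsumer<String, String> consumer, final CertReloader reloader) {
        while (true) {
            try {
                // process the returned records ...
                consumer.poll(Duration.ofSeconds(1));
            } catch (final AuthenticationException e) {
                // e.g. expired certs: refresh and recreate the client instead
                // of retrying the same failed handshake ten times a second.
                consumer.close();
                consumer = reloader.reload();
            }
        }
    }
}
{code}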



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10840) Need way to catch auth issues in poll method of Java Kafka client

2023-11-09 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784681#comment-17784681
 ] 

Philip Nee commented on KAFKA-10840:


Hey [~devin.bost] Are you still seeing this issue? I recently ran into a 
similar issue in Kafka Connect, where the connector didn't catch the failed 
authentication, causing the task to run for days on "Failed authentication with 
"

> Need way to catch auth issues in poll method of Java Kafka client
> -
>
> Key: KAFKA-10840
> URL: https://issues.apache.org/jira/browse/KAFKA-10840
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Devin G. Bost
>Priority: Blocker
>
> We recently implemented SSL authentication at our company, and when certs 
> expire, the Kafka client poll method silently fails without throwing any kind 
> of exception. This is a problem because the data flow could stop at any time 
> (due to certificate expiration) without us being able to handle it. The auth 
> issue shows up in Kafka broker logs, but we don't see any indication on the 
> client-side that there was an auth issue. As a consequence, the auth failure 
> happens 10 times a second forever. 
> We need a way to know on the client-side if an auth issue is blocking the 
> connection to Kafka so we can handle the exception and refresh the certs 
> (keystore/truststore) when the certs expire. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] cherrypick KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-09 Thread via GitHub


jolshan merged PR #14712:
URL: https://github.com/apache/kafka/pull/14712


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] cherrypick KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14712:
URL: https://github.com/apache/kafka/pull/14712#issuecomment-1805099279

   I saw all those selector tests failing on trunk in a different PR build. I 
don't know how I get so lucky with these.
   
   Test failures look OK now, so I will go ahead and merge. Maybe tomorrow I can 
try to figure out the Selector issue. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: Server side changes [kafka]

2023-11-09 Thread via GitHub


jolshan merged PR #1:
URL: https://github.com/apache/kafka/pull/1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: Server side changes [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #1:
URL: https://github.com/apache/kafka/pull/1#issuecomment-1805088928

   I've seen all these failures on other builds (both 3.6 and trunk) today and 
yesterday.
   I've commented on the applicable JIRAs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15552 Fix Producer ID ZK migration [kafka]

2023-11-09 Thread via GitHub


showuon commented on PR #14506:
URL: https://github.com/apache/kafka/pull/14506#issuecomment-1805023823

   I had a look, and I think this flaky test is an infra issue, not our 
code. Like you said, it started to fail in the 3.6 branch after I backported 
this PR (on Nov. 9). Checking the trunk build, it also failed on Nov. 9 and 10. 
   
   
https://ge.apache.org/scans/tests?search.names=git%20branch=kafka=1699587001012=169608960=Asia%2FTaipei=trunk=org.apache.kafka.clients.ClusterConnectionStatesTest#
   
   Back to the test code, it fails at this line: 
   `assertEquals(1, ClientUtils.resolve("localhost", singleIPHostResolver).size());`
   Our Kafka logic basically doesn't affect the host resolver's results. 
   
   So I think we can keep monitoring it for now and hope it heals itself. If 
not, maybe we should raise an issue to the infra team? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Disable Tests causing exit issues during the builds [kafka]

2023-11-09 Thread via GitHub


github-actions[bot] commented on PR #14191:
URL: https://github.com/apache/kafka/pull/14191#issuecomment-1805022042

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-09 Thread via GitHub


apoorvmittal10 commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1804996531

   > @apoorvmittal10 : Thanks for the updated PR. LGTM
   > 
   > JDK 17 and Scala 2.13 didn't finish. Could you trigger another test run? 
This can typically be done by closing the PR, waiting for 20 secs and reopening 
it.
   
   Thanks @junrao, I have merged upstream/trunk branch to trigger the build for 
now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-09 Thread via GitHub


mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1388813779


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -90,9 +96,7 @@ public void beforeTest() throws InterruptedException {
 configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
 configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
-kafkaStreams = new KafkaStreams(builder.build(), configs);
-kafkaStreams.start();
-Thread.sleep(2000);
+kafkaStreams = IntegrationTestUtils.getStartedStreams(configs, builder, true);

Review Comment:
   Using this helper is actually not strictly necessary, but seems "cleaner".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-09 Thread via GitHub


mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1388813585


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -75,13 +79,15 @@ public static void before() throws Exception {
 producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
 producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps);
-producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_OLD, RECORD_KEY, RECORD_VALUE_OLD)).get();
-producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_NEW, RECORD_KEY, RECORD_VALUE_NEW)).get();
+try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+    producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_OLD, RECORD_KEY, RECORD_VALUE_OLD)).get();
+    producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_NEW, RECORD_KEY, RECORD_VALUE_NEW)).get();
+}
+INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 1);

Review Comment:
   We write two input records, thus we want to query at position (i.e., offset) 1.
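   
   A sketch of how the test then uses that position, assuming the KIP-960 API added in this PR (STORE_NAME and RECORD_KEY are the test's constants): bounding the query at offset 1 makes IQv2 wait until the store has processed both records, replacing the old Thread.sleep.
   
   ```java
   import org.apache.kafka.streams.query.Position;
   import org.apache.kafka.streams.query.PositionBound;
   import org.apache.kafka.streams.query.StateQueryRequest;
   import org.apache.kafka.streams.query.StateQueryResult;
   import org.apache.kafka.streams.query.VersionedKeyQuery;
   import org.apache.kafka.streams.state.VersionedRecord;
   
   // Pin the query to the input's committed position (partition 0, offset 1).
   final Position bound = Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 1L);
   final StateQueryRequest<VersionedRecord<Integer>> request =
       StateQueryRequest.inStore(STORE_NAME)
           .withQuery(VersionedKeyQuery.<Integer, Integer>withKey(RECORD_KEY))
           .withPositionBound(PositionBound.at(bound));
   final StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request);
   ```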



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]

2023-11-09 Thread via GitHub


mjsax commented on code in PR #14596:
URL: https://github.com/apache/kafka/pull/14596#discussion_r1388813361


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -75,13 +79,15 @@ public static void before() throws Exception {
 producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
 producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps);
-producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_OLD, RECORD_KEY, RECORD_VALUE_OLD)).get();
-producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_NEW, RECORD_KEY, RECORD_VALUE_NEW)).get();
+try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {

Review Comment:
   Side cleanup to close the producer properly using the `try-with-resources` 
construct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-09 Thread via GitHub


ableegoldman commented on code in PR #14708:
URL: https://github.com/apache/kafka/pull/14708#discussion_r1388793799


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingStoreMaterializer.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class SlidingStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {

Review Comment:
   nit: I actually did mean to leave the `Window` part of the name for this 
class, because sliding windows are a subset of the WindowStore -- whereas 
session windows are for a different store type, i.e. `SessionStore`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-09 Thread via GitHub


ableegoldman commented on code in PR #14708:
URL: https://github.com/apache/kafka/pull/14708#discussion_r1388791856


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowStoreMaterializer.java:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class TimeWindowStoreMaterializer<K, V, W extends Window> extends MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
+
+    private final Windows<W> windows;
+    private final EmitStrategy emitStrategy;
+
+    public TimeWindowStoreMaterializer(
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materialized,
+        final Windows<W> windows,
+        final EmitStrategy emitStrategy
+    ) {
+        super(materialized);
+        this.windows = windows;
+        this.emitStrategy = emitStrategy;
+    }
+
+    @Override
+    public StateStore build() {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
+        if (supplier == null) {
+            final long retentionPeriod = retentionPeriod();
+
+            if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the window store "
+                    + materialized.storeName() + " must be no smaller than its window size plus the grace period."
+                    + " Got size=[" + windows.size() + "],"
+                    + " grace=[" + windows.gracePeriodMs() + "],"
+                    + " retention=[" + retentionPeriod + "]");
+            }
+
+            switch (defaultStoreType) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemoryWindowStore(
+                        materialized.storeName(),
+                        Duration.ofMillis(retentionPeriod),
+                        Duration.ofMillis(windows.size()),
+                        false
+                    );
+                    break;
+                case ROCKS_DB:
+                    supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                        RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod),
+                            Duration.ofMillis(windows.size()),
+                            false,
+                            false
+                        ) :
+                        Stores.persistentTimestampedWindowStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod),
+                            Duration.ofMillis(windows.size()),
+                            false
+                        );
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown store type: " + materialized.storeType());
+            }
+        }
+
+        final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores.timestampedWindowStoreBuilder(
+            supplier,
+            materialized.keySerde(),
+            materialized.valueSerde()
+        );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+// TODO(agavra): remove before merging, should we do what we do with 
other stores
+// and disable caching in the 

Re: [PR] KAFKA-15451: Send offline dirs in broker heartbeat [kafka]

2023-11-09 Thread via GitHub


cmccabe closed pull request #14368: KAFKA-15451: Send offline dirs in broker 
heartbeat
URL: https://github.com/apache/kafka/pull/14368


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14387:
URL: https://github.com/apache/kafka/pull/14387#issuecomment-1804890304

   Hmm, tests are failing.
   I see a lot of `java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: CoordinatorMetrics must be set.`
   
   Did we miss this somewhere?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15451: Send offline dirs in broker heartbeat [kafka]

2023-11-09 Thread via GitHub


cmccabe commented on PR #14368:
URL: https://github.com/apache/kafka/pull/14368#issuecomment-1804886740

   I believe this is a duplicate of #14392


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2023-11-09 Thread via GitHub


gharris1727 opened a new pull request, #14729:
URL: https://github.com/apache/kafka/pull/14729

   The KafkaServer startup() method instantiates a SocketServer, and then waits 
to call enableRequestProcessing. In the intervening time, an exception may be 
thrown which prevents the enableRequestProcessing from ever being called, and 
the KafkaServer skips to calling shutdown() instead.
   
   In this situation, the current SocketServer Acceptor and Processor 
implementations do not close their sockets. This causes the sockets to be 
leaked, potentially interfering with other tests. This change makes calling 
close() without first calling enableRequestProcessing perform the cleanup that 
the acceptor and processor threads would have performed. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15804) Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup

2023-11-09 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784630#comment-17784630
 ] 

Greg Harris commented on KAFKA-15804:
-

I believe what is happening is:

1. The SocketServer is created.
2. The exception is thrown from the dynamicConfigManager.
3. SocketServer.enableRequestProcessing is never called, so the SocketServer is 
never started.
4. Because the SocketServer is never started, the acceptor runnables never run 
their cleanup, and so the ServerSocketChannel is never closed.

I think that when the SocketServer beginShutdown()/shutdown() is called without 
calling enableRequestProcessing() first, the SocketServer should still close 
these resources.
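
A minimal sketch of that idea (the member names are illustrative, not the actual SocketServer code):

{code:java}
// If request processing was never enabled, the acceptor/processor threads
// never started and will never run their own cleanup, so shutdown() must
// close the server sockets the acceptors opened in their constructors.
public void shutdown() {
    beginShutdown();
    if (!requestProcessingEnabled) {
        for (Acceptor acceptor : acceptors) {
            acceptor.closeServerSocket();  // hypothetical cleanup hook
        }
    }
    // ... existing shutdown logic (join threads, close processors) ...
}
{code}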

> Broker leaks ServerSocketChannel when exception is thrown from 
> ZkConfigManager during startup
> -
>
> Key: KAFKA-15804
> URL: https://issues.apache.org/jira/browse/KAFKA-15804
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Minor
>
> This exception is thrown during the 
> RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic
>  test in zk mode:
> {noformat}
> org.apache.kafka.common.config.ConfigException: You have to delete all topics 
> with the property remote.storage.enable=true before disabling tiered storage 
> cluster-wide
> at 
> org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566)
>         at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956)
>         at 
> kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73)
> at 
> kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:360)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166)
> at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115)
> at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:575)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
> at scala.collection.immutable.List.foreach(List.scala:333)
> at 
> kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
> at 
> kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
> at 
> kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
> at 
> org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
> at 
> org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
> at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
> at 
> kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
> This leak only occurs for this one test in the RemoteTopicCrudTest; all other 
> tests including the kraft-mode version do not exhibit a leaked socket.
> Here is where the ServerSocket is instantiated:
> {noformat}
> at 
> java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113)
>         at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724)
>         at kafka.network.Acceptor.<init>(SocketServer.scala:608)
>         at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249)
>         at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
>         at 
> kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         at kafka.network.SocketServer.<init>(SocketServer.scala:175)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:344)
>      

Re: [PR] MINOR; Fix cluster size for migration tests [kafka]

2023-11-09 Thread via GitHub


jsancio merged PR #14726:
URL: https://github.com/apache/kafka/pull/14726


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Fix cluster size for migration tests [kafka]

2023-11-09 Thread via GitHub


jsancio commented on PR #14726:
URL: https://github.com/apache/kafka/pull/14726#issuecomment-1804852674

   Green system tests:
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.14
   session_id:   2023-11-09--001
   run time: 3 minutes 37.922 seconds
   tests run:5
   passed:   5
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_pre_migration_mode_3_4.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   1 minute 2.460 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_reconcile_kraft_to_zk
   status: PASS
   run time:   1 minute 51.317 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_upgrade_after_3_4_migration
   status: PASS
   run time:   1 minute 1.152 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True
   status: PASS
   run time:   3 minutes 19.610 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False
   status: PASS
   run time:   3 minutes 37.895 seconds
   

   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: Server side changes [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #1:
URL: https://github.com/apache/kafka/pull/1#issuecomment-1804848080

   These build failures are out of control. I have to rebuild again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15804) Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup

2023-11-09 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15804:

Component/s: (was: Tiered-Storage)

> Broker leaks ServerSocketChannel when exception is thrown from 
> ZkConfigManager during startup
> -
>
> Key: KAFKA-15804
> URL: https://issues.apache.org/jira/browse/KAFKA-15804
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Minor
>

[jira] [Updated] (KAFKA-15804) Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup

2023-11-09 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15804:

Description: 
This exception is thrown during the 
RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic
 test in zk mode:
{noformat}
org.apache.kafka.common.config.ConfigException: You have to delete all topics 
with the property remote.storage.enable=true before disabling tiered storage 
cluster-wide
at 
org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566)
        at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956)
        at 
kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73)
at 
kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94)
at 
kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176)
at 
kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:360)
at 
kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175)
at 
kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166)
at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115)
at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166)
at kafka.server.KafkaServer.startup(KafkaServer.scala:575)
at 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
at 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
at scala.collection.immutable.List.foreach(List.scala:333)
at 
kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
at 
kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
at 
kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
at 
kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
This leak only occurs for this one test in the RemoteTopicCrudTest; all other 
tests including the kraft-mode version do not exhibit a leaked socket.
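
To illustrate the failure mode, here is a minimal, hypothetical sketch (simplified names, not the broker code; the real paths are KafkaServer.startup, SocketServer, and ZkConfigManager.startup). The channel is opened before later startup steps, so an exception thrown by one of those steps must be caught and the channel closed, or it leaks:
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch, not the broker code: the server socket is opened
// before config-manager startup; if that startup step throws and nothing
// closes the channel, the socket leaks.
public class StartupLeakSketch {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.bind(new InetSocketAddress(0));
        try {
            startConfigManager(); // stands in for ZkConfigManager.startup()
        } catch (RuntimeException e) {
            channel.close();      // without this close, the channel leaks
            throw e;
        }
    }

    private static void startConfigManager() {
        // stands in for the ConfigException shown in the trace above
        throw new RuntimeException("tiered storage disabled cluster-wide");
    }
}
{code}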

Here is where the ServerSocket is instantiated:
{noformat}
at 
java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113)
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724)
        at kafka.network.Acceptor.(SocketServer.scala:608)
        at kafka.network.DataPlaneAcceptor.(SocketServer.scala:454)
        at 
kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249)
        at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
        at 
kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
        at kafka.network.SocketServer.(SocketServer.scala:175)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:344)
        at 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
        at 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at 
kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
        at 
kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
        at 
kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
        at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
        at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
        at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
        at 
kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
And the associated DataPlaneAcceptor:
{noformat}
         at java.base/java.nio.channels.Selector.open(Selector.java:295)
         at 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388700690


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as monotonically increasing counters. The javadoc
+ * for the Total metric type says "An un-windowed cumulative total maintained
+ * over all time." The standalone Total metrics in the codebase seem to be
+ * cumulative metrics that will always increase. The Total metric underlying
+ * the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but this is not
+ * guaranteed, as the sample values may be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum, by contrast, is a sample sum and not a cumulative metric. It is
+ * converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with, and reported as, two KafkaExporter
+ * metrics: a rate and a count. For example,
+ * org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate"
+ * metric of type rate and a "connection-close-total" metric of type total. So
+ * we will never get a KafkaExporter metric of type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a
+ * Frequencies metric is registered, each member Frequency object is converted
+ * into an anonymous Measurable and registered. So a Frequencies metric is
+ * reported as a set of measurables with name = Frequency.name(). As there is
+ * no way to figure out the compound type, each component measurable is
+ * converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same 

[jira] [Created] (KAFKA-15804) Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup

2023-11-09 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15804:
---

 Summary: Broker leaks ServerSocketChannel when exception is thrown 
from ZkConfigManager during startup
 Key: KAFKA-15804
 URL: https://issues.apache.org/jira/browse/KAFKA-15804
 Project: Kafka
  Issue Type: Bug
  Components: core, Tiered-Storage, unit tests
Affects Versions: 3.6.0
Reporter: Greg Harris


This exception is thrown during the 
RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic
 test in zk mode:
{noformat}
org.apache.kafka.common.config.ConfigException: You have to delete all topics 
with the property remote.storage.enable=true before disabling tiered storage 
cluster-wide

org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566)
kafka.log.LogManager.updateTopicConfig(LogManager.scala:956)
kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73)
kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94)
kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176)
kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175)
scala.collection.immutable.Map$Map2.foreach(Map.scala:360)
kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175)
kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166)
scala.collection.immutable.HashMap.foreach(HashMap.scala:1115)
kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166)
kafka.server.KafkaServer.startup(KafkaServer.scala:575)
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
scala.collection.immutable.List.foreach(List.scala:333)
kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
This leak only occurs for this one test in the RemoteTopicCrudTest; all other 
tests including the kraft-mode version do not exhibit a leaked socket.

Here is where the ServerSocket is instantiated:
{noformat}
at 
java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113)
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724)
        at kafka.network.Acceptor.(SocketServer.scala:608)
        at kafka.network.DataPlaneAcceptor.(SocketServer.scala:454)
        at 
kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249)
        at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
        at 
kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
        at kafka.network.SocketServer.(SocketServer.scala:175)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:344)
        at 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
        at 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at 
kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
        at 
kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
        at 
kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
        at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
        at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
        at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
        at 
kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
And the associated DataPlaneAcceptor:
{noformat}
        at java.base/java.nio.channels.Selector.open(Selector.java:295)         
at 

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-09 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1388693648


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -0,0 +1,2554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static 

Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-09 Thread via GitHub


ableegoldman commented on code in PR #14708:
URL: https://github.com/apache/kafka/pull/14708#discussion_r1388683082


##
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##
@@ -217,8 +217,19 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
 }
 }
 
+public static Materialized.StoreType parse(final String storeType) {

Review Comment:
   I hate that we moved this into a public package and made it a public API 
(even though I'm sure I ok'ed it at the time)  
   
   I don't really have time to do a KIP to move it back (or just delete it 
outright) but we should really do that. Maybe we can file a newbie ticket and 
someone will pick it up..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] cherrypick KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14712:
URL: https://github.com/apache/kafka/pull/14712#issuecomment-1804821028

   Commented on 
[KAFKA-14806](https://issues.apache.org/jira/browse/KAFKA-14806) which saw the 
same Selector + SocketServer problems.
   
   I also commented on https://github.com/apache/kafka/pull/14506 since the 
failures seem consistent on this branch after merging. 
   
   I will also rerun the build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14806) Add connection timeout in PlaintextSender used by SelectorTests

2023-11-09 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784619#comment-17784619
 ] 

Justine Olshan commented on KAFKA-14806:


Had the same sort of failures on my run recently: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/] 
Funny how the SocketServerTests all fail too.

> Add connection timeout in PlaintextSender used by SelectorTests
> ---
>
> Key: KAFKA-14806
> URL: https://issues.apache.org/jira/browse/KAFKA-14806
> Project: Kafka
>  Issue Type: Test
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
>
> Tests in {{SelectorTest}} can fail due to spurious connection timeouts. One 
> example can be found in [this 
> build|https://github.com/apache/kafka/pull/13378/checks?check_run_id=11970595528]
>  where the client connection that the {{PlaintextSender}} tried to open could 
> not be established before the test timed out.
> It may be worth enforcing a connection timeout and retries if this can add to 
> the selector tests' resiliency. Note that {{PlaintextSender}} is only used by 
> the {{SelectorTest}}, so the scope of the change would remain local.
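
A minimal sketch of the suggested hardening, using plain JDK socket APIs (a hypothetical helper, not the actual PlaintextSender change): connect with an explicit timeout and retry a bounded number of times instead of blocking indefinitely:
{code:java}
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch of "connection timeout and retries"; assumes attempts >= 1.
public final class ConnectWithTimeout {
    public static Socket connect(String host, int port, int timeoutMs, int attempts)
            throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(host, port), timeoutMs);
                return socket; // connected within the timeout
            } catch (Exception e) {
                socket.close(); // clean up the failed attempt before retrying
                last = e;
            }
        }
        throw last;
    }
}
{code}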



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388669816


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -16,22 +16,135 @@
  */
 package org.apache.kafka.common.telemetry.internals;
 
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
  * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
  */
 public class SinglePointMetric implements MetricKeyable {
 
 private final MetricKey key;
+private final Metric.Builder metricBuilder;
 
-private SinglePointMetric(MetricKey key) {
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
 this.key = key;
+this.metricBuilder = metricBuilder;
 }
 
 @Override
 public MetricKey key() {
 return key;
 }
 
-// TODO: Implement methods for serializing/deserializing metrics in 
required format.
+public Metric.Builder builder() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)
+.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+}
+
+/*
+Helper methods to support metric construction.
+ */
+private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+boolean monotonic, NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric
+.getSumBuilder()
+.setAggregationTemporality(aggregationTemporality)
+.setIsMonotonic(monotonic)
+.addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static SinglePointMetric gauge(MetricKey metricKey, 
NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric.getGaugeBuilder().addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static NumberDataPoint.Builder point(Instant timestamp, Number 
value) {
+if (value instanceof Long || value instanceof Integer) {
+return point(timestamp, value.longValue());
+} else {

Review Comment:
   oh I see, you mean just the else, sure that's fair



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388668684


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -16,22 +16,135 @@
  */
 package org.apache.kafka.common.telemetry.internals;
 
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
  * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
  */
 public class SinglePointMetric implements MetricKeyable {
 
 private final MetricKey key;
+private final Metric.Builder metricBuilder;
 
-private SinglePointMetric(MetricKey key) {
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
 this.key = key;
+this.metricBuilder = metricBuilder;
 }
 
 @Override
 public MetricKey key() {
 return key;
 }
 
-// TODO: Implement methods for serializing/deserializing metrics in 
required format.
+public Metric.Builder builder() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)
+.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+}
+
+/*
+Helper methods to support metric construction.
+ */
+private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+boolean monotonic, NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric
+.getSumBuilder()
+.setAggregationTemporality(aggregationTemporality)
+.setIsMonotonic(monotonic)
+.addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static SinglePointMetric gauge(MetricKey metricKey, 
NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric.getGaugeBuilder().addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static NumberDataPoint.Builder point(Instant timestamp, Number 
value) {
+if (value instanceof Long || value instanceof Integer) {
+return point(timestamp, value.longValue());
+} else {

Review Comment:
   why not? This covers the case where a gauge/counter may be of type Double / 
Float and ensures we preserve the underlying precision instead of truncating 
the values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388668684


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -16,22 +16,135 @@
  */
 package org.apache.kafka.common.telemetry.internals;
 
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
  * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
  */
 public class SinglePointMetric implements MetricKeyable {
 
 private final MetricKey key;
+private final Metric.Builder metricBuilder;
 
-private SinglePointMetric(MetricKey key) {
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
 this.key = key;
+this.metricBuilder = metricBuilder;
 }
 
 @Override
 public MetricKey key() {
 return key;
 }
 
-// TODO: Implement methods for serializing/deserializing metrics in 
required format.
+public Metric.Builder builder() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)
+.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+}
+
+/*
+Helper methods to support metric construction.
+ */
+private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+boolean monotonic, NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric
+.getSumBuilder()
+.setAggregationTemporality(aggregationTemporality)
+.setIsMonotonic(monotonic)
+.addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static SinglePointMetric gauge(MetricKey metricKey, 
NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric.getGaugeBuilder().addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static NumberDataPoint.Builder point(Instant timestamp, Number 
value) {
+if (value instanceof Long || value instanceof Integer) {
+return point(timestamp, value.longValue());
+} else {

Review Comment:
   why not? This covers the case where a gauge may be of type Double / Float 
and ensures we preserve the underlying precision instead of truncating the 
values.
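
A tiny self-contained illustration of the precision point (plain Java, unrelated to the OpenTelemetry builders): dispatching on the runtime Number type keeps a Double gauge's fractional part instead of truncating it to a long:
{code:java}
// Plain-Java illustration of the precision argument above: dispatch on the
// runtime type so doubles keep their fractional part rather than being
// truncated to long.
public class PrecisionSketch {
    static String describe(Number value) {
        if (value instanceof Long || value instanceof Integer) {
            return "int point: " + value.longValue();
        } else {
            return "double point: " + value.doubleValue(); // keeps 7.5, not 7
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(42));  // int point: 42
        System.out.println(describe(7.5)); // double point: 7.5
    }
}
{code}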



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact 

Re: [PR] KAFKA-15552 Fix Producer ID ZK migration [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14506:
URL: https://github.com/apache/kafka/pull/14506#issuecomment-1804803577

   Hey I was taking a look at 3.6 builds and it looks like 
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/107/testReport/junit/org.apache.kafka.clients/ClusterConnectionStatesTest/Build___JDK_8_and_Scala_2_12___testSingleIP__/
 has been failing a lot more since this change.
   
   
https://ge.apache.org/scans/tests?search.names=git%20branch=P28D=kafka=America%2FLos_Angeles=3.6=org.apache.kafka.clients.ClusterConnectionStatesTest
   
   It has flaked on the 3.6 build where it was merged, on the next build, and 
on my PR build targeting 3.6. Can we make sure this change didn't cause issues?
   
   https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/107/
   
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/108/#showFailuresLink
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/#showFailuresLink
   
   @showuon @mumrah @cmccabe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388666021


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as monotonically increasing counters. The javadoc
+ * for the Total metric type says "An un-windowed cumulative total maintained
+ * over all time." The standalone Total metrics in the codebase seem to be
+ * cumulative metrics that will always increase. The Total metric underlying
+ * the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but this is not
+ * guaranteed, as the sample values may be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum, by contrast, is a sample sum and not a cumulative metric. It is
+ * converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with, and reported as, two KafkaExporter
+ * metrics: a rate and a count. For example,
+ * org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate"
+ * metric of type rate and a "connection-close-total" metric of type total. So
+ * we will never get a KafkaExporter metric of type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a
+ * Frequencies metric is registered, each member Frequency object is converted
+ * into an anonymous Measurable and registered. So a Frequencies metric is
+ * reported as a set of measurables with name = Frequency.name(). As there is
+ * no way to figure out the compound type, each component measurable is
+ * converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way 
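
To make the conversions above concrete, a hedged sketch using only the SinglePointMetric factory signatures quoted earlier in this thread (the MetricKey(name, tags) constructor is assumed from context, as it is not shown in full in these excerpts):
{code:java}
import java.time.Instant;
import java.util.Collections;

import org.apache.kafka.common.telemetry.internals.MetricKey;
import org.apache.kafka.common.telemetry.internals.SinglePointMetric;

// Hedged sketch of the mapping described in the javadoc: Count/Total-style
// metrics become monotonic cumulative sums, sample sums become gauges.
public class ConversionSketch {
    public static void main(String[] args) {
        Instant now = Instant.now();

        // CumulativeCount / Total -> monotonic cumulative sum (CUMULATIVE_DOUBLE)
        MetricKey closeTotal =
            new MetricKey("connection-close-total", Collections.emptyMap());
        SinglePointMetric cumulative =
            SinglePointMetric.sum(closeTotal, 42.0, true, now);

        // Sample sum / plain sampled value -> gauge (GAUGE_DOUBLE)
        MetricKey lag = new MetricKey("records-lag", Collections.emptyMap());
        SinglePointMetric gauge = SinglePointMetric.gauge(lag, 7.5, now);

        System.out.println(cumulative.key().name() + " / " + gauge.key().name());
    }
}
{code}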

Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-11-09 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1388662985


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals 
of the {@link Consumer} for
+ * various tests.
+ */
+public interface ConsumerDelegate<K, V> extends Consumer<K, V> {
+
+String getClientId();
+
+Metrics metricsInternal();

Review Comment:
   Renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] cherrypick KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14712:
URL: https://github.com/apache/kafka/pull/14712#issuecomment-1804796567

   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SelectorTest.testConnectionsByClientMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/SelectorTest/Build___JDK_17_and_Scala_2_13___testConnectionsByClientMetric___2/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SelectorTest.testInboundConnectionsCountInConnectionCreationMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/SelectorTest/Build___JDK_17_and_Scala_2_13___testInboundConnectionsCountInConnectionCreationMetric__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SelectorTest.testMuteOnOOM()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/SelectorTest/Build___JDK_17_and_Scala_2_13___testMuteOnOOM__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SelectorTest.testConnectionsByClientMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/SelectorTest/Build___JDK_17_and_Scala_2_13___testConnectionsByClientMetric__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SelectorTest.testInboundConnectionsCountInConnectionCreationMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/SelectorTest/Build___JDK_17_and_Scala_2_13___testInboundConnectionsCountInConnectionCreationMetric___2/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SelectorTest.testMuteOnOOM()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/SelectorTest/Build___JDK_17_and_Scala_2_13___testMuteOnOOM___2/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls12SelectorTest.testConnectionsByClientMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls12SelectorTest/Build___JDK_17_and_Scala_2_13___testConnectionsByClientMetric__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls12SelectorTest.testInboundConnectionsCountInConnectionCreationMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls12SelectorTest/Build___JDK_17_and_Scala_2_13___testInboundConnectionsCountInConnectionCreationMetric__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls12SelectorTest.testMuteOnOOM()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls12SelectorTest/Build___JDK_17_and_Scala_2_13___testMuteOnOOM__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls12SelectorTest.testConnectionsByClientMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls12SelectorTest/Build___JDK_17_and_Scala_2_13___testConnectionsByClientMetric___2/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls12SelectorTest.testInboundConnectionsCountInConnectionCreationMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls12SelectorTest/Build___JDK_17_and_Scala_2_13___testInboundConnectionsCountInConnectionCreationMetric___2/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls12SelectorTest.testMuteOnOOM()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls12SelectorTest/Build___JDK_17_and_Scala_2_13___testMuteOnOOM___2/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls13SelectorTest.testConnectionsByClientMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls13SelectorTest/Build___JDK_17_and_Scala_2_13___testConnectionsByClientMetric__/)
   *[Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.Tls13SelectorTest.testInboundConnectionsCountInConnectionCreationMetric()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14712/6/testReport/junit/org.apache.kafka.common.network/Tls13SelectorTest/Build___JDK_17_and_Scala_2_13___testInboundConnectionsCountInConnectionCreationMetric__/)

Re: [PR] cherrypick KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14712:
URL: https://github.com/apache/kafka/pull/14712#issuecomment-1804781724

   I will look at the build. The test failures don't look good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388649353


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for the Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed, as the sample values might be either
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * Sum is a sample sum, which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter 
metric for "connection-close", but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way 
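
The Meter behavior quoted above maps directly onto Kafka's public Metrics API; 
a minimal sketch of how one Meter surfaces as two metrics (the sensor and 
group names here are illustrative, not taken from the PR):

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;

public class MeterExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("connections-closed");

        // A single Meter registers two KafkaMetrics: a windowed rate and a
        // cumulative total. There is never a metric of "type Meter" itself.
        MetricName rate = metrics.metricName("connection-close-rate", "example-metrics");
        MetricName total = metrics.metricName("connection-close-total", "example-metrics");
        sensor.add(new Meter(rate, total));

        sensor.record(); // bumps both the rate and the monotonic total
    }
}
```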

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388645527


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsProvider.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.resource.v1.Resource;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides specification which are used to collect metrics.
+ */
+public interface MetricsProvider extends Configurable {

Review Comment:
   This class seems unused; it's not clear to me what its purpose is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388644791


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   > This is completely unrelated in my opinion as this is true for both the 
old and the new coordinator.
   
   It's true that it's a problem with the old coordinator, and we should make 
whatever minimal fixes are required for the old coordinator to work (and if it 
happens to work end-to-end, which I think it might, we won't need to fix it), 
but that code is going away and shouldn't define the forward-looking 
architecture.
   
   As we build the new coordinator, we should build it in a way that improves 
the forward-looking architecture.  Keeping the right abstraction is good; 
coincidentally, it also helps with the timelines -- we can use this proposal 
and the work that has already been done instead of doing new work to bring 
implementation details into the group coordinator.
   
   Moreover, I wonder if we need yet another thread pool to handle group 
coordinator logic; I think it would be good to just re-use the request handler 
threads to run this functionality.  This would avoid thread-pool proliferation 
and also reuse various useful improvements that work only on request pool 
threads, e.g. RequestLocal (hopefully we'll make it into a real thread local to 
be used at the point of use instead of passing the argument), various 
observability things, etc.  Here is a PoC that does that using 
NonBlockingSynchronizer and KafkaRequestHandler.wrap 
   
   
https://github.com/apache/kafka/pull/14728/commits/46acf0220434926305b343299d2780a34bf8a7de
   
   The NonBlockingSynchronizer replaces EventAccumulator and 
MultiThreadedEventProcessor (I didn't remove them to keep the change small), it 
has some perf benefits e.g. in uncontended cases, the processing continues 
running on the request thread instead of being rescheduled on the gc thread 
pool.  I can also easily implement read-write synchronization for the 
NonBlockingSynchronizer (so that readers won't block each other out), e.g. to 
implement non-blocking read "lock" on group when committing offsets.
   
   It's not to say I don't like the current code, but it feels like we are 
re-building functionality that we already have elsewhere in Kafka, and we 
could re-use the existing building blocks so that the gc focuses on group 
coordination rather than managing thread pools, getting into the details of 
the transactional protocol, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388643692


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/LastValueTracker.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A LastValueTracker uses a ConcurrentMap to maintain historic values for a 
given key, and return
+ * a previous value and an Instant for that value.
+ *
+ * @param <T> The type of the value.
+ */
+public class LastValueTracker<T> {
+private final ConcurrentMap<MetricKey, 
AtomicReference<InstantAndValue<T>>> counters = new ConcurrentHashMap<>();
+
+/**
+ * Return the last instant/value for the given MetricKey, or 
Optional.empty if there isn't one.
+ *
+ * @param metricKey the key for which to calculate a getAndSet.
+ * @param now the timestamp for the new value.
+ * @param value the current value.
+ * @return the timestamp of the previous entry and its value. If there
+ * isn't a previous entry, then this method returns {@link 
Optional#empty()}
+ */
+public Optional<InstantAndValue<T>> getAndSet(MetricKey metricKey, Instant 
now, T value) {
+InstantAndValue<T> instantAndValue = new InstantAndValue<>(now, value);
+AtomicReference<InstantAndValue<T>> valueOrNull = counters

Review Comment:
   Agree, I think in this case the put without atomic reference should be 
sufficient.
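
   The simplification being agreed on here can be sketched as follows (a 
minimal sketch, assuming `InstantAndValue` stays the holder type from the PR; 
`ConcurrentMap.put` already swaps atomically and returns the previous mapping, 
so the `AtomicReference` indirection adds nothing):
   ```java
   import java.time.Instant;
   import java.util.Optional;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Holder type assumed to match the PR's InstantAndValue.
   final class InstantAndValue<T> {
       final Instant intervalStart;
       final T value;
       InstantAndValue(Instant intervalStart, T value) {
           this.intervalStart = intervalStart;
           this.value = value;
       }
   }

   final class SimplifiedLastValueTracker<K, T> {
       private final ConcurrentMap<K, InstantAndValue<T>> values = new ConcurrentHashMap<>();

       Optional<InstantAndValue<T>> getAndSet(K key, Instant now, T value) {
           // put returns the previous mapping (or null), atomically per key.
           return Optional.ofNullable(values.put(key, new InstantAndValue<>(now, value)));
       }
   }
   ```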



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-09 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1388623720


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -656,48 +586,43 @@ public void testRestartConnectorAndTasksOnlyTasks() 
throws Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+ArgumentCaptor taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
+verify(statusBackingStore).put(taskStatus.capture());
+assertEquals(AbstractStatus.State.RESTARTING, 
taskStatus.getValue().state());

Review Comment:
   Can we also assert the task ID here?
   ```java
   assertEquals(taskId, taskStatus.getValue().id());
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-09 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1388627971


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -838,39 +733,32 @@ public void testPutConnectorConfig() throws Exception {
 Map newConnConfig = new HashMap<>(connConfig);
 newConnConfig.put("foo", "bar");
 
-Callback> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-// Callback> putConnectorConfigCb = 
PowerMock.createMock(Callback.class);
+Callback> connectorConfigCb = mock(Callback.class);
 
 // Create
-connector = PowerMock.createMock(BogusSourceConnector.class);
+connector = mock(BogusSourceConnector.class);
 expectAdd(SourceSink.SOURCE);
-Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+Connector connectorMock = mock(SourceConnector.class);
 expectConfigValidation(connectorMock, true, connConfig);
 
 // Should get first config
-connectorConfigCb.onCompletion(null, connConfig);
-EasyMock.expectLastCall();
+doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
 // Update config, which requires stopping and restarting
-worker.stopAndAwaitConnector(CONNECTOR_NAME);
-EasyMock.expectLastCall();
-Capture> capturedConfig = EasyMock.newCapture();
-Capture> onStart = EasyMock.newCapture();
-worker.startConnector(eq(CONNECTOR_NAME), 
EasyMock.capture(capturedConfig), EasyMock.anyObject(),
-eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
-EasyMock.expectLastCall().andAnswer(() -> {
+doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
+final ArgumentCaptor> capturedConfig = 
ArgumentCaptor.forClass(Map.class);
+final ArgumentCaptor> onStart = 
ArgumentCaptor.forClass(Callback.class);
+doAnswer(invocation -> {
 onStart.getValue().onCompletion(null, TargetState.STARTED);
 return true;
-});
+}).when(worker).startConnector(eq(CONNECTOR_NAME), 
capturedConfig.capture(), any(),
+eq(herder), eq(TargetState.STARTED), onStart.capture());
 // Generate same task config, which should result in no additional 
action to restart tasks
-EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
-.andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
+when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, newConnConfig, true)))
+.thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
 
 expectConfigValidation(connectorMock, false, newConnConfig);
-connectorConfigCb.onCompletion(null, newConnConfig);
-EasyMock.expectLastCall();
-EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
-
-PowerMock.replayAll();
+doNothing().when(connectorConfigCb).onCompletion(null, newConnConfig);
+when(worker.getPlugins()).thenReturn(plugins);

Review Comment:
   We can remove this.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws 
Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+
+verifyConnectorStatusRestart();
+verify(statusBackingStore).put(taskStatus.capture());

Review Comment:
   Can we also verify the state and task ID? E.g.:
   ```java
   assertEquals(AbstractStatus.State.RESTARTING, taskStatus.getValue().state());
   assertEquals(taskId, taskStatus.getValue().id());
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -656,48 +586,43 @@ public void testRestartConnectorAndTasksOnlyTasks() 
throws Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+ArgumentCaptor taskStatus = 
ArgumentCaptor.forClass(TaskStatus.class);
+verify(statusBackingStore).put(taskStatus.capture());
+assertEquals(AbstractStatus.State.RESTARTING, 
taskStatus.getValue().state());

Review Comment:
   Can we also assert the task ID here?
   ```java
   assertEquals(taskId, taskStatus.getValue().id());
   ```
   



##

[PR] POC: run group coordinator state machine on request handler pool [kafka]

2023-11-09 Thread via GitHub


artemlivshits opened a new pull request, #14728:
URL: https://github.com/apache/kafka/pull/14728

   This is a PoC (not tested and probably needs some corner cases
   addressed) for running group coordinator on the request handler pool
   instead of dedicated thread pool.
   
   The main new piece of code is NonBlockingSynchronizer that is an async
   equivalent of locking -- it makes sure that async tasks with a given
   key don't run concurrently.  The actual work is scheduled back on the
   request thread pool using KafkaRequestHandler.wrap primitive that was
   first introduced for KIP-890 work.
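
   For readers unfamiliar with the pattern, a rough sketch of per-key async 
   serialization follows. `PerKeySerializer` is a hypothetical name, and the 
   actual NonBlockingSynchronizer in the PoC may differ substantially:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.Executor;
   import java.util.function.Supplier;

   // Hypothetical sketch: serialize async tasks per key without blocking threads.
   public final class PerKeySerializer<K> {

       // Tail of the task chain for each key. A production version would also
       // remove entries once their chain has fully completed.
       private final ConcurrentMap<K, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

       // Chains `task` after the last task submitted for `key`: tasks for the
       // same key never overlap, tasks for different keys run concurrently,
       // and no thread ever parks waiting for a lock.
       public <T> CompletableFuture<T> submit(K key, Supplier<T> task, Executor executor) {
           CompletableFuture<T> result = new CompletableFuture<>();
           tails.compute(key, (k, tail) -> {
               CompletableFuture<Void> prev =
                   tail == null ? CompletableFuture.completedFuture(null) : tail;
               return prev.thenRunAsync(() -> {
                   try {
                       result.complete(task.get());
                   } catch (Throwable t) {
                       // Failures complete the caller's future but leave the
                       // chain healthy for the next task on this key.
                       result.completeExceptionally(t);
                   }
               }, executor);
           });
           return result;
       }
   }
   ```

   Because the chain only ever advances via callbacks, uncontended submissions 
   run immediately on the supplied executor, which matches the uncontended-case 
   benefit described in the review discussion.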


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388624127


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for the Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed, as the sample values might be either
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * Sum is a sample sum, which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter 
metric for "connection-close", but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same 

Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-09 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1388621758


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -817,19 +731,11 @@ public void testAccessors() throws Exception {
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
-EasyMock.reset(transformer);
-EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.anyObject()))

Review Comment:
   I agree with @mimaison, we should keep this part. Otherwise, there's no 
reason to issue duplicate calls to `Herder::connectors`, 
`Herder::connectorInfo`, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-11-09 Thread via GitHub


C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1388621193


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -722,89 +646,68 @@ public void testRestartConnectorAndTasksBoth() throws 
Exception {
 FutureCallback restartCallback = new 
FutureCallback<>();
 herder.restartConnectorAndTasks(restartRequest, restartCallback);
 assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-PowerMock.verifyAll();
+
+verifyConnectorStatusRestart();
+verify(statusBackingStore).put(taskStatus.capture());
 }
 
 @Test
 public void testCreateAndStop() throws Exception {
-connector = PowerMock.createMock(BogusSourceConnector.class);
+connector = mock(BogusSourceConnector.class);
 expectAdd(SourceSink.SOURCE);
 
 Map connectorConfig = 
connectorConfig(SourceSink.SOURCE);
-Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+Connector connectorMock = mock(SourceConnector.class);
 expectConfigValidation(connectorMock, true, connectorConfig);
 
-// herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
 expectStop();
 
-statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
-EasyMock.expectLastCall();
-
-statusBackingStore.stop();
-EasyMock.expectLastCall();
-worker.stop();
-EasyMock.expectLastCall();
-
-PowerMock.replayAll();
-
 herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
 Herder.Created connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
 assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
+// herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
 herder.stop();
 assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
-
-PowerMock.verifyAll();
+verify(worker).stop();
+verify(statusBackingStore).stop();
+verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
 }
 
 @Test
 public void testAccessors() throws Exception {
 Map connConfig = connectorConfig(SourceSink.SOURCE);
 System.out.println(connConfig);
 
-Callback> listConnectorsCb = 
PowerMock.createMock(Callback.class);
-Callback connectorInfoCb = 
PowerMock.createMock(Callback.class);
-Callback> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-Callback> taskConfigsCb = 
PowerMock.createMock(Callback.class);
-Callback>> tasksConfigCb = 
PowerMock.createMock(Callback.class);
+Callback> listConnectorsCb = mock(Callback.class);
+Callback connectorInfoCb = mock(Callback.class);
+Callback> connectorConfigCb = mock(Callback.class);
+Callback> taskConfigsCb = mock(Callback.class);
+Callback>> tasksConfigCb = 
mock(Callback.class);
 
 // Check accessors with empty worker
-listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
-EasyMock.expectLastCall();
-connectorInfoCb.onCompletion(EasyMock.anyObject(), 
EasyMock.isNull());
-EasyMock.expectLastCall();
-
connectorConfigCb.onCompletion(EasyMock.anyObject(), 
EasyMock.isNull());
-EasyMock.expectLastCall();

Review Comment:
   Aren't we missing a corresponding `doNothing().when(...)` line for this 
expectation?
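
   For context, Mockito already treats unstubbed calls to void methods as 
no-ops, so the migrated test behaves the same without the stub; an explicit 
counterpart, if one is wanted, might look like this (sketch against the same 
`connectorConfigCb` mock):
   ```java
   // Static imports assumed: org.mockito.Mockito.doNothing,
   // org.mockito.Mockito.verify, org.mockito.ArgumentMatchers.any,
   // org.mockito.ArgumentMatchers.isNull
   doNothing().when(connectorConfigCb).onCompletion(any(), isNull());
   // ... exercise the herder ...
   verify(connectorConfigCb).onCompletion(any(), isNull());
   ```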



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388613020


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for the Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed, as the sample values might be either
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * Sum is a sample sum, which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter 
metric for "connection-close", but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-09 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1387273811


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,12 +16,48 @@
  */
 package kafka.server;
 
+import java.util.Collections;

Review Comment:
   Should this class be in the same package as other client metric related 
classes like `DefaultClientTelemetryPayload`?



##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription update time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changed subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for the respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If the 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If the client 
issued a get telemetry
+ request prior to the push interval, then the client should get a throttle 
error, but if the
+ subscription has changed since the last request, then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388606440


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -16,22 +16,135 @@
  */
 package org.apache.kafka.common.telemetry.internals;
 
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class represents a metric that does not yet contain resource tags.
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
  * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
  */
 public class SinglePointMetric implements MetricKeyable {
 
 private final MetricKey key;
+private final Metric.Builder metricBuilder;
 
-private SinglePointMetric(MetricKey key) {
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
 this.key = key;
+this.metricBuilder = metricBuilder;
 }
 
 @Override
 public MetricKey key() {
 return key;
 }
 
-// TODO: Implement methods for serializing/deserializing metrics in 
required format.
+public Metric.Builder builder() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder 
metric) {
+return new SinglePointMetric(metricKey, metric);
+}
+
+/*
+Methods to construct gauge metric type.
+ */
+public static SinglePointMetric gauge(MetricKey metricKey, Number value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+public static SinglePointMetric gauge(MetricKey metricKey, double value, 
Instant timestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+return gauge(metricKey, point);
+}
+
+/*
+Methods to construct sum metric type.
+ */
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp) {
+return sum(metricKey, value, monotonic, timestamp, null);
+}
+
+public static SinglePointMetric sum(MetricKey metricKey, double value, 
boolean monotonic, Instant timestamp,
+Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value);
+if (startTimestamp != null) {
+point.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+}
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE, monotonic, point);
+}
+
+public static SinglePointMetric deltaSum(MetricKey metricKey, double 
value, boolean monotonic,
+Instant timestamp, Instant startTimestamp) {
+NumberDataPoint.Builder point = point(timestamp, value)
+.setStartTimeUnixNano(toTimeUnixNanos(startTimestamp));
+
+return sum(metricKey, 
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, monotonic, point);
+}
+
+/*
+Helper methods to support metric construction.
+ */
+private static SinglePointMetric sum(MetricKey metricKey, 
AggregationTemporality aggregationTemporality,
+boolean monotonic, NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric
+.getSumBuilder()
+.setAggregationTemporality(aggregationTemporality)
+.setIsMonotonic(monotonic)
+.addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static SinglePointMetric gauge(MetricKey metricKey, 
NumberDataPoint.Builder point) {
+point.addAllAttributes(asAttributes(metricKey.tags()));
+
+Metric.Builder metric = Metric.newBuilder().setName(metricKey.name());
+metric.getGaugeBuilder().addDataPoints(point);
+return create(metricKey, metric);
+}
+
+private static NumberDataPoint.Builder point(Instant timestamp, Number 
value) {
+if (value instanceof Long || value instanceof Integer) {
+return point(timestamp, value.longValue());
+} else {

Review Comment:
   The `else` is not needed.
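
   A possible shape for that suggestion (sketch; the two `point` overloads 
taking a `long` and a `double` are assumed to exist as in the quoted diff):
   ```java
   private static NumberDataPoint.Builder point(Instant timestamp, Number value) {
       // Integral values keep their integer representation; everything else
       // falls through to the double overload -- no else branch needed.
       if (value instanceof Long || value instanceof Integer) {
           return point(timestamp, value.longValue());
       }
       return point(timestamp, value.doubleValue());
   }
   ```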



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388600305


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for the Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed, as the sample values might be either
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * Sum is a sample sum, which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as two KafkaExporter
+ * metrics: a rate and a count. For example, org.apache.kafka.common.network.Selector
+ * has a Meter metric for "connection-close", but it has to be created with a
+ * "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a
+ * Frequencies metric is registered, each member Frequency object is converted
+ * into an anonymous Measurable and registered. So, a Frequencies metric is
+ * reported as a set of measurables with name = Frequency.name(). As there is
+ * no way to figure out the compound type, each component measurable is
+ * converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way 
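
The conversion rules above amount to a small decision function. The following
is a minimal sketch under stated assumptions: ExportedKind and kindFor are
hypothetical names, not part of the PR; only the instanceof ordering carries
the logic described in the javadoc.

    import org.apache.kafka.common.metrics.Gauge;
    import org.apache.kafka.common.metrics.Measurable;
    import org.apache.kafka.common.metrics.MetricValueProvider;
    import org.apache.kafka.common.metrics.stats.CumulativeSum;
    import org.apache.kafka.common.metrics.stats.WindowedCount;

    final class MetricTypeMapping {
        // Hypothetical exported data-point kinds, for illustration only.
        enum ExportedKind { CUMULATIVE_DOUBLE, GAUGE_DOUBLE, SKIP }

        static ExportedKind kindFor(MetricValueProvider<?> provider) {
            if (provider instanceof WindowedCount)
                return ExportedKind.GAUGE_DOUBLE;      // windowed sample count, not cumulative
            if (provider instanceof CumulativeSum)     // CumulativeCount extends CumulativeSum
                return ExportedKind.CUMULATIVE_DOUBLE; // monotonically increasing counter
            if (provider instanceof Measurable || provider instanceof Gauge)
                return ExportedKind.GAUGE_DOUBLE;      // point-in-time numeric value
            return ExportedKind.SKIP;                  // non-numeric providers are not collected
        }
    }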

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388599565


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class represents a telemetry metric that does not yet contain resource 
tags.
+ * These additional resource tags will be added before emitting metrics by the 
telemetry reporter.
+ */
+public class SinglePointMetric implements MetricKeyable {
+
+private final MetricKey key;
+private final Metric.Builder metricBuilder;
+
+private SinglePointMetric(MetricKey key, Metric.Builder metricBuilder) {
+this.key = key;
+this.metricBuilder = metricBuilder;
+}
+
+@Override
+public MetricKey key() {
+return key;
+}
+
+public Metric.Builder builder() {
+return metricBuilder;
+}
+
+public static SinglePointMetric create(MetricKey metricKey, Metric.Builder metric) {

Review Comment:
   That's fine, we can make the constructor public; it might just be a remnant of some refactoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
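
For context, constructing one of these by hand might look as follows. This is
a hedged sketch, not the PR's code: it assumes a MetricKey(String) constructor,
uses the protobuf-generated Metric.Builder from opentelemetry-proto, and
"connection-count" is an illustrative name.

    import io.opentelemetry.proto.metrics.v1.Metric;

    // Build the OTLP metric payload, then wrap it together with its key.
    Metric.Builder builder = Metric.newBuilder().setName("connection-count");
    SinglePointMetric metric = SinglePointMetric.create(new MetricKey("connection-count"), builder);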



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388591055


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with numeric values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as monotonically increasing counters. The javadoc
+ * for the Total metric type says "An un-windowed cumulative total maintained
+ * over all time.". The standalone Total metrics in the codebase seem to be
+ * cumulative metrics that will always increase. The Total metric underlying the
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed,
+ * as the sample values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum, by contrast, is a sample sum and not a cumulative metric. It is
+ * converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as two KafkaExporter
+ * metrics: a rate and a count. For example, org.apache.kafka.common.network.Selector
+ * has a Meter metric for "connection-close", but it has to be created with a
+ * "connection-close-rate" metric of type rate and a "connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with type Meter.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a
+ * Frequencies metric is registered, each member Frequency object is converted
+ * into an anonymous Measurable and registered. So, a Frequencies metric is
+ * reported as a set of measurables with name = Frequency.name(). As there is
+ * no way to figure out the compound type, each component measurable is
+ * converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on PR #14724:
URL: https://github.com/apache/kafka/pull/14724#issuecomment-1804683635

   Hi @apoorvmittal10 - I left some questions while making the first pass of 
the PR. They are all aesthetics.  Will follow up with more questions. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-09 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784606#comment-17784606
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-15802:
--

Hi [~divijvaidya] , see [https://github.com/apache/kafka/pull/14727] for a test 
case and a quick implementation of the option 1.

We may consider option 1 fix for 3.6.1 and work on a KIP to extend RLMM API for 
3.7, what do you think?

> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>
> We have a tiered storage cluster running with Aiven s3 plugin. 
> On our cluster, we have a process doing regular listOffsets requests. 
> This triggers the following exception:
> {code:java}
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> Requested remote resource was not found
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$6(RemoteIndexCache.java:355)
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:318)
> Nov 09, 2023 1:42:01 PM com.github.benmanes.caffeine.cache.LocalAsyncCache 
> lambda$handleCompletion$7
> WARNING: Exception thrown during asynchronous load
> java.util.concurrent.CompletionException: 
> io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:107)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at io.aiven.kafka.tieredstorage.storage.s3.S3Storage.fetch(S3Storage.java:80)
> at 
> io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.lambda$new$1(SegmentManifestProvider.java:59)
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:103)
> ... 7 more
> Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The 
> specified key does not exist. (Service: S3, Status Code: 404, Request ID: 
> CFMP27PVC9V2NNEM, Extended Request ID: 
> F5qqlV06qQJ5qCuWl91oueBaha0QLMBURJudnOnFDQk+YbgFcAg70JBATcARDxN44DGo+PpfZHAsum+ioYMoOw==)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
> at 
> 

Re: [PR] KAFKA-15774: refactor windowed stores to use StoreFactory [kafka]

2023-11-09 Thread via GitHub


agavra commented on code in PR #14708:
URL: https://github.com/apache/kafka/pull/14708#discussion_r1388579633


##
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##
@@ -217,8 +217,19 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
 }
 }
 
+public static Materialized.StoreType parse(final String storeType) {

Review Comment:
   Moved to `MaterializedInternal` for its final resting place  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
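
For reference, the helper being relocated is a straightforward string-to-enum
mapping. A sketch of what it might look like in its new home (the actual body
in MaterializedInternal may differ; StreamsConfig.IN_MEMORY and
StreamsConfig.ROCKS_DB are the existing store-type config constants):

    public static Materialized.StoreType parse(final String storeType) {
        switch (storeType) {
            case StreamsConfig.IN_MEMORY:
                return Materialized.StoreType.IN_MEMORY;
            case StreamsConfig.ROCKS_DB:
                return Materialized.StoreType.ROCKS_DB;
            default:
                throw new IllegalArgumentException("Unknown store type: " + storeType);
        }
    }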



[PR] KAFKA-15802: validate remote segment state before fetching index [kafka]

2023-11-09 Thread via GitHub


jeqo opened a new pull request, #14727:
URL: https://github.com/apache/kafka/pull/14727

   See commits for more details
   
   [[KAFKA-15802](https://issues.apache.org/jira/browse/KAFKA-15802)]
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388576337


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = 
LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate SELECTOR_NO_METRICS = 
k -> false;
+
+public final static Predicate SELECTOR_ALL_METRICS 
= k -> true;
+
+/**
+ * Examine the response data and handle different error codes accordingly:
+ *
+ * 
+ * Invalid Request: Disable Telemetry
+ * Invalid Record: Disable Telemetry
+ * UnknownSubscription or Unsupported Compression: Retry 
immediately
+ * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next 
interval
+ * 
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional maybeFetchErrorIntervalMs(short errorCode, 
int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {
+case INVALID_REQUEST:
+case INVALID_RECORD: {
+pushIntervalMs = Integer.MAX_VALUE;
+reason = "The broker response indicates the client sent an 
request that cannot be resolved"
++ " by re-trying, hence disable telemetry";
+break;
+}
+case UNKNOWN_SUBSCRIPTION_ID:
+case UNSUPPORTED_COMPRESSION_TYPE: {
+pushIntervalMs = 0;
+reason = Errors.forCode(errorCode).message();
+break;
+}
+case TELEMETRY_TOO_LARGE:
+case THROTTLING_QUOTA_EXCEEDED: {
+reason = Errors.forCode(errorCode).message();
+pushIntervalMs = (intervalMs != -1) ? intervalMs : 
DEFAULT_PUSH_INTERVAL_MS;
+break;
+}
+default: {
+reason = "Unwrapped error code";
+log.error("Error code: {}, reason: {}. Unmapped error for 
telemetry, disable telemetry.",

Review Comment:
   The next error message: log.debug("Error code: {}, reason: {}. Retry automatically in {} ms.", errorCode, reason, pushIntervalMs);
   
   But we aren't actually trying to retry, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
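
A hypothetical caller sketch for the method above, assuming the stripped
generic is Optional<Integer> (angle brackets are lost in this archive);
errorCode and currentIntervalMs stand in for the response's error code and the
current push interval, and scheduleNextPush is an illustrative callback, not
an API in the PR:

    Optional<Integer> errorIntervalMs =
        ClientTelemetryUtils.maybeFetchErrorIntervalMs(errorCode, currentIntervalMs);
    // Empty means no error: keep the current push schedule.
    errorIntervalMs.ifPresent(ms -> scheduleNextPush(ms));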



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388575600


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = 
LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate SELECTOR_NO_METRICS = 
k -> false;
+
+public final static Predicate SELECTOR_ALL_METRICS 
= k -> true;
+
+/**
+ * Examine the response data and handle different error codes accordingly:
+ *
+ * 
+ * Invalid Request: Disable Telemetry
+ * Invalid Record: Disable Telemetry
+ * UnknownSubscription or Unsupported Compression: Retry 
immediately
+ * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next 
interval
+ * 
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional maybeFetchErrorIntervalMs(short errorCode, 
int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {
+case INVALID_REQUEST:
+case INVALID_RECORD: {
+pushIntervalMs = Integer.MAX_VALUE;
+reason = "The broker response indicates the client sent an 
request that cannot be resolved"
++ " by re-trying, hence disable telemetry";
+break;
+}
+case UNKNOWN_SUBSCRIPTION_ID:
+case UNSUPPORTED_COMPRESSION_TYPE: {
+pushIntervalMs = 0;
+reason = Errors.forCode(errorCode).message();
+break;
+}
+case TELEMETRY_TOO_LARGE:
+case THROTTLING_QUOTA_EXCEEDED: {
+reason = Errors.forCode(errorCode).message();
+pushIntervalMs = (intervalMs != -1) ? intervalMs : 
DEFAULT_PUSH_INTERVAL_MS;
+break;
+}
+default: {
+reason = "Unwrapped error code";
+log.error("Error code: {}, reason: {}. Unmapped error for 
telemetry, disable telemetry.",

Review Comment:
   Also, is the client going to retry on the unexpected case? Or should it 
throw back to the user.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388574457


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = 
LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate SELECTOR_NO_METRICS = 
k -> false;
+
+public final static Predicate SELECTOR_ALL_METRICS 
= k -> true;
+
+/**
+ * Examine the response data and handle different error codes accordingly:
+ *
+ * 
+ * Invalid Request: Disable Telemetry
+ * Invalid Record: Disable Telemetry
+ * UnknownSubscription or Unsupported Compression: Retry 
immediately
+ * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next 
interval
+ * 
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional maybeFetchErrorIntervalMs(short errorCode, 
int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {
+case INVALID_REQUEST:
+case INVALID_RECORD: {
+pushIntervalMs = Integer.MAX_VALUE;
+reason = "The broker response indicates the client sent an 
request that cannot be resolved"
++ " by re-trying, hence disable telemetry";
+break;
+}
+case UNKNOWN_SUBSCRIPTION_ID:
+case UNSUPPORTED_COMPRESSION_TYPE: {
+pushIntervalMs = 0;
+reason = Errors.forCode(errorCode).message();
+break;
+}
+case TELEMETRY_TOO_LARGE:
+case THROTTLING_QUOTA_EXCEEDED: {
+reason = Errors.forCode(errorCode).message();
+pushIntervalMs = (intervalMs != -1) ? intervalMs : 
DEFAULT_PUSH_INTERVAL_MS;
+break;
+}
+default: {
+reason = "Unwrapped error code";
+log.error("Error code: {}, reason: {}. Unmapped error for 
telemetry, disable telemetry.",

Review Comment:
   Not sure if we need a reason here.  Can we just say Error code: {}. 
Unexpected error ...
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388571702


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+private final static Logger log = 
LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+public final static Predicate SELECTOR_NO_METRICS = 
k -> false;
+
+public final static Predicate SELECTOR_ALL_METRICS 
= k -> true;
+
+/**
+ * Examine the response data and handle different error codes accordingly:
+ *
+ * 
+ * Invalid Request: Disable Telemetry
+ * Invalid Record: Disable Telemetry
+ * UnknownSubscription or Unsupported Compression: Retry 
immediately
+ * TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next 
interval
+ * 
+ *
+ * @param errorCode response body error code
+ * @param intervalMs current push interval in milliseconds
+ *
+ * @return Optional of push interval in milliseconds
+ */
+public static Optional maybeFetchErrorIntervalMs(short errorCode, 
int intervalMs) {
+if (errorCode == Errors.NONE.code())
+return Optional.empty();
+
+int pushIntervalMs;
+String reason;
+
+switch (Errors.forCode(errorCode)) {

Review Comment:
   nit about cleanup:
   
   Errors error = Errors.forCode(errorCode);
   switch (error) {
       // ... per-error cases ...
       reason = error.message();
       break;
   }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
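
Spelled out, the suggested cleanup might look like the sketch below. It is not
the PR's code: the reason strings are abbreviated, and errorCode, intervalMs,
DEFAULT_PUSH_INTERVAL_MS, and the log field are assumed in scope as in the
quoted class.

    Errors error = Errors.forCode(errorCode);
    if (error == Errors.NONE)
        return Optional.empty();

    int pushIntervalMs;
    String reason;
    switch (error) {
        case INVALID_REQUEST:
        case INVALID_RECORD:
            pushIntervalMs = Integer.MAX_VALUE; // effectively disables telemetry
            reason = "non-retriable broker response";
            break;
        case UNKNOWN_SUBSCRIPTION_ID:
        case UNSUPPORTED_COMPRESSION_TYPE:
            pushIntervalMs = 0;                 // retry immediately
            reason = error.message();
            break;
        case TELEMETRY_TOO_LARGE:
        case THROTTLING_QUOTA_EXCEEDED:
            pushIntervalMs = (intervalMs != -1) ? intervalMs : DEFAULT_PUSH_INTERVAL_MS;
            reason = error.message();
            break;
        default:
            pushIntervalMs = Integer.MAX_VALUE;
            reason = "unmapped error, disabling telemetry";
    }
    log.debug("Error {}: {}; next push in {} ms", error, reason, pushIntervalMs);
    return Optional.of(pushIntervalMs);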



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388563259


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if 
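
A reduced sketch of that state machine: the state names mirror
ClientTelemetryState, but the set of states and the transitions shown here are
simplified and illustrative rather than the reporter's actual logic.

    enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS, TERMINATED }

    // A successful subscription fetch moves the reporter on to pushing metrics.
    static State afterSubscriptionReceived() {
        return State.PUSH_NEEDED;
    }

    // A failed push sends the reporter back to re-fetch its subscription.
    static State afterPushFailure() {
        return State.SUBSCRIPTION_NEEDED;
    }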

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388562526


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388550188


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388545752


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry 
which manages the life-cycle
+ * of the client telemetry collection process. The client telemetry reporter 
is responsible for
+ * collecting the client telemetry data and sending it to the broker.
+ * 
+ *
+ * The client telemetry reporter is configured with a {@link 
ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client 
telemetry reporter
+ * will attempt to fetch the telemetry subscription information from the 
broker and send the
+ * telemetry data to the broker based on the subscription information.
+ * 
+ *
+ * The full life-cycle of the metric collection process is defined by a state 
machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set 
of operations.
+ * For example, the client telemetry reporter will attempt to fetch the 
telemetry subscription
+ * from the broker when in the {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to 
re-fetch the
+ * subscription information by setting the state back to {@link 
ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * 
+ *
+ * In an unlikely scenario, if 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388543003


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the
+ * life-cycle of the client telemetry collection process. The client telemetry reporter is
+ * responsible for collecting the client telemetry data and sending it to the broker.
+ * <p>
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry
+ * reporter will attempt to fetch the telemetry subscription information from the broker and
+ * send the telemetry data to the broker based on the subscription information.
+ * <p>
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to
+ * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * <p>
+ *
+ * In an unlikely scenario, if 

Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388542056


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java:
##
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.Metric;
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequest.Builder;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.telemetry.ClientTelemetryState;
+import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.apache.kafka.common.telemetry.internals.MetricsCollector;
+import org.apache.kafka.common.telemetry.internals.MetricsEmitter;
+import org.apache.kafka.common.telemetry.internals.MetricsProvider;
+import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+/**
+ * The implementation of the {@link MetricsReporter} for client telemetry which manages the
+ * life-cycle of the client telemetry collection process. The client telemetry reporter is
+ * responsible for collecting the client telemetry data and sending it to the broker.
+ * <p>
+ *
+ * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is
+ * responsible for sending the client telemetry data to the broker. The client telemetry
+ * reporter will attempt to fetch the telemetry subscription information from the broker and
+ * send the telemetry data to the broker based on the subscription information.
+ * <p>
+ *
+ * The full life-cycle of the metric collection process is defined by a state machine in
+ * {@link ClientTelemetryState}. Each state is associated with a different set of operations.
+ * For example, the client telemetry reporter will attempt to fetch the telemetry subscription
+ * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state.
+ * If the push operation fails, the client telemetry reporter will attempt to re-fetch the
+ * subscription information by setting the state back to
+ * {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}.
+ * <p>
+ *
+ * In an unlikely scenario, if 
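
To make the life-cycle above concrete, here is a minimal sketch of the state-driven request loop the javadoc describes. It is illustrative only, not the reporter's actual code: apart from SUBSCRIPTION_NEEDED, the state names and the createRequest()/handleResponse() shape are assumptions based on the description above.

```java
import java.util.Optional;

// Minimal sketch of the telemetry state machine described in the javadoc.
// Only SUBSCRIPTION_NEEDED is confirmed by the text; the rest is assumed.
enum State { SUBSCRIPTION_NEEDED, SUBSCRIPTION_IN_PROGRESS, PUSH_NEEDED, PUSH_IN_PROGRESS }

final class TelemetryLoopSketch {
    private State state = State.SUBSCRIPTION_NEEDED;

    // Asked periodically by the network layer: what should be sent next?
    Optional<String> createRequest() {
        switch (state) {
            case SUBSCRIPTION_NEEDED:
                state = State.SUBSCRIPTION_IN_PROGRESS;
                return Optional.of("GetTelemetrySubscriptionsRequest");
            case PUSH_NEEDED:
                state = State.PUSH_IN_PROGRESS;
                return Optional.of("PushTelemetryRequest");
            default:
                return Optional.empty(); // a request is already in flight
        }
    }

    // A successful subscription (or push) leads to the next push; a failed
    // push falls back to re-fetching the subscription, as described above.
    void handleResponse(boolean success) {
        if (state == State.SUBSCRIPTION_IN_PROGRESS)
            state = success ? State.PUSH_NEEDED : State.SUBSCRIPTION_NEEDED;
        else if (state == State.PUSH_IN_PROGRESS)
            state = success ? State.PUSH_NEEDED : State.SUBSCRIPTION_NEEDED;
    }
}
```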

Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


dajac commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388535147


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   Thanks for looking into this. Here is my take:
   
   > That is correct, it may become a perf problem
   
   I strongly disagree on blocking the event loop. It will not become a perf 
problem. It is one. It is also an anti-pattern.
   
   > Right now it is a functional problem
   
   It is technically not a functional problem, at least not yet, because I haven't implemented the transactional offset commit in the new coordinator. ;)
   
   > appendRecords has async interface, thus adding async stages under such an 
interface can be done without inspection and understanding all callers (that's 
what an interface is -- any compliant implementation is valid), but doing so 
will break the current logic (so from the proper interface usage perspective it 
is a bug in the caller, which this proposal fixes)
   
   I will change this to not use appendRecords, this will make the contract 
clear.
   
   > now all of a sudden KIP-848 has new work to do before release, just because there is some independent work going on in the transaction area
   
   This is incorrect. We knew about this and we always had an implementation in 
mind which works. I will basically decouple the write into two stages: 1) 
validate/prepare the transaction; and 2) update state and write. As we 
discussed in the other PR, this is also required for the old coordinator to 
work correctly.
   
   > KIP-890 part2 design is still under discussion, the verification protocol 
is likely to change, so any changes in KIP-890 protocol are going to have 
ripple effects on KIP-848
   
   I don't agree with this. As we just saw, we already failed to make it work 
correctly for the existing coordinator, so the dependency was already there. 
Again, we can do better, I agree.
   
   > the work needs to be duplicated in group coordinator (and the protocol is 
going slightly different for different client versions) which becomes a likely 
source of bugs
   
   This is completely unrelated in my opinion as this is true for both the old 
and the new coordinator.
   
   Overall, I agree that we could do better, but I think that it is not the 
right time to change this. We are already under high time pressure and actually 
changing this in the right way puts even more pressure. We should look for a 
proper solution afterwards.
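
   To illustrate the point about blocking (a sketch, not the coordinator's actual code): calling get() on the event-loop thread parks that thread until the async stage completes, so no other event can run; chaining a continuation keeps the loop free.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class EventLoopSketch {
    private final Executor eventLoop; // assume a single-threaded executor

    EventLoopSketch(Executor eventLoop) { this.eventLoop = eventLoop; }

    // Anti-pattern: the single event-loop thread is parked on get(), so
    // events for every other group wait behind this one.
    void runBlocking(CompletableFuture<Void> stage) throws Exception {
        stage.get();
        applyResult();
    }

    // Non-blocking: return immediately and schedule the continuation back
    // onto the event loop; other events interleave while the stage runs.
    void runNonBlocking(CompletableFuture<Void> stage) {
        stage.whenCompleteAsync((result, error) -> applyResult(), eventLoop);
    }

    private void applyResult() { /* update coordinator state, etc. */ }
}
```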



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: telemetry reporter and request handling (KIP-714) [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1388532678


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryUtils.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.metrics.v1.MetricsData;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.telemetry.internals.MetricKeyable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.clients.ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS;
+
+public class ClientTelemetryUtils {
+
+    private final static Logger log = LoggerFactory.getLogger(ClientTelemetryUtils.class);
+
+    public final static Predicate<? super MetricKeyable> SELECTOR_NO_METRICS = k -> false;
+
+    public final static Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS = k -> true;
+
+    /**
+     * Examine the response data and handle the different error codes accordingly:
+     *
+     * <ul>
+     *     <li>Invalid Request: Disable Telemetry</li>
+     *     <li>Invalid Record: Disable Telemetry</li>
+     *     <li>UnknownSubscription or Unsupported Compression: Retry immediately</li>
+     *     <li>TelemetryTooLarge or ThrottlingQuotaExceeded: Retry as per next interval</li>
+     * </ul>
+     *
+     * @param errorCode response body error code
+     * @param intervalMs current push interval in milliseconds
+     *
+     * @return Optional of push interval in milliseconds
+     */
+    public static Optional<Integer> maybeFetchErrorIntervalMs(short errorCode, int intervalMs) {
+        if (errorCode == Errors.NONE.code())
+            return Optional.empty();
+
+        int pushIntervalMs;
+        String reason;
+
+        switch (Errors.forCode(errorCode)) {
+            case INVALID_REQUEST:
+            case INVALID_RECORD: {
+                pushIntervalMs = Integer.MAX_VALUE;
+                reason = "The broker response indicates the client sent a request that cannot be resolved"
+                    + " by retrying, hence disabling telemetry";
+                break;
+            }
+            case UNKNOWN_SUBSCRIPTION_ID:
+            case UNSUPPORTED_COMPRESSION_TYPE: {
+                pushIntervalMs = 0;
+                reason = Errors.forCode(errorCode).message();
+                break;
+            }
+            case TELEMETRY_TOO_LARGE:
+            case THROTTLING_QUOTA_EXCEEDED: {
+                reason = Errors.forCode(errorCode).message();
+                pushIntervalMs = (intervalMs != -1) ? intervalMs : DEFAULT_PUSH_INTERVAL_MS;
+                break;
+            }
+            default: {
+                reason = "Unmapped error code";
+                log.error("Error code: {}, reason: {}. Unmapped error for telemetry, disable telemetry.",
+                    errorCode, Errors.forCode(errorCode).message());
+                pushIntervalMs = Integer.MAX_VALUE;
+            }
+        }
+
+        log.debug("Error code: {}, reason: {}. Retry automatically in {} ms.", errorCode, reason, pushIntervalMs);
+        return Optional.of(pushIntervalMs);
+    }
+
+    public static Predicate<? super MetricKeyable> getSelectorFromRequestedMetrics(List<String> requestedMetrics) {
+        if (requestedMetrics == null || requestedMetrics.isEmpty()) {
+            log.debug("Telemetry subscription has specified no metric names; telemetry will record no metrics");
+            return SELECTOR_NO_METRICS;
+        } else if (requestedMetrics.size() == 1 && requestedMetrics.get(0) != null && requestedMetrics.get(0).isEmpty()) {
+            log.debug("Telemetry subscription has specified a single empty metric name; using all metrics");
+            return SELECTOR_ALL_METRICS;
+        } else {
+            log.debug("Telemetry subscription has specified to 

Re: [PR] MINOR; Fix KRaft metadata version system tests [kafka]

2023-11-09 Thread via GitHub


jsancio merged PR #14722:
URL: https://github.com/apache/kafka/pull/14722


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Fix KRaft metadata version system tests [kafka]

2023-11-09 Thread via GitHub


jsancio commented on PR #14722:
URL: https://github.com/apache/kafka/pull/14722#issuecomment-1804567979

   System tests are green.
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.14
   session_id:   2023-11-09--001
   run time: 10 minutes 45.434 seconds
   tests run:12
   passed:   12
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.3.2.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   3 minutes 22.675 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_combined_mode_upgrade.from_kafka_version=3.1.2.metadata_quorum=COMBINED_KRAFT
   status: PASS
   run time:   3 minutes 26.013 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   3 minutes 34.273 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.2.3.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   3 minutes 56.827 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_combined_mode_upgrade.from_kafka_version=3.2.3.metadata_quorum=COMBINED_KRAFT
   status: PASS
   run time:   3 minutes 8.597 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.5.1.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   3 minutes 17.410 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=dev.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   3 minutes 13.048 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.4.1.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   3 minutes 52.438 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_combined_mode_upgrade.from_kafka_version=3.3.2.metadata_quorum=COMBINED_KRAFT
   status: PASS
   run time:   3 minutes 3.473 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_combined_mode_upgrade.from_kafka_version=3.4.1.metadata_quorum=COMBINED_KRAFT
   status: PASS
   run time:   2 minutes 52.462 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_combined_mode_upgrade.from_kafka_version=3.5.1.metadata_quorum=COMBINED_KRAFT
   status: PASS
   run time:   3 minutes 1.467 seconds
   

   test_id:
kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_combined_mode_upgrade.from_kafka_version=dev.metadata_quorum=COMBINED_KRAFT
   status: PASS
   run time:   3 minutes 35.485 seconds
   

   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR; Fix cluster size for migration tests [kafka]

2023-11-09 Thread via GitHub


jsancio opened a new pull request, #14726:
URL: https://github.com/apache/kafka/pull/14726

   Use smaller cluster sizes instead of the default cluster size
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388518531


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state.
+ * It should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for the Total
+ * metric type say "An un-windowed cumulative total maintained over all time.". The standalone
+ * Total metrics in the codebase seem to be cumulative metrics that will always increase. The
+ * Total metric underlying the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and
+ * a count. For example, org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate" metric of type
+ * rate and a "connection-close-total" metric of type total. So, we will never get a
+ * KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So, a Frequencies metric is reported with a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Percentiles work the same way 
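
A compact sketch of the mapping this javadoc describes (illustrative only; the real collector builds OTLP protobuf points rather than returning an enum, and the instanceof checks here simplify its actual type handling):

```java
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

final class TypeMappingSketch {
    enum TargetKind { CUMULATIVE_DOUBLE, GAUGE_DOUBLE, SKIP }

    // Maps a metric's underlying value provider to the emitted point kind.
    static TargetKind kindFor(Object valueProvider) {
        // Totals and counts (CumulativeCount extends CumulativeSum) are
        // monotonically increasing in steady state -> cumulative.
        if (valueProvider instanceof CumulativeSum)
            return TargetKind.CUMULATIVE_DOUBLE;
        // Other measurables (Avg, Min, Max, Rate, sample Sum, and the
        // anonymous Frequency/Percentile components) -> point-in-time gauge.
        if (valueProvider instanceof Measurable)
            return TargetKind.GAUGE_DOUBLE;
        // Gauges are gauges by definition; the real collector additionally
        // checks that the gauge's value is numeric before collecting it.
        if (valueProvider instanceof Gauge)
            return TargetKind.GAUGE_DOUBLE;
        return TargetKind.SKIP;
    }
}
```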

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388515966


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state.
+ * It should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for the Total
+ * metric type say "An un-windowed cumulative total maintained over all time.". The standalone
+ * Total metrics in the codebase seem to be cumulative metrics that will always increase. The
+ * Total metric underlying the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and
+ * a count. For example, org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate" metric of type
+ * rate and a "connection-close-total" metric of type total. So, we will never get a
+ * KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So, a Frequencies metric is reported with a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * Percentiles work the same way 

Re: [PR] KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]

2023-11-09 Thread via GitHub


wolfchimneyrock commented on PR #14635:
URL: https://github.com/apache/kafka/pull/14635#issuecomment-1804548652

   Looks like all the Jenkins agents ran out of disk space. After addressing your comments, the tests still pass locally for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388504600


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state.
+ * It should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for the Total
+ * metric type say "An un-windowed cumulative total maintained over all time.". The standalone
+ * Total metrics in the codebase seem to be cumulative metrics that will always increase. The
+ * Total metric underlying the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and
+ * a count. For example, org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate" metric of type
+ * rate and a "connection-close-total" metric of type total. So, we will never get a
+ * KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So, a Frequencies metric is reported with a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388503750


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state.
+ * It should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for the Total
+ * metric type say "An un-windowed cumulative total maintained over all time.". The standalone
+ * Total metrics in the codebase seem to be cumulative metrics that will always increase. The
+ * Total metric underlying the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and a 

Review Comment:
   KafkaExporter refers to a class that doesn't exist in this codebase



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2023-11-09 Thread via GitHub


artemlivshits commented on code in PR #14705:
URL: https://github.com/apache/kafka/pull/14705#discussion_r1388499287


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -659,9 +660,21 @@ public TopicPartition key() {
  */
 @Override
 public void run() {
+try {
+runAsync().get();

Review Comment:
   > if one client's log append is blocked for additional async check
   
   That is correct; it may become a perf problem, and we can measure and see if it's worth fixing in practice. We'll have that choice (as well as the choice to postpone the fix if we have time pressure to release). But it won't be a functional problem. Right now it is a functional problem, which is suboptimal in many ways:
   - appendRecords has an async interface, thus adding async stages under such an interface can be done without inspecting and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface-usage perspective it is a bug in the caller, which this proposal fixes)
   - we cannot release the new transaction protocol (or the new coordinator) without implementing new logic, which creates hard dependencies and pushes against timelines (now all of a sudden KIP-848 has new work to do before release, just because there is some independent work going on in the transaction area)
   - the KIP-890 part 2 design is still under discussion and the verification protocol is likely to change, so any changes in the KIP-890 protocol are going to have ripple effects on KIP-848
   - 2 fairly complex components are now tied together -- we cannot innovate on transaction protocol implementation details (or, more broadly, on the whole IO subsystem implementation details, e.g. async IO) without understanding group coordinator implementation details, and we cannot innovate on group coordinator implementation details without understanding implementation details of the transaction protocol
   - to make the previous point worse, the dependency is not visible at the "point of use" -- someone tasked with improving the transaction protocol (or IO in general) would have no indication from the appendRecords interface that adding an async stage would need a corresponding change in the group coordinator
   - the work needs to be duplicated in the group coordinator (and the protocol is going to be slightly different for different client versions), which becomes a likely source of bugs
   
   IMO, the fact that the transaction verification implementation just doesn't work out of the box with the new group coordinator (and in fact requires quite non-trivial follow-up work that will block the release) is an architectural issue. We should strive to make the system more decoupled, so that the context an engineer needs to understand to make local changes in one part of the system is smaller.
   
   > Each new group coordinator thread handles requests from multiple groups 
and multiple clients within the same group.
   
   I don't think it's bound to a thread, but indeed the concurrency is limited to a partition -- we don't let operations on the same partition run concurrently, so all the groups that are mapped to the same partition are contending. This is, however, a specific implementation choice; it should be possible to make a group the unit of concurrency, and if that's not enough, we can let offset commits for different partitions go concurrently as well (they just need to make sure that the group doesn't change, which is sort of a "read lock" on the group), at which point there probably wouldn't be any contention in the common path.
   
   Now, one might ask: implementing per-group synchronization adds complexity, and handling transaction verification as an explicit state transition in the group coordinator adds complexity, so what's the difference? I'd say the difference is fundamental -- the per-group synchronization complexity is encapsulated in one component and keeps the system decoupled: an engineer tasked to improve the transaction protocol doesn't need to understand implementation details of the group coordinator, and vice versa. Changes are smaller, can be made faster, and are less bug-prone. Win-win-win.
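
   A sketch of what "per-group synchronization encapsulated in one component" could look like (an illustration of the argument, not an actual proposal in this PR): a keyed executor that serializes events per group while letting different groups proceed concurrently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Serializes tasks sharing a key (e.g. a group id) while tasks for
// different keys run concurrently on the shared pool. Cleanup of
// completed tails is elided for brevity.
final class KeyedSequentialExecutor<K> {
    private final Executor pool;
    private final ConcurrentMap<K, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSequentialExecutor(Executor pool) { this.pool = pool; }

    CompletableFuture<Void> submit(K key, Runnable task) {
        // Chain the task after the current tail for this key, atomically.
        return tails.compute(key, (k, tail) ->
            (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                .thenRunAsync(task, pool));
    }
}
```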



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]

2023-11-09 Thread via GitHub


dajac commented on code in PR #14635:
URL: https://github.com/apache/kafka/pull/14635#discussion_r1388489571


##
clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java:
##
@@ -165,4 +169,55 @@ public void testEqualityWithMemberResponses() {
 assertEquals(primaryResponse.hashCode(), 
reversedResponse.hashCode());
 }
 }
+
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+public void testNoErrorNoMembersResponses(short version) {
+LeaveGroupResponseData data = new LeaveGroupResponseData()
+.setErrorCode(Errors.NONE.code())
+.setMembers(Collections.emptyList());
+
+if (version < 3) {
+assertThrows(UnsupportedVersionException.class,
+() -> new LeaveGroupResponse(data, version));
+} else {
+LeaveGroupResponse response = new LeaveGroupResponse(data, 
version);
+assertEquals(Errors.NONE, response.topLevelError());

Review Comment:
   nit: Should we also assert the members here?
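
   For instance, assuming the existing memberResponses() accessor on LeaveGroupResponse, the extra assertion could be:

```java
assertEquals(Collections.emptyList(), response.memberResponses());
```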



##
clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java:
##
@@ -165,4 +169,55 @@ public void testEqualityWithMemberResponses() {
 assertEquals(primaryResponse.hashCode(), 
reversedResponse.hashCode());
 }
 }
+
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+public void testNoErrorNoMembersResponses(short version) {
+LeaveGroupResponseData data = new LeaveGroupResponseData()
+.setErrorCode(Errors.NONE.code())
+.setMembers(Collections.emptyList());
+
+if (version < 3) {
+assertThrows(UnsupportedVersionException.class,
+() -> new LeaveGroupResponse(data, version));
+} else {
+LeaveGroupResponse response = new LeaveGroupResponse(data, 
version);
+assertEquals(Errors.NONE, response.topLevelError());
+}
+}
+
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+public void testNoErrorMultipleMembersResponses(short version) {
+LeaveGroupResponseData data = new LeaveGroupResponseData()
+.setErrorCode(Errors.NONE.code())
+.setMembers(memberResponses);
+
+if (version < 3) {
+assertThrows(UnsupportedVersionException.class,
+ () -> new LeaveGroupResponse(data, version));

Review Comment:
   nit: Could we also intend this one with 4 spaces?



##
clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java:
##
@@ -165,4 +169,55 @@ public void testEqualityWithMemberResponses() {
 assertEquals(primaryResponse.hashCode(), 
reversedResponse.hashCode());
 }
 }
+
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+public void testNoErrorNoMembersResponses(short version) {
+LeaveGroupResponseData data = new LeaveGroupResponseData()
+.setErrorCode(Errors.NONE.code())
+.setMembers(Collections.emptyList());
+
+if (version < 3) {
+assertThrows(UnsupportedVersionException.class,
+() -> new LeaveGroupResponse(data, version));

Review Comment:
   nit: Could we indent this line with 4 spaces in order to remain consistent 
with the existing code?



##
clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java:
##
@@ -165,4 +169,55 @@ public void testEqualityWithMemberResponses() {
 assertEquals(primaryResponse.hashCode(), 
reversedResponse.hashCode());
 }
 }
+
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+public void testNoErrorNoMembersResponses(short version) {
+LeaveGroupResponseData data = new LeaveGroupResponseData()
+.setErrorCode(Errors.NONE.code())
+.setMembers(Collections.emptyList());
+
+if (version < 3) {
+assertThrows(UnsupportedVersionException.class,
+() -> new LeaveGroupResponse(data, version));
+} else {
+LeaveGroupResponse response = new LeaveGroupResponse(data, 
version);
+assertEquals(Errors.NONE, response.topLevelError());
+}
+}
+
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)
+public void testNoErrorMultipleMembersResponses(short version) {
+LeaveGroupResponseData data = new LeaveGroupResponseData()
+.setErrorCode(Errors.NONE.code())
+.setMembers(memberResponses);
+
+if (version < 3) {
+assertThrows(UnsupportedVersionException.class,
+ () -> new LeaveGroupResponse(data, version));
+} else {
+LeaveGroupResponse response = new LeaveGroupResponse(data, 
version);
+assertEquals(Errors.NONE, 

Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-09 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1388489436


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are divided into
+ * two base types:
+ *
+ * <ul>
+ *     <li>{@link Gauge}</li>
+ *     <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with number values.
+ * {@link Measurable Measurables} are divided into simple types with single values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases in steady state.
+ * It should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The javadocs for the Total
+ * metric type say "An un-windowed cumulative total maintained over all time.". The standalone
+ * Total metrics in the codebase seem to be cumulative metrics that will always increase. The
+ * Total metric underlying the Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not guaranteed, as the sample
+ * values might be either negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types or anonymous
+ * measurable types which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with, and reported as, 2 KafkaExporter metrics: a rate and
+ * a count. For example, org.apache.kafka.common.network.Selector has a Meter metric for
+ * "connection-close", but it has to be created with a "connection-close-rate" metric of type
+ * rate and a "connection-close-total" metric of type total. So, we will never get a
+ * KafkaExporter metric with type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with an array of Frequency objects. When a Frequencies metric is
+ * registered, each member Frequency object is converted into an anonymous Measurable and
+ * registered. So, a Frequencies metric is reported with a set of measurables with
+ * name = Frequency.name(). As there is no way to figure out the compound type, each component
+ * measurable is converted to a GAUGE_DOUBLE.
+ *

Re: [PR] KAFKA-15360: Include dirs in BrokerRegistration [kafka]

2023-11-09 Thread via GitHub


cmccabe closed pull request #14392: KAFKA-15360: Include dirs in 
BrokerRegistration 
URL: https://github.com/apache/kafka/pull/14392


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15360: Include dirs in BrokerRegistration [kafka]

2023-11-09 Thread via GitHub


cmccabe commented on PR #14392:
URL: https://github.com/apache/kafka/pull/14392#issuecomment-1804419672

   Committed, thanks!





Re: [PR] MINOR: Fix GroupCoordinatorShardTest stubbing [kafka]

2023-11-09 Thread via GitHub


dajac commented on PR #14637:
URL: https://github.com/apache/kafka/pull/14637#issuecomment-1804364013

   Re-triggered a build because the last one failed due to what seem to be unrelated issues. I'll merge if the next build passes.





Re: [PR] cherrypick KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-11-09 Thread via GitHub


jolshan commented on PR #14712:
URL: https://github.com/apache/kafka/pull/14712#issuecomment-1804347578

   I see this multiple times a day. Might be time to file another Infra ticket 
:( 





Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1388420510


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -712,6 +729,10 @@ private void close(Duration timeout, boolean swallowException) {
         if (applicationEventHandler != null)
             closeQuietly(() -> applicationEventHandler.close(timeout), "Failed to close application event handler with a timeout(ms)=" + timeout, firstException);
 
+        // Invoke all callbacks after the background thread exits, in case there are unsent async
+        // commits
+        maybeInvokeCallbacks();

Review Comment:
   When the consumer is closing, we don't really need to throw a fenced-id exception, because when the consumer is fenced we want it to close, right?
   
   To your second question: yes, that work is in flight; see KAFKA-15327.






Re: [PR] KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]

2023-11-09 Thread via GitHub


dajac commented on PR #14635:
URL: https://github.com/apache/kafka/pull/14635#issuecomment-1804333051

   @wolfchimneyrock Understood. If you agree with my suggestion, I think that 
we could just do it. Sorry for the confusion.





Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1388411256


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1096,4 +,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener)
+    private void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread invoke user-registered {@link OffsetCommitCallback}s. This is
+     * achieved by having the background thread register an {@link OffsetCommitCallbackTask} with the invoker upon
+     * future completion, and executing the callbacks when the user polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {

Review Comment:
   Similar to OffsetCommitCallbackTask






Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]

2023-11-09 Thread via GitHub


philipnee commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1388410485


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1096,4 +,76 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener)
+    private void maybeInvokeCallbacks() {
+        if (callbacks() > 0) {
+            invoker.executeCallbacks();
+        }
+    }
+
+    // Visible for testing
+    int callbacks() {
+        return invoker.callbackQueue.size();
+    }
+
+    /**
+     * Utility class that helps the application thread invoke user-registered {@link OffsetCommitCallback}s. This is
+     * achieved by having the background thread register an {@link OffsetCommitCallbackTask} with the invoker upon
+     * future completion, and executing the callbacks when the user polls/commits/closes the consumer.
+     */
+    private class OffsetCommitCallbackInvoker {
+        // Thread-safe queue to store callbacks
+        private final BlockingQueue<OffsetCommitCallbackTask> callbackQueue = new LinkedBlockingQueue<>();
+
+        public void submit(final OffsetCommitCallbackTask callback) {
+            try {
+                callbackQueue.offer(callback);
+            } catch (Exception e) {
+                log.error("Failed to enqueue OffsetCommitCallback", e);
+            }
+        }
+
+        public void executeCallbacks() {
+            LinkedList<OffsetCommitCallbackTask> callbacks = new LinkedList<>();
+            callbackQueue.drainTo(callbacks);
+            while (!callbacks.isEmpty()) {
+                OffsetCommitCallbackTask callback = callbacks.poll();
+                if (callback != null) {
+                    callback.invoke();
+                }
+            }
+        }
+    }
+
+    private class OffsetCommitCallbackTask {

Review Comment:
   Hi, it is not static because it uses isFenced. Let me know what you think.





