[GitHub] [kafka] showuon merged pull request #12251: MINOR: fix streams tutorial
showuon merged PR #12251: URL: https://github.com/apache/kafka/pull/12251 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13960) ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager)
rohit k created KAFKA-13960: --- Summary: ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager) Key: KAFKA-13960 URL: https://issues.apache.org/jira/browse/KAFKA-13960 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 3.1.1 Reporter: rohit k Fix For: 3.1.1 Attachments: kafka error.png I am using Kafka for a chat application built with Django REST Framework and React Native. While building my API I needed an endpoint for group deletion. Whenever I create a group, it creates a NewTopic; group deletion then deletes that topic. Since a deletion, I get an error I cannot recover from: whenever I start the Kafka server it fails with "ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager)". I tried changing the log dirs but the error persists. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13960) ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager)
[ https://issues.apache.org/jira/browse/KAFKA-13960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] rohit k updated KAFKA-13960: Issue Type: Test (was: Bug) > ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed > (kafka.log.LogManager)
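For reference, pointing the broker at a fresh log directory is the usual way to get a broker in this state running again. A minimal sketch of the relevant `server.properties` entry (the path is illustrative):

```properties
# server.properties -- sketch only; the path is illustrative.
# Forward slashes avoid backslash-escape surprises in Java properties files.
log.dirs=c:/kafka/kafka-logs-new
```

Note that this failure on Windows is commonly triggered by topic deletion: the broker cannot delete segment files that are still memory-mapped on Windows, marks the log dir as failed, and shuts down, so the error tends to recur after the next deletion. Running the broker on Linux (e.g. under WSL2 or in Docker) is the commonly suggested workaround.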
[jira] [Commented] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly
[ https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547818#comment-17547818 ] Derek Troy-West commented on KAFKA-13598: - Very happy to, [~showuon]. I will copy points 1-4 into the notes, if that's ok? I'll raise a PR later this afternoon. Should I raise that PR on the apache-site GitHub repository? > idempotence producer is not enabled by default if not set explicitly > > > Key: KAFKA-13598 > URL: https://issues.apache.org/jira/browse/KAFKA-13598 > Project: Kafka > Issue Type: Bug > Components: clients, config > Affects Versions: 3.1.0, 3.0.0 > Reporter: Luke Chen > Assignee: Luke Chen > Priority: Major > Fix For: 3.0.1, 3.2.0, 3.1.1 > > > In KAFKA-10619, we intended to enable idempotence by default, but this was > not achieved due to a bug in the config validation logic. The change from > acks=1 to acks=all worked correctly, however. > This is the follow-up for KIP-679: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default] > > Note: In KAFKA-13673, we'll disable the idempotent producer when > acks/retries/max.in.flight configs conflict, to avoid breaking existing > producers.
[GitHub] [kafka] ocadaruma commented on a diff in pull request #12251: MINOR: fix streams tutorial
ocadaruma commented on code in PR #12251: URL: https://github.com/apache/kafka/pull/12251#discussion_r889457352 ## docs/streams/tutorial.html: ## @@ -490,25 +490,25 @@ Writing a th Processor: KSTREAM-FLATMAPVALUES-01(stores: []) --> KSTREAM-KEY-SELECT-02 <-- KSTREAM-SOURCE-00 Processor: KSTREAM-KEY-SELECT-02(stores: []) --> KSTREAM-FILTER-05 <-- KSTREAM-FLATMAPVALUES-01 Processor: KSTREAM-FILTER-05(stores: []) --> KSTREAM-SINK-04 <-- KSTREAM-KEY-SELECT-02 -Sink: KSTREAM-SINK-04(topic: Counts-repartition) <-- KSTREAM-FILTER-05 +Sink: KSTREAM-SINK-04(topic: counts-store-repartition) <-- KSTREAM-FILTER-05 Sub-topology: 1 -Source: KSTREAM-SOURCE-06(topics: Counts-repartition) --> KSTREAM-AGGREGATE-03 -Processor: KSTREAM-AGGREGATE-03(stores: [Counts]) --> KTABLE-TOSTREAM-07 <-- KSTREAM-SOURCE-06 +Source: KSTREAM-SOURCE-06(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-03 +Processor: KSTREAM-AGGREGATE-03(stores: [counts-store]) --> KTABLE-TOSTREAM-07 <-- KSTREAM-SOURCE-06 Processor: KTABLE-TOSTREAM-07(stores: []) --> KSTREAM-SINK-08 <-- KSTREAM-AGGREGATE-03 Sink: KSTREAM-SINK-08(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-07 Global Stores: none As we can see above, the topology now contains two disconnected sub-topologies. -The first sub-topology's sink node KSTREAM-SINK-04 will write to a repartition topic Counts-repartition, +The first sub-topology's sink node KSTREAM-SINK-04 will write to a repartition topic counts-repartition, Review Comment: Oh that's right. Let me fix.
[jira] [Resolved] (KAFKA-13926) Proposal to have "HasField" predicate for kafka connect
[ https://issues.apache.org/jira/browse/KAFKA-13926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kumud Kumar Srivatsava Tirupati resolved KAFKA-13926. - Resolution: Won't Fix Dropping in favor of improving the existing SMTs as per the discussion. https://lists.apache.org/thread/odbj7793plyz7xxyy6d71c3xn7zng49f > Proposal to have "HasField" predicate for kafka connect > --- > > Key: KAFKA-13926 > URL: https://issues.apache.org/jira/browse/KAFKA-13926 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Reporter: Kumud Kumar Srivatsava Tirupati > Assignee: Kumud Kumar Srivatsava Tirupati > Priority: Major > > Hello, > Today's Connect predicates enable checks on the record metadata. However, > this can be limiting, considering that *many inbuilt and custom transformations > that we (the community) use are more key/value centric*. > Some use-cases this can solve: > * Data type conversions of certain pre-identified fields for records coming > across datasets, only if those fields exist. [Ex: TimestampConverter can be > run only if the specified date field exists, irrespective of the record > metadata] > * Skip running a certain transform if a given field does/does not exist. A lot > of inbuilt transforms raise exceptions (Ex: InsertField if the > field already exists), thereby breaking the task. Giving this control enables > users to consciously configure for such cases. > * Even though some inbuilt transforms explicitly handle these cases, it > would still be an unnecessary pass-through loop. > * Considering each connector usually deals with multiple datasets (even 100s > for a database CDC connector), metadata-centric predicate checking is > somewhat limiting for such pre-identified custom fields in the records. > I know some of these cases can be handled within the transforms themselves, but > that defeats the purpose of having predicates. > We have built this predicate for ourselves and it has proven extremely helpful. > Please let me know your thoughts so that I can raise a PR. > > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-845%3A+%27HasField%27+predicate+for+kafka+connect
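For context, the proposal targets Connect's existing predicate mechanism (KIP-585). As a sketch, this is how a built-in, metadata-centric predicate guards a transform today; a `HasField` predicate would be wired up the same way (connector field, topic, and predicate names here are illustrative):

```json
{
  "transforms": "convertTs",
  "transforms.convertTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.convertTs.field": "created_at",
  "transforms.convertTs.target.type": "Timestamp",
  "transforms.convertTs.predicate": "onlyOrders",
  "predicates": "onlyOrders",
  "predicates.onlyOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.onlyOrders.pattern": "orders\\..*"
}
```

A `HasField` predicate in the `predicates.onlyOrders.type` position would let the same `TimestampConverter` run only when the `created_at` field actually exists in the record value, rather than only when the topic name matches.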
[GitHub] [kafka] showuon commented on a diff in pull request #12251: MINOR: fix streams tutorial
showuon commented on code in PR #12251: URL: https://github.com/apache/kafka/pull/12251#discussion_r889451065 ## docs/streams/tutorial.html: (same diff hunk as quoted in the preceding message) Review Comment: Should it be `counts-repartition` -> `counts-store-repartition` ?
[jira] [Commented] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly
[ https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547813#comment-17547813 ] Luke Chen commented on KAFKA-13598: --- [~d-t-w] , yes, adding a note to notable changes in 3.2.0 section is a good idea. Are you interested in submitting a PR for it? Thanks. > idempotence producer is not enabled by default if not set explicitly
[jira] [Commented] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly
[ https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547803#comment-17547803 ] Derek Troy-West commented on KAFKA-13598: - This represents a breaking change where: # Broker version is < 2.8.0 # Cluster has ACL configured, but no IDEMPOTENT_WRITE permission set # Producer has default configuration (or no config captured in KAFKA-13673) # Kafka Client library version is bumped to 3.2.0 The resulting production error has a ClusterAuthorizationException but it wasn't clear that a new ACL was required until we found KAFKA-13759. I think 1-3 might be quite a common scenario for Kafka users in the wild, and 4 is very easy to do on the application side. Perhaps it might be an idea to add a note to the 3.2.0 upgrade guide or release notes about this one. More info: https://kpow.io/articles/kafka-producer-breaking-change/ > idempotence producer is not enabled by default if not set explicitly
[jira] [Comment Edited] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly
[ https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547803#comment-17547803 ] Derek Troy-West edited comment on KAFKA-13598 at 6/4/22 2:02 AM: - This represents a breaking change where: # Broker version is < 2.8.0 # Cluster has ACL configured, but no IDEMPOTENT_WRITE permission set # Producer has default configuration (or no config captured in KAFKA-13673) # Kafka Client library version is bumped to 3.2.0 in the producing application The resulting production error has a ClusterAuthorizationException but it wasn't clear that a new ACL was required until we found KAFKA-13759. I think 1-3 might be quite a common scenario for Kafka users in the wild, and 4 is very easy to do on the application side. Perhaps it might be an idea to add a note to the 3.2.0 upgrade guide or release notes about this one. More info: [https://kpow.io/articles/kafka-producer-breaking-change/] > idempotence producer is not enabled by default if not set explicitly
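For producers that cannot get the cluster-level ACL added right away, the pre-3.2.0 client behavior can be restored explicitly on the client side. A sketch of the relevant producer settings:

```properties
# producer config -- sketch; pins the pre-3.2.0 defaults explicitly
enable.idempotence=false
acks=1
```

The longer-term fix is granting the missing cluster-level permission, e.g. `kafka-acls.sh --bootstrap-server <host:port> --add --allow-principal User:<producer-principal> --operation IdempotentWrite --cluster` (bootstrap address and principal are placeholders).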
[GitHub] [kafka] dengziming commented on pull request #12236: MINOR: Use right enum value for broker registration change
dengziming commented on PR #12236: URL: https://github.com/apache/kafka/pull/12236#issuecomment-1146478325 > @dengziming I merge `trunk` to try to kick off a build. Thank you.
[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
dengziming commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r889424223 ## clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * This is used to describe per-partition state in the DescribeQuorumResponse. + */ +public class QuorumInfo { +private final String topic; Review Comment: I think it's better to add partition here because we are making way for multi-raft.
[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
dengziming commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r889423970 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() { } } +@Test +public void testDescribeMetadataQuorumSuccess() throws Exception { +try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id, Review Comment: We can't use `NodeApiVersions.create()` here due to a bug, which I'm trying to fix in https://github.com/apache/kafka/pull/11784
[GitHub] [kafka] ocadaruma opened a new pull request, #12251: MINOR: fix streams tutorial
ocadaruma opened a new pull request, #12251: URL: https://github.com/apache/kafka/pull/12251 - Fix inconsistency in store name in streams tutorial ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
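For context, the inconsistency the PR fixes comes from the tutorial's aggregation step: the state store name seeds the internal repartition topic name. A sketch of that step (assuming the tutorial's WordCount shape; at runtime the internal topic is additionally prefixed with the `application.id`):

```java
// Naming the count's state store "counts-store" makes Streams derive the
// internal repartition topic "counts-store-repartition" shown in the
// topology description -- hence the doc fix in this PR.
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
      .toStream()
      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
System.out.println(builder.build().describe());
```

Printing the built topology's description is what produces the `Sub-topology` listing quoted in the review comments above.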
[jira] [Updated] (KAFKA-13935) Factor out static IBP usages from broker
[ https://issues.apache.org/jira/browse/KAFKA-13935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-13935: - Summary: Factor out static IBP usages from broker (was: Factor out IBP from Partition and Log code) > Factor out static IBP usages from broker > > > Key: KAFKA-13935 > URL: https://issues.apache.org/jira/browse/KAFKA-13935 > Project: Kafka > Issue Type: Sub-task > Affects Versions: 3.3.0 > Reporter: David Arthur > Priority: Major > > We pass the IBP down to the log layer for checking things like compression > support. Currently, we are still reading this from KafkaConfig. In ZK mode > this is fine, but in KRaft mode, reading the IBP from the config is not > supported. > Since KRaft only supports IBP/MetadataVersion greater than 3.0 (which > supports the compression mode we check for), we may be able to avoid using a > dynamic call and/or volatile to get the current version.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } } } else { -// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata. +// For manually assigned partitions, we do not try to pro-actively lookup coordinator; +// instead we only try to refresh metadata when necessary. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. -if (coordinatorUnknownAndUnready(timer)) { -return false; +if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { +client.awaitMetadataUpdate(timer); } + +// if there is pending coordinator requests, ensure they have a chance to be transmitted. Review Comment: This is a major change made while addressing @dajac 's comment: previously, with manual assignment, the `coordinator.poll` call would not call `networkClient.poll`, which means that if coordinator discovery did not complete within `commitAsync` (note we call `networkClient.poll` twice in that function, so it's possible for that function to complete the discovery), we would have no other place to poll the network client to complete the pending requests.
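The code path under discussion is easiest to see from the caller's side: with manual assignment there is no rebalance, but an async commit still needs the group coordinator, so its discovery has to complete somewhere on the poll path. A minimal sketch (topic name, `running` flag, and `handle` callback are illustrative):

```java
// Manual assignment: no group membership, but commitAsync() still requires
// coordinator discovery to complete via the client's network poll.
consumer.assign(Collections.singletonList(new TopicPartition("input-topic", 0)));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> handle(record));
    consumer.commitAsync();  // may only enqueue the FindCoordinator request;
                             // a later poll() must transmit and complete it
}
```

This is why the PR makes `coordinator.poll` give pending coordinator requests a chance to be transmitted even when no pro-active coordinator lookup is done.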
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889361765 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) +) { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); +// set timeout to 0 because we don't want to retry after the error +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); + +// elapse auto commit interval and set committable position +time.sleep(autoCommitIntervalMs); +subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L)); + +// should try to find coordinator since we are auto committing +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.poll(time.timer(Long.MAX_VALUE)); +assertFalse(coordinator.coordinatorUnknown()); +} +} + +@Test +public void testCommitAsyncWithUserAssignedType() { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, 
Errors.COORDINATOR_NOT_AVAILABLE)); +// set timeout to 0 because we don't want to retry after the error +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); Review Comment: Ack
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
ableegoldman commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r889358327 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java: ## @@ -388,6 +401,40 @@ private String nodeSensorPrefix(final String threadId, final String taskId, fina + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName; } +public Sensor topicLevelSensor(final String threadId, + final String taskId, + final String processorNodeName, + final String topicName, Review Comment: I did update it to include the `topic` tag already, but I haven't updated it yet to clarify that we are introducing these at a new "topic-level" metrics scope. I'll do that right now
[jira] [Commented] (KAFKA-13944) Shutting down broker can be elected as partition leader in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547157#comment-17547157 ] Jose Armando Garcia Sancio commented on KAFKA-13944: Looks like this issue is addressed by https://issues.apache.org/jira/browse/KAFKA-13916 > Shutting down broker can be elected as partition leader in KRaft > > > Key: KAFKA-13944 > URL: https://issues.apache.org/jira/browse/KAFKA-13944 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > When a broker requests shutdown, it transitions to the CONTROLLED_SHUTDOWN > state in the controller. It is possible for the broker to remain unfenced in > this state until the controlled shutdown completes. When doing an election, > the only thing we generally check is that the broker is unfenced, so this > means we can elect a broker that is in controlled shutdown. > Here are a few snippets from a recent system test in which this occurred: > {code:java} > // broker 2 starts controlled shutdown > [2022-05-26 21:17:26,451] INFO [Controller 3001] Unfenced broker 2 has > requested and been granted a controlled shutdown. > (org.apache.kafka.controller.BrokerHeartbeatManager) > > // there is only one replica, so we set leader to -1 > [2022-05-26 21:17:26,452] DEBUG [Controller 3001] partition change for _foo-1 > with topic ID _iUQ72T_R4mmZgI3WrsyXw: leader: 2 -> -1, leaderEpoch: 0 -> 1, > partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager) > // controlled shutdown cannot complete immediately > [2022-05-26 21:17:26,529] DEBUG [Controller 3001] The request from broker 2 > to shut down can not yet be granted because the lowest active offset 177 is > not greater than the broker's shutdown offset 244. > (org.apache.kafka.controller.BrokerHeartbeatManager) > [2022-05-26 21:17:26,530] DEBUG [Controller 3001] Updated the controlled > shutdown offset for broker 2 to 244. 
> (org.apache.kafka.controller.BrokerHeartbeatManager) > // later on we elect leader 2 again > [2022-05-26 21:17:27,703] DEBUG [Controller 3001] partition change for _foo-1 > with topic ID _iUQ72T_R4mmZgI3WrsyXw: leader: -1 -> 2, leaderEpoch: 1 -> 2, > partitionEpoch: 1 -> 2 (org.apache.kafka.controller.ReplicationControlManager) > // now controlled shutdown is stuck because of the newly elected leader > [2022-05-26 21:17:28,531] DEBUG [Controller 3001] Broker 2 is in controlled > shutdown state, but can not shut down because more leaders still need to be > moved. (org.apache.kafka.controller.BrokerHeartbeatManager) > {code}
[GitHub] [kafka] jsancio commented on pull request #12236: MINOR: Use right enum value for broker registration change
jsancio commented on PR #12236: URL: https://github.com/apache/kafka/pull/12236#issuecomment-1146366630 @dengziming I merged `trunk` to try to kick off a build.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889351949 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) +) { Review Comment: ack
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
guozhangwang commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889350182 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. -if (coordinatorUnknownAndUnready(timer)) { -return false; +if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { Review Comment: ack
[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies
[ https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jim Hughes updated KAFKA-13873: --- Fix Version/s: 3.3.0 > Add ability to Pause / Resume KafkaStreams Topologies > - > > Key: KAFKA-13873 > URL: https://issues.apache.org/jira/browse/KAFKA-13873 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Major > Labels: kip > Fix For: 3.3.0 > > > In order to reduce resources used or modify data pipelines, users may want to > pause processing temporarily. Presently, this would require stopping the > entire KafkaStreams instance (or instances). > This work would add the ability to pause and resume topologies. When the > need to pause processing has passed, then users should be able to resume > processing. > KIP-834: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]
[GitHub] [kafka] aartigao closed pull request #10929: KAFKA-12995 [WIP] Allow old Broker compatibility for Metadata calls
aartigao closed pull request #10929: KAFKA-12995 [WIP] Allow old Broker compatibility for Metadata calls URL: https://github.com/apache/kafka/pull/10929
[GitHub] [kafka] jsancio commented on a diff in pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool
jsancio commented on code in PR #12245: URL: https://github.com/apache/kafka/pull/12245#discussion_r889255627 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -162,7 +163,7 @@ Found problem: val stream = new ByteArrayOutputStream() assertEquals(0, StorageTool. formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false)) - assertEquals("Formatting %s%n".format(tempDir), stream.toString()) + assertTrue(stream.toString().startsWith("Formatting %s".format(tempDir))) Review Comment: Minor but you can use Scala string interpolation. E.g. `s"Formatting $tempDir"`.
[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log
[ https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546695#comment-17546695 ] Jose Armando Garcia Sancio commented on KAFKA-13959: [~dengziming] [~showuon] Are you interested in working on this issue? > Controller should unfence Broker with busy metadata log > --- > > Key: KAFKA-13959 > URL: https://issues.apache.org/jira/browse/KAFKA-13959 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.0 >Reporter: Jose Armando Garcia Sancio >Priority: Blocker > > https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible > for the controller to not unfence a broker if the committed offset keeps > increasing. > > One solution to this problem is to require the broker to only catch up to the > last committed offset when they last sent the heartbeat. For example: > # Broker sends a heartbeat with current offset of {{{}Y{}}}. The last commit > offset is {{{}X{}}}. The controller remember this last commit offset, call it > {{X'}} > # Broker sends another heartbeat with current offset of {{{}Z{}}}. Unfence > the broker if {{Z >= X}} or {{{}Z >= X'{}}}. > > This change should also set the default for MetadataMaxIdleIntervalMs back to > 500.
[jira] [Updated] (KAFKA-13959) Controller should unfence Broker with busy metadata log
[ https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13959: --- Description: https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible for the controller to not unfence a broker if the committed offset keeps increasing. One solution to this problem is to require the broker to only catch up to the last committed offset when they last sent the heartbeat. For example: # Broker sends a heartbeat with current offset of {{{}Y{}}}. The last commit offset is {{{}X{}}}. The controller remember this last commit offset, call it {{X'}} # Broker sends another heartbeat with current offset of {{{}Z{}}}. Unfence the broker if {{Z >= X}} or {{{}Z >= X'{}}}. This change should also set the default for MetadataMaxIdleIntervalMs back to 500. was: https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible for the controller to not unfence a broker if the committed offset keeps increasing. One solution to this problem is to require the broker to only catch up to the last committed offset when they last sent the heartbeat. For example: # Broker sends a heartbeat with current offset of {{{}Y{}}}. The last commit offset is {{{}X{}}}. The controller remember this last commit offset, call it {{X'}} # Broker sends another heartbeat with current offset of {{{}Z{}}}. Unfence the broker if {{Z >= X}} or {{{}Z >= X'{}}}. > Controller should unfence Broker with busy metadata log > --- > > Key: KAFKA-13959 > URL: https://issues.apache.org/jira/browse/KAFKA-13959 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.0 >Reporter: Jose Armando Garcia Sancio >Priority: Blocker > > https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible > for the controller to not unfence a broker if the committed offset keeps > increasing. 
> > One solution to this problem is to require the broker to only catch up to the > last committed offset when they last sent the heartbeat. For example: > # Broker sends a heartbeat with current offset of {{{}Y{}}}. The last commit > offset is {{{}X{}}}. The controller remember this last commit offset, call it > {{X'}} > # Broker sends another heartbeat with current offset of {{{}Z{}}}. Unfence > the broker if {{Z >= X}} or {{{}Z >= X'{}}}. > > This change should also set the default for MetadataMaxIdleIntervalMs back to > 500.
[GitHub] [kafka] jsancio commented on pull request #12238: KAFKA-13955: Fix failing KRaftClusterTest tests
jsancio commented on PR #12238: URL: https://github.com/apache/kafka/pull/12238#issuecomment-1146263145 I merged this PR to fix the tests and I created https://issues.apache.org/jira/browse/KAFKA-13959. > @showuon Yes, after we reach a consensus here, we should update the KIP and note it in the KIP discussion thread. @dengziming @showuon My preference is to fix KAFKA-13959 and revert this commit before we ship 3.3.0.
[jira] [Created] (KAFKA-13959) Controller should unfence Broker with busy metadata log
Jose Armando Garcia Sancio created KAFKA-13959: -- Summary: Controller should unfence Broker with busy metadata log Key: KAFKA-13959 URL: https://issues.apache.org/jira/browse/KAFKA-13959 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.3.0 Reporter: Jose Armando Garcia Sancio https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible for the controller to not unfence a broker if the committed offset keeps increasing. One solution to this problem is to require the broker to only catch up to the last committed offset when they last sent the heartbeat. For example: # Broker sends a heartbeat with current offset of {{{}Y{}}}. The last commit offset is {{{}X{}}}. The controller remembers this last commit offset, call it {{X'}} # Broker sends another heartbeat with current offset of {{{}Z{}}}. Unfence the broker if {{Z >= X}} or {{{}Z >= X'{}}}.
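The two-heartbeat rule described in this issue can be sketched as pure logic. The class and method names below are illustrative only, not the actual BrokerHeartbeatManager code:

```java
// Illustrative sketch of the proposed unfencing rule (KAFKA-13959), not the
// real controller code. The controller snapshots the committed offset (X')
// seen when a heartbeat arrives; on the next heartbeat the broker is
// unfenced once its current offset (Z) has reached either the live
// committed offset (X) or the remembered snapshot (X').
public final class UnfenceRuleSketch {
    // Committed offset remembered from the broker's previous heartbeat (X').
    private long rememberedCommittedOffset = -1L;

    /**
     * @param brokerOffset    the broker's current metadata offset (Y, then Z)
     * @param committedOffset the controller's committed offset right now (X)
     * @return true if the broker may be unfenced
     */
    public boolean onHeartbeat(long brokerOffset, long committedOffset) {
        boolean caughtUp = rememberedCommittedOffset >= 0
            && (brokerOffset >= committedOffset || brokerOffset >= rememberedCommittedOffset);
        // Snapshot X' for the next heartbeat, even while the log keeps growing.
        rememberedCommittedOffset = committedOffset;
        return caughtUp;
    }
}
```

With a busy metadata log the live committed offset X keeps advancing, but under this rule the broker only needs to reach the snapshot X' taken at its previous heartbeat, so it is eventually unfenced.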
[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
niket-goel commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r889238615 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() { } } +@Test +public void testDescribeMetadataQuorumSuccess() throws Exception { +try (final AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id, Review Comment: Not sure about this comment. I think I would need to add the `DESCRIBE_QUORUM` API to the list of known APIs even if we used the default constructor, right? If I do not do that, the test encounters an error where it claims to not know about the `DESCRIBE_QUORUM` API.
[GitHub] [kafka] jsancio merged pull request #12238: KAFKA-13955: Fix failing KRaftClusterTest tests
jsancio merged PR #12238: URL: https://github.com/apache/kafka/pull/12238
[GitHub] [kafka] jsancio commented on pull request #12238: KIP-835: MetadataMaxIdleIntervalMs should be much bigger than brokerHeartbeatIntervalMs
jsancio commented on PR #12238: URL: https://github.com/apache/kafka/pull/12238#issuecomment-1146244214 These are the failing tests: ``` Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker Build / JDK 8 and Scala 2.12 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() ``` They look unrelated to KRaft.
[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
niket-goel commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r889220152 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) { +Integer partition = 0; +String topicName = response.getTopicNameByIndex(0); +Integer leaderId = response.getPartitionLeaderId(topicName, partition); +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +response.getVoterInfo(topicName, partition).forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +OptionalLong.of(v.lastFetchTimestamp()), +OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +response.getObserverInfo(topicName, partition).forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +OptionalLong.of(o.lastFetchTimestamp()), +OptionalLong.of(o.lastCaughtUpTimestamp(; +}); +QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers); +return info; +} + +@Override +DescribeQuorumRequest.Builder createRequest(int timeoutMs) { +return new Builder(DescribeQuorumRequest.singletonRequest( +new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition(; +} + +@Override +void handleResponse(AbstractResponse response) { +final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; +try { +if (quorumResponse.data().errorCode() != Errors.NONE.code()) { +throw 
Errors.forCode(quorumResponse.data().errorCode()).exception(); +} +if (quorumResponse.data().topics().size() > 1) { +log.error("DescribeMetadataQuorum received {} topics when 1 was expected", +quorumResponse.data().topics().size()); +throw new UnknownServerException(); Review Comment: Good point about adding the message to the exception being returned. The behavior of the code block is equivalent to completing the future and returning. I just didn't want to repeat the future completion code. Is there a well-known convention to handle code blocks like this in Java?
[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546407#comment-17546407 ] Matthias J. Sax commented on KAFKA-13936: - The docs are in the same repository as the code: [https://github.com/apache/kafka/tree/trunk/docs] A good place might be the consumer monitoring section [https://kafka.apache.org/documentation/#consumer_fetch_monitoring] ? > Invalid consumer lag when monitoring from a kafka streams application > - > > Key: KAFKA-13936 > URL: https://issues.apache.org/jira/browse/KAFKA-13936 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Prashanth Joseph Babu >Priority: Major > > I have a kafka streams application and I'm trying to monitor the consumer lag > via stream metrics. > Here's some code snippet > {code:java} > metrics = streams.metrics(); > lag = 0; > for (Metric m : metrics.values()) { > tags = m.metricName().tags(); > if ( m.metricName().name().equals(MONITOR_CONSUMER_LAG) && > tags.containsKey(MONTOR_TAG_TOPIC) && > tags.get(MONTOR_TAG_TOPIC).equals(inputTopic) ) { > partitionLag = > Float.valueOf(m.metricValue().toString()).floatValue(); > if ( !partitionLag.isNaN() ) { > lag += partitionLag; > } > } > } > {code} > Here MONITOR_CONSUMER_LAG is {{{}records-lag-max{}}}. > However these numbers dont match with the consumer lag we see in the kafka UI > . is records-lag-max the right metric to track for a kafka streams > application when the objective is to get consumer lag? -- This message was sent by Atlassian Jira (v8.20.7#820007)
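The NaN-skipping aggregation in the snippet above can be reproduced as a small standalone sketch. The map of per-partition lag values below is made up for illustration and stands in for the values returned by `streams.metrics()`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class LagSumSketch {
    // Sum per-partition lag values, skipping NaN entries just as the snippet
    // above does (a partition with no completed fetch yet reports NaN).
    static double sumLag(Map<String, Double> perPartitionLag) {
        double lag = 0.0;
        for (double v : perPartitionLag.values()) {
            if (!Double.isNaN(v)) {
                lag += v;
            }
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Double> lags = new LinkedHashMap<>();
        lags.put("input-topic-0", 5.0);        // illustrative values, not real metrics
        lags.put("input-topic-1", Double.NaN); // no fetch yet: skipped
        lags.put("input-topic-2", 7.0);
        System.out.println(sumLag(lags)); // prints 12.0
    }
}
```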
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
guozhangwang commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r889168252 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java: ## @@ -388,6 +401,40 @@ private String nodeSensorPrefix(final String threadId, final String taskId, fina + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName; } +public Sensor topicLevelSensor(final String threadId, + final String taskId, + final String processorNodeName, + final String topicName, Review Comment: I cannot access the apache wiki atm (seems down temporarily) but just double checking is topic name part of the tag in the KIP proposal as well? If not we should update the wiki and the KIP thread.
[GitHub] [kafka] cmccabe commented on pull request #12238: KIP-835: MetadataMaxIdleIntervalMs should be much bigger than brokerHeartbeatIntervalMs
cmccabe commented on PR #12238: URL: https://github.com/apache/kafka/pull/12238#issuecomment-1146163957 > Another solution is to require the broker to only catch up to the last committed offset when they last sent the heartbeat. I like this solution too. However, there are some complexities here (we'd want to make sure the heartbeat wasn't too long ago). Bumping the no-op timeout might be a good quick fix until we have time to implement that (although I wonder why we can't use 4 seconds rather than 5?)
[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546196#comment-17546196 ] Jun Rao commented on KAFKA-13953: - [~abasilbr] : If you have multiple replicas, it's unlikely all replicas are corrupted. You can just delete the corrupted replica without losing data. Also, is this transient? The corruption could also be caused during network transfer. > kafka Console consumer fails with CorruptRecordException > - > > Key: KAFKA-13953 > URL: https://issues.apache.org/jira/browse/KAFKA-13953 > Project: Kafka > Issue Type: Bug > Components: consumer, controller, core >Affects Versions: 2.7.0 >Reporter: Aldan Brito >Priority: Blocker > > Kafka consumer fails with corrupt record exception. > {code:java} > opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: > --topic BQR-PULL-DEFAULT --from-beginning > > /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest > [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating > consumer process: (kafka.tools.ConsoleConsumer$) > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record > to continue consumption. 
> at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577) > at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size > 0 is less than the minimum record overhead (14) > Processed a total of 15765197 messages {code}
[jira] [Resolved] (KAFKA-13803) Refactor Leader API Access
[ https://issues.apache.org/jira/browse/KAFKA-13803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13803. - Fix Version/s: 3.3.0 Resolution: Fixed merged the PR to trunk > Refactor Leader API Access > -- > > Key: KAFKA-13803 > URL: https://issues.apache.org/jira/browse/KAFKA-13803 > Project: Kafka > Issue Type: Improvement >Reporter: Rittika Adhikari >Assignee: Rittika Adhikari >Priority: Major > Fix For: 3.3.0 > > > Currently, AbstractFetcherThread has a series of protected APIs which control > access to the Leader. ReplicaFetcherThread and ReplicaAlterLogDirsThread > respectively override these protected APIs and handle access to the Leader in > a remote broker leader and a local leader context. > We propose to move these protected APIs to a LeaderEndPoint interface, which > will serve all fetches from the Leader. We will implement a > RemoteLeaderEndPoint and a LocalLeaderEndPoint accordingly. This change will > greatly simplify our existing follower fetch code.
[GitHub] [kafka] junrao merged pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao merged PR #12005: URL: https://github.com/apache/kafka/pull/12005
[GitHub] [kafka] jsancio opened a new pull request, #12249: MINOR; Test last committed record offset for Controllers
jsancio opened a new pull request, #12249: URL: https://github.com/apache/kafka/pull/12249 As part of KIP-835, LastCommittedRecordOffset was added to the KafkaController metric type. Make sure to test that metric. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison opened a new pull request, #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…
mimaison opened a new pull request, #12248: URL: https://github.com/apache/kafka/pull/12248 …-827) This implements KIP-827: https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API Add TotalBytes and UsableBytes to DescribeLogDirsResponse Add matching getters on LogDirDescription ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
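Since older brokers will not report the new sizes, a caller should treat them as optional values. A hedged sketch of that pattern, assuming the getters surface as `OptionalLong`; the inputs here are plain values, not a live `describeLogDirs` call, and `usedFraction` is a made-up helper:

```java
import java.util.OptionalLong;

public final class LogDirSpaceSketch {
    // Fraction of a log dir already used, or NaN when the broker did not
    // report sizes (the fields are optional, e.g. for pre-KIP-827 brokers).
    static double usedFraction(OptionalLong totalBytes, OptionalLong usableBytes) {
        if (!totalBytes.isPresent() || !usableBytes.isPresent() || totalBytes.getAsLong() == 0L) {
            return Double.NaN;
        }
        long total = totalBytes.getAsLong();
        long usable = usableBytes.getAsLong();
        return (double) (total - usable) / total;
    }
}
```

A monitoring caller could alert when `usedFraction` crosses a threshold; with the real API the inputs would come from the `LogDirDescription` getters the PR adds.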
[GitHub] [kafka] dajac merged pull request #12247: MINOR: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER
dajac merged PR #12247: URL: https://github.com/apache/kafka/pull/12247
[GitHub] [kafka] dajac commented on pull request #12247: MINOR: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER
dajac commented on PR #12247: URL: https://github.com/apache/kafka/pull/12247#issuecomment-1146095417 @AnGg98 Thanks!
[GitHub] [kafka] AnGg98 commented on pull request #12247: MINOR: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER
AnGg98 commented on PR #12247: URL: https://github.com/apache/kafka/pull/12247#issuecomment-1146080043 Test run successful: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-06-03--001.system-test-kafka-branch-builder--1654259402--AnGg98--acl-authorizer--65a5a3106e/report.html
[jira] [Resolved] (KAFKA-13883) KIP-835: Monitor Quorum
[ https://issues.apache.org/jira/browse/KAFKA-13883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio resolved KAFKA-13883. Resolution: Fixed > KIP-835: Monitor Quorum > --- > > Key: KAFKA-13883 > URL: https://issues.apache.org/jira/browse/KAFKA-13883 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > Tracking issue for the implementation of KIP-835.
[jira] [Resolved] (KAFKA-13918) Schedule or cancel nooprecord write on metadata version change
[ https://issues.apache.org/jira/browse/KAFKA-13918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio resolved KAFKA-13918. Resolution: Duplicate > Schedule or cancel nooprecord write on metadata version change > -- > > Key: KAFKA-13918 > URL: https://issues.apache.org/jira/browse/KAFKA-13918 > Project: Kafka > Issue Type: Sub-task > Components: controller >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major >
[GitHub] [kafka] vvcephei commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
vvcephei commented on PR #12235: URL: https://github.com/apache/kafka/pull/12235#issuecomment-1146046545 @ableegoldman , you might want to take a look at the build I cancelled. Before it got stuck, there were a large number of failures: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12235/9/#showFailuresLink
[GitHub] [kafka] vvcephei commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
vvcephei commented on PR #12235: URL: https://github.com/apache/kafka/pull/12235#issuecomment-1146044849 The prior build was hung. I cancelled it so the latest commit could be built.
[GitHub] [kafka] dajac commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit
dajac commented on code in PR #12244: URL: https://github.com/apache/kafka/pull/12244#discussion_r889014500 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) +) { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); +// set timeout to 0 because we don't want to retry after the error +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); + +// elapse auto commit interval and set committable position +time.sleep(autoCommitIntervalMs); +subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L)); + +// should try to find coordinator since we are auto committing +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.poll(time.timer(Long.MAX_VALUE)); +assertFalse(coordinator.coordinatorUnknown()); +} +} + +@Test +public void testCommitAsyncWithUserAssignedType() { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); 
+// set timeout to 0 because we don't want to retry after the error +coordinator.poll(time.timer(0)); +assertTrue(coordinator.coordinatorUnknown()); Review Comment: Should we also assert that there is not inflight requests after calling poll? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); -// should find an available node in next find coordinator request +// should not try to find coordinator since we are in manual assignment +// hence the prepared response should not be returned client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(Long.MAX_VALUE)); +assertTrue(coordinator.coordinatorUnknown()); +} + +@Test +public void testAutoCommitAsyncWithUserAssignedType() { +try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) +) { +subscriptions.assignFromUser(Collections.singleton(t1p)); +// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error +client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); Review Comment: Same question as before. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. 
-if (coordinatorUnknownAndUnready(timer)) { -return false; +if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { Review Comment: nit: I think that we need to remove `if coordinator is unknown, make sure we lookup one and` from the above comment (first sentence). ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() { coordinator.poll(time.timer(0));
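The behavior under review above — a consumer with user-assigned partitions should only contact the group coordinator when a commit actually needs it — can be modeled as a tiny decision function. This is an illustrative sketch, not the real `ConsumerCoordinator` logic; all names here are hypothetical.

```java
public class CoordinatorLookupSketch {

    // Hypothetical model of the poll() decision discussed in the review:
    // with manual (user) assignment there is no group to join, so the
    // coordinator is only needed when an (auto-)commit is due.
    static boolean needsCoordinatorLookup(boolean manualAssignment, boolean commitDue) {
        if (!manualAssignment) {
            return true; // subscribed consumers always need the group coordinator
        }
        return commitDue;
    }

    public static void main(String[] args) {
        // Mirrors testCoordinatorNotAvailableWithUserAssignedType: no commit
        // pending, so poll() should not issue a FindCoordinator request.
        System.out.println(needsCoordinatorLookup(true, false)); // false
        // Mirrors testAutoCommitAsyncWithUserAssignedType: the auto-commit
        // interval has elapsed, so a lookup is warranted.
        System.out.println(needsCoordinatorLookup(true, true));  // true
    }
}
```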
[jira] [Assigned] (KAFKA-13908) RuntimeException will be thrown in BrokerServer.startup, not the cause of exception
[ https://issues.apache.org/jira/browse/KAFKA-13908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Joerger reassigned KAFKA-13908: --- Assignee: Richard Joerger > RuntimeException will be thrown in BrokerServer.startup, not the cause of > exception > --- > > Key: KAFKA-13908 > URL: https://issues.apache.org/jira/browse/KAFKA-13908 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Richard Joerger >Priority: Major > Labels: newbie > > Before [#11969|https://github.com/apache/kafka/pull/11969], We will throw an > {{ExecutionException(KafkaStorageException)}} in > {{{}BrokerServer.startup{}}}, and we'll catch the exception and rethrow the > cause by: > {code:java} > throw if (e.isInstanceOf[ExecutionException]) e.getCause else e{code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/Kafka.scala#L113] > > After [#11969|https://github.com/apache/kafka/pull/11969], We will throw a > {{{}RuntimeException(ExecutionException(KafkaStorageException)){}}}. But the > catch logic didn't change. That means, if the exception is RuntimeException, > we won't throw only the cause, but all the exception chains. > > We should update it and add tests for it. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] Moovlin commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…
Moovlin commented on PR #12170: URL: https://github.com/apache/kafka/pull/12170#issuecomment-1146034275 @showuon You were correct. I went and added the "Configs:" column to the quickstart output. Great catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on PR #11779: URL: https://github.com/apache/kafka/pull/11779#issuecomment-1146012029 Given that all merge conflicts have been resolved and https://github.com/apache/kafka/pull/11778 has already been approved, marking this as ready for review.
[GitHub] [kafka] Moovlin commented on pull request #12246: KAFKA-13718: kafka-topics describe topic with default config will show `segment.bytes` overridden config
Moovlin commented on PR #12246: URL: https://github.com/apache/kafka/pull/12246#issuecomment-1145980276 @showuon Thanks! Happy to help contribute to the project!
[jira] [Commented] (KAFKA-13875) update docs to include topoicId for kafka-topics.sh --describe output
[ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545877#comment-17545877 ] Richard Joerger commented on KAFKA-13875: - The Jira which was blocking this previously has been resolved. Please let me know in the PR if there are any other changes which should be made to the documentation. Thanks! > update docs to include topoicId for kafka-topics.sh --describe output > - > > Key: KAFKA-13875 > URL: https://issues.apache.org/jira/browse/KAFKA-13875 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 3.2.0 >Reporter: Luke Chen >Assignee: Richard Joerger >Priority: Major > Labels: newbie > > The topic describe output in quickstart doc here: > [https://kafka.apache.org/quickstart] should be updated now. > {code:java} > bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server > localhost:9092 > Topic:quickstart-events PartitionCount:1ReplicationFactor:1 Configs: > Topic: quickstart-events Partition: 0Leader: 0 Replicas: 0 Isr: > 0{code} > After Topic Id implementation, we included the topic id info in the output > now. Also the configs is not empty now. The doc should be updated to avoid > new users get confused. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] showuon commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…
showuon commented on PR #12170: URL: https://github.com/apache/kafka/pull/12170#issuecomment-1145979453 @Moovlin, IIRC, even if there are no overridden configs, the `Configs:` column will still be displayed. Could you confirm it by running the command once with the latest code? Thank you.
[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
dajac commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r888913198 ## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ## @@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() { */ UpdateFeaturesResult updateFeatures(Map featureUpdates, UpdateFeaturesOptions options); +/** + * Describes the state of the metadata quorum. + * + * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)} with default options. Review Comment: nit: There is an extra space before `with`. ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java: ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; + +/** + * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)} + * + */ +public class DescribeMetadataQuorumResult { + +private final KafkaFuture quorumInfo; + +DescribeMetadataQuorumResult(KafkaFuture quorumInfo) { +this.quorumInfo = quorumInfo; +} + +/** + * Returns a future QuorumInfo Review Comment: nit: Should we say `Returns a future containing the quorum info.`? 
## clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumOptions.java: ## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +/** + * Options for {@link ConfluentAdmin#describeQuorum(DescribeMetadataQuorumOptions)}. + * Review Comment: nit: We could remove this empty line. 
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) { +Integer partition = 0; +String topicName = response.getTopicNameByIndex(0); +Integer leaderId = response.getPartitionLeaderId(topicName, partition); +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +response.getVoterInfo(topicName, partition).forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +OptionalLong.of(v.lastFetchTimestamp()), +OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +response.getObserverInfo(topicName, partition).forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +OptionalLong.of(o.lastFetchTimestamp()), +
[GitHub] [kafka] Moovlin commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…
Moovlin commented on PR #12170: URL: https://github.com/apache/kafka/pull/12170#issuecomment-1145959126 @dengziming I went and submitted a PR for KAFKA-13718, which has since been merged into trunk. The PR (12246) keeps the kafka-topics.sh command from printing the segment.bytes value when describing a topic. Because of this, when end users now follow the quickstart, what they see should match the docs. Thanks so much.
[jira] [Assigned] (KAFKA-13875) update docs to include topoicId for kafka-topics.sh --describe output
[ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Joerger reassigned KAFKA-13875: --- Assignee: Richard Joerger > update docs to include topoicId for kafka-topics.sh --describe output > - > > Key: KAFKA-13875 > URL: https://issues.apache.org/jira/browse/KAFKA-13875 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 3.2.0 >Reporter: Luke Chen >Assignee: Richard Joerger >Priority: Major > Labels: newbie > > The topic describe output in quickstart doc here: > [https://kafka.apache.org/quickstart] should be updated now. > {code:java} > bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server > localhost:9092 > Topic:quickstart-events PartitionCount:1ReplicationFactor:1 Configs: > Topic: quickstart-events Partition: 0Leader: 0 Replicas: 0 Isr: > 0{code} > After Topic Id implementation, we included the topic id info in the output > now. Also the configs is not empty now. The doc should be updated to avoid > new users get confused. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13718) kafka-topics describe topic with default config will show `segment.bytes` overridden config
[ https://issues.apache.org/jira/browse/KAFKA-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545865#comment-17545865 ] Richard Joerger commented on KAFKA-13718: - There was a question on if segment.bytes should be included. As was decided in this Jira, we should remove it. This means that it should therefore be removed in the documentation as well. > kafka-topics describe topic with default config will show `segment.bytes` > overridden config > > > Key: KAFKA-13718 > URL: https://issues.apache.org/jira/browse/KAFKA-13718 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.1.0, 2.8.1, 3.0.0 >Reporter: Luke Chen >Assignee: Richard Joerger >Priority: Major > Labels: newbie, newbie++ > Fix For: 3.3.0 > > > Following the quickstart guide[1], when describing the topic just created > with default config, I found there's a overridden config shown: > _> bin/kafka-topics.sh --describe --topic quickstart-events > --bootstrap-server localhost:9092_ > _Topic: quickstart-events TopicId: 06zRrzDCRceR9zWAf_BUWQ > PartitionCount: 1 ReplicationFactor: 1 *Configs: > segment.bytes=1073741824*_ > _Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 > Isr: 0_ > > This config result should be empty as in Kafka quick start page. Although the > config value is what we expected (default 1GB value), this info display still > confuse users. > > Note: I checked the 2.8.1 build, this issue also happened. > > [1]: [https://kafka.apache.org/quickstart] -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] Moovlin commented on pull request #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…
Moovlin commented on PR #12167: URL: https://github.com/apache/kafka/pull/12167#issuecomment-1145952034 Hey @showuon , just looking to see if you have any input on the last commit I made to this PR.
[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545846#comment-17545846 ] Prashanth Joseph Babu commented on KAFKA-13936: --- [~mjsax] sure ! I'd love to ! I'm not sure where the doc repos are and what would be the right place to mention this point > Invalid consumer lag when monitoring from a kafka streams application > - > > Key: KAFKA-13936 > URL: https://issues.apache.org/jira/browse/KAFKA-13936 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Prashanth Joseph Babu >Priority: Major > > I have a kafka streams application and I'm trying to monitor the consumer lag > via stream metrics. > Here's some code snippet > {code:java} > metrics = streams.metrics(); > lag = 0; > for (Metric m : metrics.values()) { > tags = m.metricName().tags(); > if ( m.metricName().name().equals(MONITOR_CONSUMER_LAG) && > tags.containsKey(MONTOR_TAG_TOPIC) && > tags.get(MONTOR_TAG_TOPIC).equals(inputTopic) ) { > partitionLag = > Float.valueOf(m.metricValue().toString()).floatValue(); > if ( !partitionLag.isNaN() ) { > lag += partitionLag; > } > } > } > {code} > Here MONITOR_CONSUMER_LAG is {{{}records-lag-max{}}}. > However these numbers dont match with the consumer lag we see in the kafka UI > . is records-lag-max the right metric to track for a kafka streams > application when the objective is to get consumer lag? -- This message was sent by Atlassian Jira (v8.20.7#820007)
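For reference, the skeleton of the aggregation loop quoted above, rewritten as a self-contained sketch: the Streams `Metric` objects are replaced by a plain map of per-partition values, and all identifiers are illustrative. Note the caveat implicit in the question — as its name suggests, `records-lag-max` reports a maximum (per partition, over the metrics window), so summing it across partitions is not the same as the current total lag most monitoring UIs show.

```java
import java.util.HashMap;
import java.util.Map;

public class LagAggregation {

    // Sums per-partition lag values, skipping NaN entries — the same
    // shape as the loop over streams.metrics() in the comment above.
    static float totalLag(Map<String, Float> partitionLags) {
        float lag = 0f;
        for (Float partitionLag : partitionLags.values()) {
            // records-lag-max is NaN until the consumer has fetched from the partition
            if (partitionLag != null && !partitionLag.isNaN()) {
                lag += partitionLag;
            }
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Float> lags = new HashMap<>();
        lags.put("input-topic-0", 5f);
        lags.put("input-topic-1", Float.NaN); // not yet fetched
        lags.put("input-topic-2", 7f);
        System.out.println(totalLag(lags)); // 12.0
    }
}
```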
[jira] [Created] (KAFKA-13958) Expose logdirs total and usable space via Kafka API
Mickael Maison created KAFKA-13958: -- Summary: Expose logdirs total and usable space via Kafka API Key: KAFKA-13958 URL: https://issues.apache.org/jira/browse/KAFKA-13958 Project: Kafka Issue Type: Bug Reporter: Mickael Maison Assignee: Mickael Maison JIRA for KIP-827: https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API
[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545837#comment-17545837 ] Aldan Brito commented on KAFKA-13953: - hi [~junrao] , there is no evidence of data corruption on the kafka broker logs, is this expected, is deleting the log segment the only way to recover the system ? – this would mean data loss right. > kafka Console consumer fails with CorruptRecordException > - > > Key: KAFKA-13953 > URL: https://issues.apache.org/jira/browse/KAFKA-13953 > Project: Kafka > Issue Type: Bug > Components: consumer, controller, core >Affects Versions: 2.7.0 >Reporter: Aldan Brito >Priority: Blocker > > Kafka consumer fails with corrupt record exception. > {code:java} > opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: > --topic BQR-PULL-DEFAULT --from-beginning > > /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest > [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating > consumer process: (kafka.tools.ConsoleConsumer$) > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record > to continue consumption. 
> at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577) > at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size > 0 is less than the minimum record overhead (14) > Processed a total of 15765197 messages {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (KAFKA-13957) Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-13957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545833#comment-17545833 ] A. Sophie Blee-Goldman commented on KAFKA-13957: Failed again > Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores > - > > Key: KAFKA-13957 > URL: https://issues.apache.org/jira/browse/KAFKA-13957 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > Attachments: > StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores.rtf > > > Failed on a local build so I have the full logs (attached) > {code:java} > java.lang.AssertionError: Unexpected exception thrown while getting the value > from store. > Expected: is (a string containing "Cannot get state store source-table > because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string > containing "The state store, source-table, may have migrated to another > instance" or a string containing "Cannot get state store source-table because > the stream thread is STARTING, not RUNNING") > but: was "The specified partition 1 for store source-table does not > exist." 
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.verifyRetrievableException(StoreQueryIntegrationTest.java:539) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:241) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:557) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:183) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] showuon commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
showuon commented on code in PR #12179: URL: https://github.com/apache/kafka/pull/12179#discussion_r27862 ## clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java: ## @@ -679,10 +679,11 @@ private long calcCompletionTimesAndReturnSessionLifetimeMs() { else if (connectionsMaxReauthMs == null) retvalSessionLifetimeMs = zeroIfNegative(credentialExpirationMs - authenticationEndMs); else -retvalSessionLifetimeMs = zeroIfNegative( -Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs)); +retvalSessionLifetimeMs = zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs)); -sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs; +if (connectionsMaxReauthMs != null) { Review Comment: > While there is the Authenticator interface where comments suggest that #serverSessionExpirationTimeNanos should be left as null when re-authentication is "disabled" Is [this](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java#L90-L102) javadoc what you mean? From the javadoc, I don't see it said we should return null for re-auth disabled. I think it's OK to return the value when re-auth disabled. > here are some clients or SASL mechanisms where we don't expect reauthentication to ever happen? Yes, it is. You can check [this](https://github.com/edenhill/librdkafka/issues/3304) for reference. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
acsaki commented on code in PR #12179: URL: https://github.com/apache/kafka/pull/12179#discussion_r22004 ## clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java: ## @@ -976,8 +975,11 @@ public void testChannelCloseWhileProcessingReceives() throws Exception { SelectionKey selectionKey = mock(SelectionKey.class); when(channel.selectionKey()).thenReturn(selectionKey); when(selectionKey.isValid()).thenReturn(true); +when(selectionKey.isReadable()).thenReturn(true); when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_READ); -selectionKey.attach(channel); +when(selectionKey.attachment()) +.thenReturn(channel) +.thenReturn(null); Review Comment: Same as above.
[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
acsaki commented on code in PR #12179: URL: https://github.com/apache/kafka/pull/12179#discussion_r21809 ## clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java: ## @@ -781,14 +781,13 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception { when(kafkaChannel.selectionKey()).thenReturn(selectionKey); when(selectionKey.channel()).thenReturn(SocketChannel.open()); when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT); +when(selectionKey.attachment()).thenReturn(kafkaChannel); -selectionKey.attach(kafkaChannel); Set selectionKeys = Utils.mkSet(selectionKey); selector.pollSelectionKeys(selectionKeys, false, System.nanoTime()); assertFalse(selector.connected().contains(kafkaChannel.id())); assertTrue(selector.disconnected().containsKey(kafkaChannel.id())); -assertNull(selectionKey.attachment()); Review Comment: This test failed after my change to use mockitoInline to mock statics. The `selectionKey.attach` line called the mock's method directly (pointless?) while it looked like the actual intention was `selectionKey.attachment()` to return the kafkaChannel. That's how it worked actually. Calling the mock method directly in the test and later asserting on attachment being null seemed confusing to me and the assert actually fails too. (with #attachment returning the kafkaChannel). Maybe this was sort of overbearing, is there a simpler way to fix the test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
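The confusion described above stems from calling `attach` on a Mockito mock: a mock's methods do nothing unless stubbed, so `selectionKey.attach(kafkaChannel)` never makes `attachment()` return the channel, and the later `assertNull(selectionKey.attachment())` was asserting against the stub, not real behavior. For contrast, this is how a real NIO `SelectionKey` behaves (plain JDK, no Mockito; the `Pipe` is just scaffolding to obtain a registrable channel):

```java
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class AttachmentDemo {

    // Returns {attachment before detach, attachment after detach} for a real key.
    static Object[] attachAndDetach() throws Exception {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            // The third register() argument is the initial attachment,
            // equivalent to calling selectionKey.attach("the-channel").
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ, "the-channel");
            Object before = key.attachment();
            key.attach(null); // clearing the attachment, as the selector does on disconnect
            Object after = key.attachment();
            pipe.source().close();
            pipe.sink().close();
            return new Object[] { before, after };
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] result = attachAndDetach();
        System.out.println(result[0]); // the-channel
        System.out.println(result[1]); // null
    }
}
```

On a real key the attach/attachment round-trip works and clearing it yields null; a mock reproduces neither unless each call is explicitly stubbed, which is why the test had to stub `attachment()` to return the channel and then null.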
[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
acsaki commented on code in PR #12179: URL: https://github.com/apache/kafka/pull/12179#discussion_r17853 ## build.gradle: ## @@ -1243,7 +1243,7 @@ project(':clients') { testImplementation libs.bcpkix testImplementation libs.junitJupiter -testImplementation libs.mockitoCore +testImplementation libs.mockitoInline Review Comment: I needed this to mock out ChannelBuilders#createPrincipalBuilder and Sasl#createSaslServer. It's a pity these calls are static; that makes them hard to test.
[GitHub] [kafka] acsaki commented on pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
acsaki commented on PR #12179: URL: https://github.com/apache/kafka/pull/12179#issuecomment-1145812750 > Nice tests! Thank you. Left some comments. Also, I found there are many tests failed with the error: > > ``` > org.opentest4j.AssertionFailedError: Topic [__consumer_offsets] metadata not propagated after 6 ms > ``` > > ref: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12179/4/#showFailuresLink > > I guess maybe it's because we use mock time, instead of system time now? Please help check them. Thank you. Thank you, I hadn't thought of MockTime. Indeed, a lot of tests are still failing; I've found out that in some tests `connections.max.reauth.ms` gets set to 0 instead of being left as null, so my current code eventually sets the session expiration and clients get disconnected.
[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
acsaki commented on code in PR #12179: URL: https://github.com/apache/kafka/pull/12179#discussion_r12184 ## clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java: ## @@ -679,10 +679,11 @@ private long calcCompletionTimesAndReturnSessionLifetimeMs() { else if (connectionsMaxReauthMs == null) retvalSessionLifetimeMs = zeroIfNegative(credentialExpirationMs - authenticationEndMs); else -retvalSessionLifetimeMs = zeroIfNegative( -Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs)); +retvalSessionLifetimeMs = zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs)); -sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs; +if (connectionsMaxReauthMs != null) { Review Comment: @showuon this is what I've also found rather confusing. I agree that after the token expires the connection should be closed sooner or later which isn't going to happen when sessionExpirationTimeNanos is not set. While there is the Authenticator interface where comments suggest that #serverSessionExpirationTimeNanos should be left as null when re-authentication is "disabled". Does it make sense for reauth to be disabled? Or rather there are some clients or SASL mechanisms where we don't expect reauthentication to ever happen? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
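The arithmetic being debated in the hunk above can be sketched as a self-contained model. Only the two branches visible in the diff (and the guarded assignment of `sessionExpirationTimeNanos`) come from the quoted code; the null-credential branches and all names here are assumptions for illustration, not the actual `SaslServerAuthenticator` implementation.

```java
public class SessionLifetimeSketch {

    static long zeroIfNegative(long v) {
        return v < 0 ? 0 : v;
    }

    // Session lifetime is the remaining credential validity, capped by
    // connections.max.reauth.ms when that broker config is set.
    static long sessionLifetimeMs(Long credentialExpirationMs,
                                  Long connectionsMaxReauthMs,
                                  long authenticationEndMs) {
        if (credentialExpirationMs == null && connectionsMaxReauthMs == null)
            return 0L; // assumed branch: nothing bounds the session
        if (credentialExpirationMs == null)
            return zeroIfNegative(connectionsMaxReauthMs); // assumed branch
        if (connectionsMaxReauthMs == null)
            return zeroIfNegative(credentialExpirationMs - authenticationEndMs);
        return zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs,
                                       connectionsMaxReauthMs));
    }

    // The behavior the patch proposes: only arm the server-side expiration
    // when connections.max.reauth.ms is configured; otherwise leave it null.
    static Long sessionExpirationTimeNanos(Long connectionsMaxReauthMs,
                                           long authenticationEndNanos,
                                           long lifetimeMs) {
        if (connectionsMaxReauthMs == null)
            return null; // re-auth disabled: session never expires server-side
        return authenticationEndNanos + 1_000_000L * lifetimeMs;
    }

    public static void main(String[] args) {
        // Credential valid for another 1000 ms, but max reauth caps it at 500 ms.
        System.out.println(sessionLifetimeMs(2_000L, 500L, 1_000L)); // 500
        // With re-auth disabled there is no server-side expiration time.
        System.out.println(sessionExpirationTimeNanos(null, 0L, 500L)); // null
    }
}
```

This makes the disagreement in the thread concrete: with the guard in place, an expired credential on a re-auth-disabled connection yields a zero lifetime but no server-side expiration, so the broker never closes the connection on its own.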
[GitHub] [kafka] ableegoldman commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
ableegoldman commented on PR #12235: URL: https://github.com/apache/kafka/pull/12235#issuecomment-1145785669

@cadonna ok this is ready for another pass -- besides addressing your comments I made two changes worth noting:

1. I moved the produced/consumed metrics to a new `TopicMetrics` class, as they didn't exactly fit into ProcessorNodeMetrics: I realized that due to dynamic topic routing, a given sink node could produce to more than one topic.
2. To address your (imo valid) concern about over-counting bytes that were sent to the producer but never made it to the topic, I moved the "-produced" sensor into `RecordCollectorImpl` and have it record inside the `send` callback after we've checked for errors.

The PR has definitely ballooned in size since you last reviewed it, but that's almost all due to the new test coverage for the topic-level metrics. The actual logical changes are still relatively minor, so I'm hoping you can give this a +1 by now (and again, if so, please go ahead and merge it yourself). Thanks for the thorough reviews! :P
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
ableegoldman commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r888782190

## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:

```
@@ -114,6 +116,63 @@ public void shouldGetProcessAtSourceSensor() {
         verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
     }

+    @Test
+    public void shouldGetRecordsAndBytesConsumedSensor() {
+        final String recordsMetricNamePrefix = "records-consumed";
+        final String bytesMetricNamePrefix = "bytes-consumed";
+        final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
+        final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
+        when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
+            .thenReturn(expectedSensor);
+        when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
+            .thenReturn(expectedSensor);
+        when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
+
+        final Map consumedTagMap = new HashMap<>(tagMap);
+        consumedTagMap.put("topic", TOPIC_NAME);
+        StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
```

Review Comment: Ah, thanks for the explanation. I updated the test as suggested (now in `TopicMetricsTest`)
[GitHub] [kafka] showuon commented on pull request #12238: KIP-835: MetadataMaxIdleIntervalMs should be much bigger than brokerHeartbeatIntervalMs
showuon commented on PR #12238: URL: https://github.com/apache/kafka/pull/12238#issuecomment-1145703278

> should we allow the broker to be behind by A records (or time) to unfence the broker?

A similar solution came to my mind yesterday, but I don't think it is a good one, because it might break the expectation/assumption that when a broker is up, it has already caught up with all the metadata changes. There's a chance the `A` records/time it is behind by contain some important metadata update, right?

> Another solution is to require the broker to only catch up to the last committed offset when they last sent the heartbeat.

I like this idea. We can make sure the broker has already caught up to the last committed offset as of when the heartbeat was sent, which means the metadata changes from before broker startup are all caught up. +1 for it! Thank you.
[GitHub] [kafka] showuon merged pull request #12243: MINOR: fix doc
showuon merged PR #12243: URL: https://github.com/apache/kafka/pull/12243
[GitHub] [kafka] cadonna merged pull request #12210: KAFKA-13930: Add 3.2.0 to core upgrade and compatibility system tests
cadonna merged PR #12210: URL: https://github.com/apache/kafka/pull/12210
[GitHub] [kafka] cadonna commented on pull request #12210: KAFKA-13930: Add 3.2.0 to core upgrade and compatibility system tests
cadonna commented on PR #12210: URL: https://github.com/apache/kafka/pull/12210#issuecomment-1145660378 Build failures are unrelated.
[GitHub] [kafka] cadonna commented on pull request #12210: KAFKA-13930: Add 3.2.0 to core upgrade and compatibility system tests
cadonna commented on PR #12210: URL: https://github.com/apache/kafka/pull/12210#issuecomment-1145659280 Passing system test run: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-06-02--001.system-test-kafka-branch-builder--1654200227--cadonna--AK13930-add_32_upgrade_comp_tests-core--7818cb8df6/report.html
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
ableegoldman commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r888672094

## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:

```
@@ -199,6 +201,7 @@ public void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
             }
         });
+        return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);
```

Review Comment: Yes, that's a good point -- I fixed this up by moving the sensors to the `RecordCollectorImpl` class and recording the metric in the callback of the Producer's `send` method only upon success. Thanks for clarifying
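The "record only upon success" approach described in the comment above can be sketched without any Kafka dependency. The model below is hypothetical: the `BiConsumer` stands in for the producer's `(metadata, exception)` send callback, and the `AtomicLong` stands in for the bytes-produced sensor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Standalone, hypothetical model of recording a bytes-produced metric
// only after a successful send, mirroring the pattern described above.
public class ProducedBytesSketch {
    static final AtomicLong bytesProduced = new AtomicLong();

    /**
     * Returns a send-completion callback that records the record size
     * only when the send succeeded (exception == null), so bytes that
     * never reached the topic are not over-counted.
     */
    static BiConsumer<Long, Exception> onSendComplete(byte[] keyBytes, byte[] valBytes) {
        final long size = (keyBytes == null ? 0 : keyBytes.length)
                        + (valBytes == null ? 0 : valBytes.length);
        return (offset, exception) -> {
            if (exception == null) {
                bytesProduced.addAndGet(size); // only count records that made it
            }
            // failed sends are intentionally not counted
        };
    }

    public static void main(String[] args) {
        // successful send of a 1-byte key and 5-byte value
        onSendComplete("k".getBytes(), "value".getBytes()).accept(0L, null);
        // failed send: contributes nothing to the metric
        onSendComplete("k".getBytes(), null).accept(null, new RuntimeException("send failed"));
        System.out.println(bytesProduced.get()); // prints 6
    }
}
```

This is the same trade-off the reviewers discuss: moving the measurement into the callback defers it until the outcome is known, at the cost of capturing the size eagerly when the callback is created.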
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
ableegoldman commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r888670820

## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:

```
@@ -199,6 +201,7 @@ public void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
             }
         });
+        return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);
```

Review Comment: I did it like this to avoid the extra/unnecessary null check for consumer records specifically, which already track the serialized size in bytes, unlike producer records. And unfortunately they don't inherit from a common interface/class -- but I added separate middle-man methods to handle them and moved the null check for the producer case there; should be addressed now
[GitHub] [kafka] AnGg98 opened a new pull request, #12247: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER
AnGg98 opened a new pull request, #12247: URL: https://github.com/apache/kafka/pull/12247 Replace the ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER in tests. Required after the changes merged in https://github.com/apache/kafka/pull/12190