[GitHub] [kafka] showuon merged pull request #12251: MINOR: fix streams tutorial

2022-06-03 Thread GitBox


showuon merged PR #12251:
URL: https://github.com/apache/kafka/pull/12251


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13960) ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager)

2022-06-03 Thread rohit k (Jira)
rohit k created KAFKA-13960:
---

 Summary: ERROR Shutdown broker because all log dirs in 
c:\kafka\kafka-logs have failed (kafka.log.LogManager)
 Key: KAFKA-13960
 URL: https://issues.apache.org/jira/browse/KAFKA-13960
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 3.1.1
Reporter: rohit k
 Fix For: 3.1.1
 Attachments: kafka error.png

I am using Kafka for a chat application built with the Django REST framework and 
React Native. While building my API, I needed an endpoint for group deletion. 
Whenever I create a group, it creates a NewTopic, and group deletion deletes 
that topic. After deleting a topic I got an error and cannot recover: whenever 
I start the Kafka server I get "ERROR Shutdown broker because all log dirs in 
c:\kafka\kafka-logs have failed (kafka.log.LogManager)". I tried changing the 
log dirs but the error persists.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13960) ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager)

2022-06-03 Thread rohit k (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

rohit k updated KAFKA-13960:

Issue Type: Test  (was: Bug)

> ERROR Shutdown broker because all log dirs in c:\kafka\kafka-logs have failed 
> (kafka.log.LogManager)
> 
>
> Key: KAFKA-13960
> URL: https://issues.apache.org/jira/browse/KAFKA-13960
> Project: Kafka
>  Issue Type: Test
>  Components: admin
>Affects Versions: 3.1.1
>Reporter: rohit k
>Priority: Critical
> Fix For: 3.1.1
>
> Attachments: kafka error.png
>
>
> I am using Kafka for a chat application built with the Django REST framework 
> and React Native. While building my API, I needed an endpoint for group 
> deletion. Whenever I create a group, it creates a NewTopic, and group deletion 
> deletes that topic. After deleting a topic I got an error and cannot recover: 
> whenever I start the Kafka server I get "ERROR Shutdown broker because all log 
> dirs in c:\kafka\kafka-logs have failed (kafka.log.LogManager)". I tried 
> changing the log dirs but the error persists.





[jira] [Commented] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly

2022-06-03 Thread Derek Troy-West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547818#comment-17547818
 ] 

Derek Troy-West commented on KAFKA-13598:
-

Very happy to, [~showuon]. I will just copy points 1-4 into the notes, if that's 
OK?

I'll raise a PR later this afternoon. Should I raise that PR on the apache-site 
github repository?

> idempotence producer is not enabled by default if not set explicitly
> 
>
> Key: KAFKA-13598
> URL: https://issues.apache.org/jira/browse/KAFKA-13598
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.1, 3.2.0, 3.1.1
>
>
> In KAFKA-10619, we intended to enable idempotence by default, but this was 
> not achieved due to a bug in the config validation logic. The change from 
> acks=1 to acks=all worked correctly, however.
> This is the follow-up to KIP-679: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default]
>  
> Note: In KAFKA-13673, we'll disable the idempotent producer when the 
> acks/retries/max.in.flight configs conflict, to avoid breaking existing 
> producers.
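For readers following along, a minimal sketch of pinning these settings 
explicitly rather than relying on the version-dependent defaults (bootstrap 
address and serializers are illustrative assumptions):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExplicitIdempotenceConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Pin the delivery guarantee explicitly instead of relying on the
        // default, which varies across 3.0.x/3.1.x/3.2.0 because of this bug.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce records as usual
        }
    }
}
{code}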





[GitHub] [kafka] ocadaruma commented on a diff in pull request #12251: MINOR: fix streams tutorial

2022-06-03 Thread GitBox


ocadaruma commented on code in PR #12251:
URL: https://github.com/apache/kafka/pull/12251#discussion_r889457352


##
docs/streams/tutorial.html:
##
@@ -490,25 +490,25 @@ Writing a th
 Processor: KSTREAM-FLATMAPVALUES-01(stores: []) --> KSTREAM-KEY-SELECT-02 <-- KSTREAM-SOURCE-00
 Processor: KSTREAM-KEY-SELECT-02(stores: []) --> KSTREAM-FILTER-05 <-- KSTREAM-FLATMAPVALUES-01
 Processor: KSTREAM-FILTER-05(stores: []) --> KSTREAM-SINK-04 <-- KSTREAM-KEY-SELECT-02
-Sink: KSTREAM-SINK-04(topic: Counts-repartition) <-- KSTREAM-FILTER-05
+Sink: KSTREAM-SINK-04(topic: counts-store-repartition) <-- KSTREAM-FILTER-05
   Sub-topology: 1
-Source: KSTREAM-SOURCE-06(topics: Counts-repartition) --> KSTREAM-AGGREGATE-03
-Processor: KSTREAM-AGGREGATE-03(stores: [Counts]) --> KTABLE-TOSTREAM-07 <-- KSTREAM-SOURCE-06
+Source: KSTREAM-SOURCE-06(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-03
+Processor: KSTREAM-AGGREGATE-03(stores: [counts-store]) --> KTABLE-TOSTREAM-07 <-- KSTREAM-SOURCE-06
 Processor: KTABLE-TOSTREAM-07(stores: []) --> KSTREAM-SINK-08 <-- KSTREAM-AGGREGATE-03
 Sink: KSTREAM-SINK-08(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-07
 Global Stores:
   none
 
 
 As we can see above, the topology now contains two disconnected sub-topologies.
-The first sub-topology's sink node KSTREAM-SINK-04 will write to a repartition topic Counts-repartition,
+The first sub-topology's sink node KSTREAM-SINK-04 will write to a repartition topic counts-repartition,

Review Comment:
   Oh that's right. Let me fix.
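   For context, the store name in this part of the tutorial comes from the 
aggregation step. A minimal sketch of the WordCount fragment that produces the 
topology above (the class scaffolding is an assumption; the store and topic 
names match the tutorial); running `topology.describe()` prints the 
sub-topologies quoted in the diff:

{code:java}
import java.util.Arrays;
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        KTable<String, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            // Naming the store "counts-store" is what yields the
            // "counts-store-repartition" topic shown in the diff above.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
{code}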






[jira] [Resolved] (KAFKA-13926) Proposal to have "HasField" predicate for kafka connect

2022-06-03 Thread Kumud Kumar Srivatsava Tirupati (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kumud Kumar Srivatsava Tirupati resolved KAFKA-13926.
-
Resolution: Won't Fix

Dropping in favor of improving the existing SMTs as per the discussion.

https://lists.apache.org/thread/odbj7793plyz7xxyy6d71c3xn7zng49f

> Proposal to have "HasField" predicate for kafka connect
> ---
>
> Key: KAFKA-13926
> URL: https://issues.apache.org/jira/browse/KAFKA-13926
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Kumud Kumar Srivatsava Tirupati
>Assignee: Kumud Kumar Srivatsava Tirupati
>Priority: Major
>
> Hello,
> Today's Connect predicates enable checks on the record metadata. However, 
> this can be limiting considering *many inbuilt and custom transformations 
> that we (community) use are more key/value centric*.
> Some use-cases this can solve:
>  * Data type conversions of certain pre-identified fields for records coming 
> across datasets only if those fields exist. [Ex: TimestampConverter can be 
> run only if the specified date field exists irrespective of the record 
> metadata]
>  * Skip running a certain transform if a given field does/does not exist. A lot 
> of inbuilt transforms raise exceptions (Ex: InsertField transform if the 
> field already exists) thereby breaking the task. Giving this control enables 
> users to consciously configure for such cases.
>  * Even though some inbuilt transforms explicitly handle these cases, it 
> would still be an unnecessary pass-through loop.
>  * Considering each connector usually deals with multiple datasets (Even 100s 
> for a database CDC connector), metadata-centric predicate checking will be 
> somewhat limiting when we talk about such pre-identified custom metadata 
> fields in the records.
> I know some of these cases can be handled within the transforms itself but 
> that defeats the purpose of having predicates.
> We have built this predicate for ourselves and have found it extremely 
> helpful. Please let me know your thoughts on the same so that I can raise a PR.
>  
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-845%3A+%27HasField%27+predicate+for+kafka+connect
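For readers unfamiliar with Connect predicates, a minimal sketch of how a 
predicate gates a transform in a connector config, using an existing built-in 
predicate for comparison (topics and field names are illustrative; the HasField 
wiring is hypothetical, since the proposal was dropped):

{code}
# Run TimestampConverter only on topics matched by the predicate.
transforms=convertTs
transforms.convertTs.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convertTs.field=created_at
transforms.convertTs.target.type=Timestamp
transforms.convertTs.predicate=onOrders

predicates=onOrders
predicates.onOrders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.onOrders.pattern=orders-.*

# Hypothetical wiring for the proposed (never merged) predicate:
# predicates.hasCreatedAt.type=org.apache.kafka.connect.transforms.predicates.HasField$Value
# predicates.hasCreatedAt.field=created_at
{code}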





[GitHub] [kafka] showuon commented on a diff in pull request #12251: MINOR: fix streams tutorial

2022-06-03 Thread GitBox


showuon commented on code in PR #12251:
URL: https://github.com/apache/kafka/pull/12251#discussion_r889451065


##
docs/streams/tutorial.html:
##
@@ -490,25 +490,25 @@ Writing a th
 Processor: KSTREAM-FLATMAPVALUES-01(stores: []) --> KSTREAM-KEY-SELECT-02 <-- KSTREAM-SOURCE-00
 Processor: KSTREAM-KEY-SELECT-02(stores: []) --> KSTREAM-FILTER-05 <-- KSTREAM-FLATMAPVALUES-01
 Processor: KSTREAM-FILTER-05(stores: []) --> KSTREAM-SINK-04 <-- KSTREAM-KEY-SELECT-02
-Sink: KSTREAM-SINK-04(topic: Counts-repartition) <-- KSTREAM-FILTER-05
+Sink: KSTREAM-SINK-04(topic: counts-store-repartition) <-- KSTREAM-FILTER-05
   Sub-topology: 1
-Source: KSTREAM-SOURCE-06(topics: Counts-repartition) --> KSTREAM-AGGREGATE-03
-Processor: KSTREAM-AGGREGATE-03(stores: [Counts]) --> KTABLE-TOSTREAM-07 <-- KSTREAM-SOURCE-06
+Source: KSTREAM-SOURCE-06(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-03
+Processor: KSTREAM-AGGREGATE-03(stores: [counts-store]) --> KTABLE-TOSTREAM-07 <-- KSTREAM-SOURCE-06
 Processor: KTABLE-TOSTREAM-07(stores: []) --> KSTREAM-SINK-08 <-- KSTREAM-AGGREGATE-03
 Sink: KSTREAM-SINK-08(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-07
 Global Stores:
   none
 
 
 As we can see above, the topology now contains two disconnected sub-topologies.
-The first sub-topology's sink node KSTREAM-SINK-04 will write to a repartition topic Counts-repartition,
+The first sub-topology's sink node KSTREAM-SINK-04 will write to a repartition topic counts-repartition,

Review Comment:
   Should it be `counts-repartition` -> `counts-store-repartition` ?






[jira] [Commented] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly

2022-06-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547813#comment-17547813
 ] 

Luke Chen commented on KAFKA-13598:
---

[~d-t-w] , yes, adding a note to notable changes in 3.2.0 section is a good 
idea. Are you interested in submitting a PR for it? Thanks.

> idempotence producer is not enabled by default if not set explicitly
> 
>
> Key: KAFKA-13598
> URL: https://issues.apache.org/jira/browse/KAFKA-13598
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.1, 3.2.0, 3.1.1
>
>
> In KAFKA-10619, we intended to enable idempotence by default, but this was 
> not achieved due to a bug in the config validation logic. The change from 
> acks=1 to acks=all worked correctly, however.
> This is the follow-up to KIP-679: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default]
>  
> Note: In KAFKA-13673, we'll disable the idempotent producer when the 
> acks/retries/max.in.flight configs conflict, to avoid breaking existing 
> producers.





[jira] [Commented] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly

2022-06-03 Thread Derek Troy-West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547803#comment-17547803
 ] 

Derek Troy-West commented on KAFKA-13598:
-

This represents a breaking change where:
 # Broker version is < 2.8.0
 # Cluster has ACLs configured, but no IDEMPOTENT_WRITE permission set
 # Producer has default configuration (or no config captured in KAFKA-13673)
 # Kafka client library version is bumped to 3.2.0

The resulting produce error includes a ClusterAuthorizationException, but it 
wasn't clear that a new ACL was required until we found KAFKA-13759.

I think 1-3 might be quite a common scenario for Kafka users in the wild, and 4 
is very easy to do on the application side.

It might be worth adding a note about this to the 3.2.0 upgrade guide or release 
notes.

More info: https://kpow.io/articles/kafka-producer-breaking-change/
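For anyone hitting this combination, a minimal sketch of the two workarounds 
(principal name and bootstrap address are illustrative assumptions):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantIdempotentWrite {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Option 1: grant the cluster-level ACL that idempotent producers
            // need on brokers older than 2.8.0.
            AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
                new AccessControlEntry("User:my-app", "*", AclOperation.IDEMPOTENT_WRITE,
                    AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
        // Option 2: keep the pre-3.2.0 behavior by setting
        // enable.idempotence=false explicitly in the producer config.
    }
}
{code}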

> idempotence producer is not enabled by default if not set explicitly
> 
>
> Key: KAFKA-13598
> URL: https://issues.apache.org/jira/browse/KAFKA-13598
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.1, 3.2.0, 3.1.1
>
>
> In KAFKA-10619, we intended to enable idempotence by default, but this was 
> not achieved due to a bug in the config validation logic. The change from 
> acks=1 to acks=all worked correctly, however.
> This is the follow-up to KIP-679: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default]
>  
> Note: In KAFKA-13673, we'll disable the idempotent producer when the 
> acks/retries/max.in.flight configs conflict, to avoid breaking existing 
> producers.





[jira] [Comment Edited] (KAFKA-13598) idempotence producer is not enabled by default if not set explicitly

2022-06-03 Thread Derek Troy-West (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547803#comment-17547803
 ] 

Derek Troy-West edited comment on KAFKA-13598 at 6/4/22 2:02 AM:
-

This represents a breaking change where:
 # Broker version is < 2.8.0
 # Cluster has ACLs configured, but no IDEMPOTENT_WRITE permission set
 # Producer has default configuration (or no config captured in KAFKA-13673)
 # Kafka client library version is bumped to 3.2.0 in the producing application

The resulting produce error includes a ClusterAuthorizationException, but it 
wasn't clear that a new ACL was required until we found KAFKA-13759.

I think 1-3 might be quite a common scenario for Kafka users in the wild, and 4 
is very easy to do on the application side.

It might be worth adding a note about this to the 3.2.0 upgrade guide or release 
notes.

More info: [https://kpow.io/articles/kafka-producer-breaking-change/]


was (Author: d-t-w):
This represents a breaking change where:
 # Broker version is < 2.8.0
 # Cluster has ACLs configured, but no IDEMPOTENT_WRITE permission set
 # Producer has default configuration (or no config captured in KAFKA-13673)
 # Kafka client library version is bumped to 3.2.0

The resulting produce error includes a ClusterAuthorizationException, but it 
wasn't clear that a new ACL was required until we found KAFKA-13759.

I think 1-3 might be quite a common scenario for Kafka users in the wild, and 4 
is very easy to do on the application side.

It might be worth adding a note about this to the 3.2.0 upgrade guide or release 
notes.

More info: https://kpow.io/articles/kafka-producer-breaking-change/

> idempotence producer is not enabled by default if not set explicitly
> 
>
> Key: KAFKA-13598
> URL: https://issues.apache.org/jira/browse/KAFKA-13598
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.1, 3.2.0, 3.1.1
>
>
> In KAFKA-10619, we intended to enable idempotence by default, but this was 
> not achieved due to a bug in the config validation logic. The change from 
> acks=1 to acks=all worked correctly, however.
> This is the follow-up to KIP-679: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default]
>  
> Note: In KAFKA-13673, we'll disable the idempotent producer when the 
> acks/retries/max.in.flight configs conflict, to avoid breaking existing 
> producers.





[GitHub] [kafka] dengziming commented on pull request #12236: MINOR: Use right enum value for broker registration change

2022-06-03 Thread GitBox


dengziming commented on PR #12236:
URL: https://github.com/apache/kafka/pull/12236#issuecomment-1146478325

   > @dengziming I merged `trunk` to try to kick off a build.
   
   Thank you.





[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-03 Thread GitBox


dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889424223


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+private final String topic;

Review Comment:
   I think it's better to add the partition here because we are making way for 
multi-raft.






[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-03 Thread GitBox


dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889423970


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
 }
 }
 
+@Test
+public void testDescribeMetadataQuorumSuccess() throws Exception {
+try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   We can't use `NodeApiVersions.create()` here due to a bug here, which I'm 
trying to fix here: https://github.com/apache/kafka/pull/11784






[GitHub] [kafka] ocadaruma opened a new pull request, #12251: MINOR: fix streams tutorial

2022-06-03 Thread GitBox


ocadaruma opened a new pull request, #12251:
URL: https://github.com/apache/kafka/pull/12251

   - Fix inconsistency in store name in streams tutorial
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-13935) Factor out static IBP usages from broker

2022-06-03 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13935:
-
Summary: Factor out static IBP usages from broker  (was: Factor out IBP 
from Partition and Log code)

> Factor out static IBP usages from broker
> 
>
> Key: KAFKA-13935
> URL: https://issues.apache.org/jira/browse/KAFKA-13935
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.3.0
>Reporter: David Arthur
>Priority: Major
>
> We pass the IBP down to the log layer for checking things like compression 
> support. Currently, we are still reading this from KafkaConfig. In ZK mode 
> this is fine, but in KRaft mode, reading the IBP from the config is not 
> supported.
> Since KRaft only supports IBP/MetadataVersion greater than 3.0 (which 
> supports the compression mode we check for), we may be able to avoid using a 
> dynamic call and/or volatile to get the current version. 





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 }
 }
 } else {
-// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+// For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+// instead we only try to refresh metadata when necessary.
 // If connections to all nodes fail, wakeups triggered while attempting to send fetch
 // requests result in polls returning immediately, causing a tight loop of polls. Without
 // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+client.awaitMetadataUpdate(timer);
 }
+
+// if there are pending coordinator requests, ensure they have a chance to be transmitted.

Review Comment:
   This is a major change while addressing @dajac's comment: previously, with 
manual assignment, the `coordinator.poll` call would not call 
`networkClient.poll`. This means that if coordinator discovery does not 
complete within `commitAsync` (note we call `networkClient.poll` twice in that 
function, so it is possible the discovery completes there), we have no other 
place to poll the network client to complete the pending requests.
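   As a user-level illustration of the scenario under discussion, a minimal 
sketch of a manually assigned consumer issuing an async commit (topic name and 
configs are illustrative assumptions). The commit's FindCoordinator request is 
only transmitted by subsequent `poll()` calls, which is why the change above 
matters:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAssignCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "manual-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("t1", 0);
            // Manual assignment: no rebalance protocol, but commits still
            // require the group coordinator to be discovered.
            consumer.assign(Collections.singletonList(tp));
            consumer.commitAsync(Collections.singletonMap(tp, new OffsetAndMetadata(0L)),
                (offsets, exception) -> {
                    if (exception != null) exception.printStackTrace();
                });
            // If coordinator discovery did not finish inside commitAsync,
            // later poll() calls must give the pending request a chance to go out.
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}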






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889361765


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+) {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+// set timeout to 0 because we don't want to retry after the error
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());
+
+// elapse auto commit interval and set committable position
+time.sleep(autoCommitIntervalMs);
+subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+// should try to find coordinator since we are auto committing
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(Long.MAX_VALUE));
+assertFalse(coordinator.coordinatorUnknown());
+}
+}
+
+@Test
+public void testCommitAsyncWithUserAssignedType() {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+// set timeout to 0 because we don't want to retry after the error
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());

Review Comment:
   Ack






[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


ableegoldman commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r889358327


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java:
##
@@ -388,6 +401,40 @@ private String nodeSensorPrefix(final String threadId, final String taskId, fina
 + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName;
 }
 
+public Sensor topicLevelSensor(final String threadId,
+   final String taskId,
+   final String processorNodeName,
+   final String topicName,

Review Comment:
   I did update it to include the `topic` tag already, but I haven't updated it 
yet to clarify that we are introducing these at a new "topic-level" metrics 
scope. I'll do that right now






[jira] [Commented] (KAFKA-13944) Shutting down broker can be elected as partition leader in KRaft

2022-06-03 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17547157#comment-17547157
 ] 

Jose Armando Garcia Sancio commented on KAFKA-13944:


Looks like this issue is addressed by 
https://issues.apache.org/jira/browse/KAFKA-13916

 

> Shutting down broker can be elected as partition leader in KRaft
> 
>
> Key: KAFKA-13944
> URL: https://issues.apache.org/jira/browse/KAFKA-13944
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> When a broker requests shutdown, it transitions to the CONTROLLED_SHUTDOWN 
> state in the controller. It is possible for the broker to remain unfenced in 
> this state until the controlled shutdown completes. When doing an election, 
> the only thing we generally check is that the broker is unfenced, so this 
> means we can elect a broker that is in controlled shutdown. 
> Here are a few snippets from a recent system test in which this occurred:
> {code:java}
> // broker 2 starts controlled shutdown
> [2022-05-26 21:17:26,451] INFO [Controller 3001] Unfenced broker 2 has 
> requested and been granted a controlled shutdown. 
> (org.apache.kafka.controller.BrokerHeartbeatManager)
>  
> // there is only one replica, so we set leader to -1
> [2022-05-26 21:17:26,452] DEBUG [Controller 3001] partition change for _foo-1 
> with topic ID _iUQ72T_R4mmZgI3WrsyXw: leader: 2 -> -1, leaderEpoch: 0 -> 1, 
> partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> // controlled shutdown cannot complete immediately
> [2022-05-26 21:17:26,529] DEBUG [Controller 3001] The request from broker 2 
> to shut down can not yet be granted because the lowest active offset 177 is 
> not greater than the broker's shutdown offset 244. 
> (org.apache.kafka.controller.BrokerHeartbeatManager)
> [2022-05-26 21:17:26,530] DEBUG [Controller 3001] Updated the controlled 
> shutdown offset for broker 2 to 244. 
> (org.apache.kafka.controller.BrokerHeartbeatManager)
> // later on we elect leader 2 again
> [2022-05-26 21:17:27,703] DEBUG [Controller 3001] partition change for _foo-1 
> with topic ID _iUQ72T_R4mmZgI3WrsyXw: leader: -1 -> 2, leaderEpoch: 1 -> 2, 
> partitionEpoch: 1 -> 2 (org.apache.kafka.controller.ReplicationControlManager)
> // now controlled shutdown is stuck because of the newly elected leader
> [2022-05-26 21:17:28,531] DEBUG [Controller 3001] Broker 2 is in controlled 
> shutdown state, but can not shut down because more leaders still need to be 
> moved. (org.apache.kafka.controller.BrokerHeartbeatManager)
> {code}





[GitHub] [kafka] jsancio commented on pull request #12236: MINOR: Use right enum value for broker registration change

2022-06-03 Thread GitBox


jsancio commented on PR #12236:
URL: https://github.com/apache/kafka/pull/12236#issuecomment-1146366630

   @dengziming I merged `trunk` to try to kick off a build.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889351949


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+) {

Review Comment:
   ack






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889350182


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 // requests result in polls returning immediately, causing a tight loop of polls. Without
 // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {

Review Comment:
   ack






[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-06-03 Thread Jim Hughes (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jim Hughes updated KAFKA-13873:
---
Fix Version/s: 3.3.0

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: kip
> Fix For: 3.3.0
>
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.
> KIP-834: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]
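A minimal sketch of the API this adds, following KIP-834 (the topology and 
configs are illustrative assumptions):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PauseResumeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pausable-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.start();
            // Pause processing without tearing down threads or state.
            streams.pause();
            // ... later, when the reason for pausing has passed:
            streams.resume();
        }
    }
}
{code}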





[GitHub] [kafka] aartigao closed pull request #10929: KAFKA-12995 [WIP] Allow old Broker compatibility for Metadata calls

2022-06-03 Thread GitBox


aartigao closed pull request #10929: KAFKA-12995 [WIP] Allow old Broker 
compatibility for Metadata calls
URL: https://github.com/apache/kafka/pull/10929





[GitHub] [kafka] jsancio commented on a diff in pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool

2022-06-03 Thread GitBox


jsancio commented on code in PR #12245:
URL: https://github.com/apache/kafka/pull/12245#discussion_r889255627


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -162,7 +163,7 @@ Found problem:
   val stream = new ByteArrayOutputStream()
   assertEquals(0, StorageTool.
 formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false))
-  assertEquals("Formatting %s%n".format(tempDir), stream.toString())
+  assertTrue(stream.toString().startsWith("Formatting %s".format(tempDir)))

Review Comment:
   Minor, but you can use Scala string interpolation, e.g. `s"Formatting $tempDir"`.






[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log

2022-06-03 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546695#comment-17546695
 ] 

Jose Armando Garcia Sancio commented on KAFKA-13959:


[~dengziming] [~showuon] Are you interested in working on this issue?

> Controller should unfence Broker with busy metadata log
> ---
>
> Key: KAFKA-13959
> URL: https://issues.apache.org/jira/browse/KAFKA-13959
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
>
> https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible 
> for the controller to not unfence a broker if the committed offset keeps 
> increasing.
>  
> One solution to this problem is to only require the broker to catch up to the 
> offset that was the last committed offset when it last sent a heartbeat. For 
> example:
>  # Broker sends a heartbeat with current offset {{Y}}. The last committed 
> offset is {{X}}. The controller remembers this last committed offset; call it 
> {{X'}}.
>  # Broker sends another heartbeat with current offset {{Z}}. Unfence 
> the broker if {{Z >= X}} or {{Z >= X'}}.
>  
> This change should also set the default for MetadataMaxIdleIntervalMs back to 
> 500.





[jira] [Updated] (KAFKA-13959) Controller should unfence Broker with busy metadata log

2022-06-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13959:
---
Description: 
https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible 
for the controller to not unfence a broker if the committed offset keeps 
increasing.

 

One solution to this problem is to only require the broker to catch up to the 
offset that was the last committed offset when it last sent a heartbeat. For example:
 # Broker sends a heartbeat with current offset {{Y}}. The last committed 
offset is {{X}}. The controller remembers this last committed offset; call it 
{{X'}}.
 # Broker sends another heartbeat with current offset {{Z}}. Unfence the 
broker if {{Z >= X}} or {{Z >= X'}}.

 

This change should also set the default for MetadataMaxIdleIntervalMs back to 
500.

  was:
https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible 
for the controller to not unfence a broker if the committed offset keeps 
increasing.

 

One solution to this problem is to only require the broker to catch up to the 
offset that was the last committed offset when it last sent a heartbeat. For example:
 # Broker sends a heartbeat with current offset {{Y}}. The last committed 
offset is {{X}}. The controller remembers this last committed offset; call it 
{{X'}}.
 # Broker sends another heartbeat with current offset {{Z}}. Unfence the 
broker if {{Z >= X}} or {{Z >= X'}}.


> Controller should unfence Broker with busy metadata log
> ---
>
> Key: KAFKA-13959
> URL: https://issues.apache.org/jira/browse/KAFKA-13959
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
>
> https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible 
> for the controller to not unfence a broker if the committed offset keeps 
> increasing.
>  
> One solution to this problem is to only require the broker to catch up to the 
> offset that was the last committed offset when it last sent a heartbeat. For 
> example:
>  # Broker sends a heartbeat with current offset {{Y}}. The last committed 
> offset is {{X}}. The controller remembers this last committed offset; call it 
> {{X'}}.
>  # Broker sends another heartbeat with current offset {{Z}}. Unfence 
> the broker if {{Z >= X}} or {{Z >= X'}}.
>  
> This change should also set the default for MetadataMaxIdleIntervalMs back to 
> 500.





[GitHub] [kafka] jsancio commented on pull request #12238: KAFKA-13955: Fix failing KRaftClusterTest tests

2022-06-03 Thread GitBox


jsancio commented on PR #12238:
URL: https://github.com/apache/kafka/pull/12238#issuecomment-1146263145

   I merged this PR to fix the tests and I created 
https://issues.apache.org/jira/browse/KAFKA-13959.
   
   > @showuon Yes, after we reach a consensus here, we should update the KIP 
and note it in the KIP discussion thread.
   
   @dengziming @showuon My preference is to fix KAFKA-13959 and revert this 
commit before we ship 3.3.0.





[jira] [Created] (KAFKA-13959) Controller should unfence Broker with busy metadata log

2022-06-03 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13959:
--

 Summary: Controller should unfence Broker with busy metadata log
 Key: KAFKA-13959
 URL: https://issues.apache.org/jira/browse/KAFKA-13959
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.3.0
Reporter: Jose Armando Garcia Sancio


https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible 
for the controller to not unfence a broker if the committed offset keeps 
increasing.

 

One solution to this problem is to only require the broker to catch up to the 
offset that was the last committed offset when it last sent a heartbeat. For example:
 # Broker sends a heartbeat with current offset {{Y}}. The last committed 
offset is {{X}}. The controller remembers this last committed offset; call it 
{{X'}}.
 # Broker sends another heartbeat with current offset {{Z}}. Unfence the 
broker if {{Z >= X}} or {{Z >= X'}}.
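A small sketch of the proposed check (names and structure are illustrative, 
not the eventual controller implementation):

{code:java}
// Hypothetical helper: remember the committed offset observed at the
// broker's previous heartbeat (X' in the notation above) and unfence once
// the broker's current offset passes either bound.
final class BrokerCatchUpTracker {
    private long offsetAtLastHeartbeat = -1L; // X'

    boolean shouldUnfence(long brokerOffset, long lastCommittedOffset) {
        boolean caughtUpToPrevious =
            offsetAtLastHeartbeat >= 0 && brokerOffset >= offsetAtLastHeartbeat;
        // Remember the committed offset seen at this heartbeat for the next check.
        offsetAtLastHeartbeat = lastCommittedOffset;
        return caughtUpToPrevious || brokerOffset >= lastCommittedOffset;
    }
}
{code}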





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-03 Thread GitBox


niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889238615


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
 }
 }
 
+@Test
+public void testDescribeMetadataQuorumSuccess() throws Exception {
+try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   Not sure about this comment. I think I would need to add the 
`DESCRIBE_QUORUM` API to the list of known APIs even if we used the default 
constructor, right? If I do not do that, the test encounters an error where it 
claims to not know about the `DESCRIBE_QUORUM` API.






[GitHub] [kafka] jsancio merged pull request #12238: KAFKA-13955: Fix failing KRaftClusterTest tests

2022-06-03 Thread GitBox


jsancio merged PR #12238:
URL: https://github.com/apache/kafka/pull/12238





[GitHub] [kafka] jsancio commented on pull request #12238: KIP-835: MetadataMaxIdleIntervalMs should be much bigger than brokerHeartbeatIntervalMs

2022-06-03 Thread GitBox


jsancio commented on PR #12238:
URL: https://github.com/apache/kafka/pull/12238#issuecomment-1146244214

   These are the failing tests:
   ```
   
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker
   Build / JDK 8 and Scala 2.12 / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   ```
   They look unrelated to KRaft.





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-03 Thread GitBox


niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889220152


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+Integer partition = 0;
+String topicName = response.getTopicNameByIndex(0);
+Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+response.getVoterInfo(topicName, partition).forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+OptionalLong.of(v.lastFetchTimestamp()),
+OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+response.getObserverInfo(topicName, partition).forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+OptionalLong.of(o.lastFetchTimestamp()),
+OptionalLong.of(o.lastCaughtUpTimestamp())));
+});
+QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+return info;
+}
+
+@Override
+DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+return new Builder(DescribeQuorumRequest.singletonRequest(
+new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+try {
+if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+}
+if (quorumResponse.data().topics().size() > 1) {
+log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
+quorumResponse.data().topics().size());
+throw new UnknownServerException();

Review Comment:
   Good point about adding the message to the exception being returned.
   
   The behavior of the code block is equivalent to completing the future and 
returning. I just didn't want to repeat the future-completion code. Is there a 
well-known convention for handling code blocks like this in Java?
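   One common convention (an illustrative sketch that assumes the surrounding 
class context from the diff above; the helper name is hypothetical) is to 
extract the validation into a method that throws, so the future is completed 
exceptionally in exactly one catch block:

{code:java}
// Hypothetical helper: validate and convert, throwing on any problem.
private QuorumInfo validateAndCreateQuorumResult(DescribeQuorumResponse response) {
    if (response.data().errorCode() != Errors.NONE.code()) {
        throw Errors.forCode(response.data().errorCode()).exception();
    }
    if (response.data().topics().size() != 1) {
        throw new UnknownServerException("DescribeMetadataQuorum received "
            + response.data().topics().size() + " topics when 1 was expected");
    }
    return createQuorumResult(response);
}

@Override
void handleResponse(AbstractResponse response) {
    try {
        future.complete(validateAndCreateQuorumResult((DescribeQuorumResponse) response));
    } catch (Exception e) {
        // Single place where the future is failed.
        future.completeExceptionally(e);
    }
}
{code}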






[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application

2022-06-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546407#comment-17546407
 ] 

Matthias J. Sax commented on KAFKA-13936:
-

The docs are in the same repository as the code: 
[https://github.com/apache/kafka/tree/trunk/docs]

A good place might be the consumer monitoring section 
[https://kafka.apache.org/documentation/#consumer_fetch_monitoring] ?

> Invalid consumer lag when monitoring from a kafka streams application
> -
>
> Key: KAFKA-13936
> URL: https://issues.apache.org/jira/browse/KAFKA-13936
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Prashanth Joseph Babu
>Priority: Major
>
> I have a kafka streams application and I'm trying to monitor the consumer lag 
> via stream metrics.
> Here's some code snippet
> {code:java}
> // Sum records-lag-max across the partitions of the input topic.
> Map<MetricName, ? extends Metric> metrics = streams.metrics();
> double lag = 0;
> for (Metric m : metrics.values()) {
>     Map<String, String> tags = m.metricName().tags();
>     if (m.metricName().name().equals(MONITOR_CONSUMER_LAG)
>             && tags.containsKey(MONITOR_TAG_TOPIC)
>             && tags.get(MONITOR_TAG_TOPIC).equals(inputTopic)) {
>         float partitionLag = Float.parseFloat(m.metricValue().toString());
>         if (!Float.isNaN(partitionLag)) {
>             lag += partitionLag;
>         }
>     }
> }
> {code}
> Here MONITOR_CONSUMER_LAG is {{records-lag-max}}.
> However, these numbers don't match the consumer lag we see in the Kafka UI. 
> Is records-lag-max the right metric to track for a Kafka Streams application 
> when the objective is to get consumer lag?





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


guozhangwang commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r889168252


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java:
##
@@ -388,6 +401,40 @@ private String nodeSensorPrefix(final String threadId, final String taskId, fina
 + SENSOR_PREFIX_DELIMITER + SENSOR_NODE_LABEL + SENSOR_PREFIX_DELIMITER + processorNodeName;
 }
 
+public Sensor topicLevelSensor(final String threadId,
+   final String taskId,
+   final String processorNodeName,
+   final String topicName,

Review Comment:
   I cannot access the apache wiki atm (seems down temporarily) but just double 
checking is topic name part of the tag in the KIP proposal as well? If not we 
should update the wiki and the KIP thread.






[GitHub] [kafka] cmccabe commented on pull request #12238: KIP-835: MetadataMaxIdleIntervalMs should be much bigger than brokerHeartbeatIntervalMs

2022-06-03 Thread GitBox


cmccabe commented on PR #12238:
URL: https://github.com/apache/kafka/pull/12238#issuecomment-1146163957

   > Another solution is to require the broker to only catch up to the last 
committed offset when they last sent the heartbeat.
   
   I like this solution too. However, there are some complexities here (we'd 
want to make sure the heartbeat wasn't too long ago)
   
   Bumping the no-op timeout might be a good quick fix until we have time to 
implement that (although I wonder why we can't use 4 seconds rather than 5?)





[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-06-03 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546196#comment-17546196
 ] 

Jun Rao commented on KAFKA-13953:
-

[~abasilbr] : If you have multiple replicas, it's unlikely all replicas are 
corrupted. You can just delete the corrupted replica without losing data. Also, 
is this transient? The corruption could also be caused during network transfer.

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}





[jira] [Resolved] (KAFKA-13803) Refactor Leader API Access

2022-06-03 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13803.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

merged the PR to trunk

> Refactor Leader API Access
> --
>
> Key: KAFKA-13803
> URL: https://issues.apache.org/jira/browse/KAFKA-13803
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rittika Adhikari
>Assignee: Rittika Adhikari
>Priority: Major
> Fix For: 3.3.0
>
>
> Currently, AbstractFetcherThread has a series of protected APIs which control 
> access to the Leader. ReplicaFetcherThread and ReplicaAlterLogDirsThread 
> respectively override these protected APIs and handle access to the Leader in 
> a remote broker leader and a local leader context.
> We propose to move these protected APIs to a LeaderEndPoint interface, which 
> will serve all fetches from the Leader. We will implement a 
> RemoteLeaderEndPoint and a LocalLeaderEndPoint accordingly. This change will 
> greatly simplify our existing follower fetch code.
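
For illustration, a minimal Java sketch of the shape such an interface could take (the method names here are assumptions modeled on the fetcher threads' existing protected API, not the merged code):

{code:java}
// Illustrative sketch only -- names are assumptions, not the merged LeaderEndPoint.
public interface LeaderEndPoint {
    // Fetch records from the leader for the partitions in the request.
    FetchResponseData fetch(FetchRequest.Builder fetchRequest);

    // Look up the leader's earliest and latest offsets for a partition.
    long fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch);
    long fetchLatestOffset(TopicPartition topicPartition, int currentLeaderEpoch);
}
{code}

Under this shape, a RemoteLeaderEndPoint answers the calls with fetch requests to the remote leader broker, while a LocalLeaderEndPoint reads from the local log, letting ReplicaFetcherThread and ReplicaAlterLogDirsThread share a single fetch loop.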





[GitHub] [kafka] junrao merged pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-06-03 Thread GitBox


junrao merged PR #12005:
URL: https://github.com/apache/kafka/pull/12005





[GitHub] [kafka] jsancio opened a new pull request, #12249: MINOR; Test last committed record offset for Controllers

2022-06-03 Thread GitBox


jsancio opened a new pull request, #12249:
URL: https://github.com/apache/kafka/pull/12249

   As part of KIP-835, LastCommittedRecordOffset was added to the
   KafkaController metric type. Make sure to test that metric.
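   
   A rough sketch of how such a gauge can be asserted (the MBean name follows KIP-835; the registry lookup below is an assumption, not this PR's code):
   
   ```java
   // Sketch only: locate the KIP-835 controller gauge in the default Yammer
   // registry and assert it reports an offset. Test scaffolding is assumed.
   MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
   Gauge<?> lastCommitted = (Gauge<?>) registry.allMetrics().entrySet().stream()
       .filter(e -> e.getKey().getMBeanName().equals(
           "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset"))
       .map(java.util.Map.Entry::getValue)
       .findFirst()
       .orElseThrow(() -> new AssertionError("gauge not registered"));
   assertTrue((Long) lastCommitted.value() >= 0);
   ```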
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mimaison opened a new pull request, #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…

2022-06-03 Thread GitBox


mimaison opened a new pull request, #12248:
URL: https://github.com/apache/kafka/pull/12248

   …-827)
   
   This implements KIP-827: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API
   
   Add TotalBytes and UsableBytes to DescribeLogDirsResponse
   Add matching getters on LogDirDescription
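   
   For context, a hedged usage sketch of the new fields once this lands (the totalBytes/usableBytes getter names are taken from the KIP text, so treat them as assumptions):
   
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.LogDirDescription;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   
   public class LogDirSpaceExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // Describe broker 0's log dirs and print the new space fields.
               Map<String, LogDirDescription> logDirs = admin
                   .describeLogDirs(Collections.singleton(0))
                   .allDescriptions().get()
                   .get(0);
               logDirs.forEach((path, desc) -> System.out.printf(
                   "%s: total=%s usable=%s%n", path, desc.totalBytes(), desc.usableBytes()));
           }
       }
   }
   ```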
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] dajac merged pull request #12247: MINOR: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER

2022-06-03 Thread GitBox


dajac merged PR #12247:
URL: https://github.com/apache/kafka/pull/12247





[GitHub] [kafka] dajac commented on pull request #12247: MINOR: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER

2022-06-03 Thread GitBox


dajac commented on PR #12247:
URL: https://github.com/apache/kafka/pull/12247#issuecomment-1146095417

   @AnGg98 Thanks!





[GitHub] [kafka] AnGg98 commented on pull request #12247: MINOR: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER

2022-06-03 Thread GitBox


AnGg98 commented on PR #12247:
URL: https://github.com/apache/kafka/pull/12247#issuecomment-1146080043

   Test run successful: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-06-03--001.system-test-kafka-branch-builder--1654259402--AnGg98--acl-authorizer--65a5a3106e/report.html





[jira] [Resolved] (KAFKA-13883) KIP-835: Monitor Quorum

2022-06-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-13883.

Resolution: Fixed

> KIP-835: Monitor Quorum
> ---
>
> Key: KAFKA-13883
> URL: https://issues.apache.org/jira/browse/KAFKA-13883
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for the implementation of KIP-835.





[jira] [Resolved] (KAFKA-13918) Schedule or cancel nooprecord write on metadata version change

2022-06-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-13918.

Resolution: Duplicate

> Schedule or cancel nooprecord write on metadata version change
> --
>
> Key: KAFKA-13918
> URL: https://issues.apache.org/jira/browse/KAFKA-13918
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>






[GitHub] [kafka] vvcephei commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


vvcephei commented on PR #12235:
URL: https://github.com/apache/kafka/pull/12235#issuecomment-1146046545

   @ableegoldman , you might want to take a look at the build I cancelled. 
Before it got stuck, there were a large number of failures: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12235/9/#showFailuresLink
   





[GitHub] [kafka] vvcephei commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


vvcephei commented on PR #12235:
URL: https://github.com/apache/kafka/pull/12235#issuecomment-1146044849

   The prior build had hung. I cancelled it so the latest commit could be built.





[GitHub] [kafka] dajac commented on a diff in pull request #12244: HOTFIX: only try to clear discover-coordinator future upon commit

2022-06-03 Thread GitBox


dajac commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889014500


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+) {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+// set timeout to 0 because we don't want to retry after the error
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());
+
+// elapse auto commit interval and set committable position
+time.sleep(autoCommitIntervalMs);
+subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+// should try to find coordinator since we are auto committing
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(Long.MAX_VALUE));
+assertFalse(coordinator.coordinatorUnknown());
+}
+}
+
+@Test
+public void testCommitAsyncWithUserAssignedType() {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));
+// set timeout to 0 because we don't want to retry after the error
+coordinator.poll(time.timer(0));
+assertTrue(coordinator.coordinatorUnknown());

Review Comment:
   Should we also assert that there are no in-flight requests after calling poll?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());
 
-// should find an available node in next find coordinator request
+// should not try to find coordinator since we are in manual assignment
+// hence the prepared response should not be returned
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.poll(time.timer(Long.MAX_VALUE));
+assertTrue(coordinator.coordinatorUnknown());
+}
+
+@Test
+public void testAutoCommitAsyncWithUserAssignedType() {
+try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
+) {
+subscriptions.assignFromUser(Collections.singleton(t1p));
+// should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error
+client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE));

Review Comment:
   Same question as before.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -553,8 +554,8 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 // requests result in polls returning immediately, causing a tight loop of polls. Without
 // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {

Review Comment:
   nit: I think that we need to remove `if coordinator is unknown, make sure we 
lookup one and` from the above comment (first sentence).



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -514,9 +514,50 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
 coordinator.poll(time.timer(0));
 

[jira] [Assigned] (KAFKA-13908) RuntimeException will be thrown in BrokerServer.startup, not the cause of exception

2022-06-03 Thread Richard Joerger (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Joerger reassigned KAFKA-13908:
---

Assignee: Richard Joerger

> RuntimeException will be thrown in BrokerServer.startup, not the cause of 
> exception
> ---
>
> Key: KAFKA-13908
> URL: https://issues.apache.org/jira/browse/KAFKA-13908
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Richard Joerger
>Priority: Major
>  Labels: newbie
>
> Before [#11969|https://github.com/apache/kafka/pull/11969], We will throw an 
> {{ExecutionException(KafkaStorageException)}} in 
> {{{}BrokerServer.startup{}}}, and we'll catch the exception and rethrow the 
> cause by:
> {code:java}
> throw if (e.isInstanceOf[ExecutionException]) e.getCause else e{code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/Kafka.scala#L113]
>  
> After [#11969|https://github.com/apache/kafka/pull/11969], We will throw a 
> {{{}RuntimeException(ExecutionException(KafkaStorageException)){}}}. But the 
> catch logic didn't change. That means that if the exception is a 
> RuntimeException, we won't throw only the cause, but the whole exception chain. 
>  
> We should update it and add tests for it.
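
For illustration, a hedged sketch of the kind of unwrapping the catch site would need (my own sketch, not the committed fix):

{code:java}
// Sketch only: peel RuntimeException/ExecutionException wrappers so the original
// cause (e.g. KafkaStorageException) is what gets rethrown.
// Uses java.util.concurrent.ExecutionException.
static Throwable rootCause(final Throwable e) {
    Throwable cause = e;
    while ((cause instanceof RuntimeException || cause instanceof ExecutionException)
            && cause.getCause() != null) {
        cause = cause.getCause();
    }
    return cause;
}
{code}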





[GitHub] [kafka] Moovlin commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…

2022-06-03 Thread GitBox


Moovlin commented on PR #12170:
URL: https://github.com/apache/kafka/pull/12170#issuecomment-1146034275

   @showuon You were correct. I went and added the "Configs:" column to the 
quickstart output. Great catch!





[GitHub] [kafka] C0urante commented on pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

2022-06-03 Thread GitBox


C0urante commented on PR #11779:
URL: https://github.com/apache/kafka/pull/11779#issuecomment-1146012029

   Given that all merge conflicts have been resolved and 
https://github.com/apache/kafka/pull/11778 has already been approved, marking 
this as ready for review.





[GitHub] [kafka] Moovlin commented on pull request #12246: KAFKA-13718: kafka-topics describe topic with default config will show `segment.bytes` overridden config

2022-06-03 Thread GitBox


Moovlin commented on PR #12246:
URL: https://github.com/apache/kafka/pull/12246#issuecomment-1145980276

   @showuon Thanks! Happy to help contribute to the project!





[jira] [Commented] (KAFKA-13875) update docs to include topicId for kafka-topics.sh --describe output

2022-06-03 Thread Richard Joerger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545877#comment-17545877
 ] 

Richard Joerger commented on KAFKA-13875:
-

The Jira which was blocking this previously has been resolved. Please let me 
know in the PR if there are any other changes which should be made to the 
documentation. Thanks!

> update docs to include topicId for kafka-topics.sh --describe output
> -
>
> Key: KAFKA-13875
> URL: https://issues.apache.org/jira/browse/KAFKA-13875
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Richard Joerger
>Priority: Major
>  Labels: newbie
>
> The topic describe output in quickstart doc here: 
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 
> localhost:9092
> Topic:quickstart-events  PartitionCount:1ReplicationFactor:1 Configs:
> Topic: quickstart-events Partition: 0Leader: 0   Replicas: 0 Isr: 
> 0{code}
> After the topic ID implementation, we include the topic ID info in the output 
> now. Also, the configs are not empty now. The doc should be updated to avoid 
> confusing new users.
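
Combining the topic-ID output reported in KAFKA-13718 with the Configs cleanup from PR #12246, the updated quickstart output should look roughly like this (illustrative; the topic ID is taken from the KAFKA-13718 report):

{code:java}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events   TopicId: 06zRrzDCRceR9zWAf_BUWQ   PartitionCount: 1   ReplicationFactor: 1   Configs:
    Topic: quickstart-events   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
{code}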





[GitHub] [kafka] showuon commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…

2022-06-03 Thread GitBox


showuon commented on PR #12170:
URL: https://github.com/apache/kafka/pull/12170#issuecomment-1145979453

   @Moovlin , IIRC, even if there are no overridden configs, the `configs:` 
column will still be displayed. Could you confirm it by running the command 
once with the latest code? Thank you.





[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-06-03 Thread GitBox


dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r888913198


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() {
  */
 UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+/**
+ * Describes the state of the metadata quorum.
+ * 
+ * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)}  with default options.

Review Comment:
   nit: There is an extra space before `with`.



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of {@link 
Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+private final KafkaFuture<QuorumInfo> quorumInfo;
+
+DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {
+this.quorumInfo = quorumInfo;
+}
+
+/**
+ * Returns a future QuorumInfo

Review Comment:
   nit: Should we say `Returns a future containing the quorum info.`?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumOptions.java:
##
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for {@link ConfluentAdmin#describeQuorum(DescribeMetadataQuorumOptions)}.
+ *

Review Comment:
   nit: We could remove this empty line.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
 return new UpdateFeaturesResult(new HashMap<>(updateFutures));
 }
 
+@Override
+public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+NodeProvider provider = new LeastLoadedNodeProvider();
+
+final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final Call call = new Call(
+"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+Integer partition = 0;
+String topicName = response.getTopicNameByIndex(0);
+Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+response.getVoterInfo(topicName, partition).forEach(v -> {
+voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+v.logEndOffset(),
+OptionalLong.of(v.lastFetchTimestamp()),
+OptionalLong.of(v.lastCaughtUpTimestamp())));
+});
+response.getObserverInfo(topicName, partition).forEach(o -> {
+observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+o.logEndOffset(),
+OptionalLong.of(o.lastFetchTimestamp()),
+  

[GitHub] [kafka] Moovlin commented on pull request #12170: KAFKA-13875 Adjusted the output the topic describe output to include TopicID & se…

2022-06-03 Thread GitBox


Moovlin commented on PR #12170:
URL: https://github.com/apache/kafka/pull/12170#issuecomment-1145959126

   @dengziming I went and submitted a PR for KAFKA-13718 which has since been 
merged into trunk. The PR (12246) keeps the kafka-topics.sh command from 
printing the segment.bytes values when describing the topic. Because of this, 
when end users now follow the quickstart, what they see should match what the 
docs show. Thanks so much.





[jira] [Assigned] (KAFKA-13875) update docs to include topicId for kafka-topics.sh --describe output

2022-06-03 Thread Richard Joerger (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Joerger reassigned KAFKA-13875:
---

Assignee: Richard Joerger

> update docs to include topicId for kafka-topics.sh --describe output
> -
>
> Key: KAFKA-13875
> URL: https://issues.apache.org/jira/browse/KAFKA-13875
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Richard Joerger
>Priority: Major
>  Labels: newbie
>
> The topic describe output in quickstart doc here: 
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 
> localhost:9092
> Topic:quickstart-events  PartitionCount:1ReplicationFactor:1 Configs:
> Topic: quickstart-events Partition: 0Leader: 0   Replicas: 0 Isr: 
> 0{code}
> After the topic ID implementation, we include the topic ID info in the output 
> now. Also, the configs are not empty now. The doc should be updated to avoid 
> confusing new users.





[jira] [Commented] (KAFKA-13718) kafka-topics describe topic with default config will show `segment.bytes` overridden config

2022-06-03 Thread Richard Joerger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545865#comment-17545865
 ] 

Richard Joerger commented on KAFKA-13718:
-

There was a question about whether segment.bytes should be included. As decided 
in this Jira, we should remove it, which means it should be removed from the 
documentation as well. 

> kafka-topics describe topic with default config will show `segment.bytes` 
> overridden config 
> 
>
> Key: KAFKA-13718
> URL: https://issues.apache.org/jira/browse/KAFKA-13718
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Richard Joerger
>Priority: Major
>  Labels: newbie, newbie++
> Fix For: 3.3.0
>
>
> Following the quickstart guide[1], when describing the topic just created 
> with default config, I found there's an overridden config shown:
> _> bin/kafka-topics.sh --describe --topic quickstart-events 
> --bootstrap-server localhost:9092_
> _Topic: quickstart-events   TopicId: 06zRrzDCRceR9zWAf_BUWQ    
> PartitionCount: 1    ReplicationFactor: 1    *Configs: 
> segment.bytes=1073741824*_
>     _Topic: quickstart-events    Partition: 0    Leader: 0    Replicas: 0    
> Isr: 0_
>  
> This config result should be empty, as on the Kafka quickstart page. Although the 
> config value is what we expected (the default 1GB value), this info display still 
> confuses users.
>  
> Note: I checked the 2.8.1 build, this issue also happened.
>  
> [1]: [https://kafka.apache.org/quickstart]





[GitHub] [kafka] Moovlin commented on pull request #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…

2022-06-03 Thread GitBox


Moovlin commented on PR #12167:
URL: https://github.com/apache/kafka/pull/12167#issuecomment-1145952034

   Hey @showuon , just looking to see if you have any input on the last commit 
I made to this PR. 





[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application

2022-06-03 Thread Prashanth Joseph Babu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545846#comment-17545846
 ] 

Prashanth Joseph Babu commented on KAFKA-13936:
---

[~mjsax] sure! I'd love to! I'm not sure where the doc repos are or what 
the right place to mention this point would be.

> Invalid consumer lag when monitoring from a kafka streams application
> -
>
> Key: KAFKA-13936
> URL: https://issues.apache.org/jira/browse/KAFKA-13936
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Prashanth Joseph Babu
>Priority: Major
>
> I have a kafka streams application and I'm trying to monitor the consumer lag 
> via stream metrics.
> Here's some code snippet
> {code:java}
> metrics = streams.metrics();
> lag = 0;
> for (Metric m : metrics.values()) {
> tags = m.metricName().tags();
> if ( m.metricName().name().equals(MONITOR_CONSUMER_LAG) && tags.containsKey(MONTOR_TAG_TOPIC) && tags.get(MONTOR_TAG_TOPIC).equals(inputTopic) ) {
> partitionLag = Float.valueOf(m.metricValue().toString()).floatValue();
> if ( !partitionLag.isNaN() ) {
> lag += partitionLag;
> }
> }
> }
> {code}
> Here MONITOR_CONSUMER_LAG is {{{}records-lag-max{}}}.
> However, these numbers don't match the consumer lag we see in the Kafka UI.
> Is records-lag-max the right metric to track for a Kafka Streams
> application when the objective is to get consumer lag?
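
A minimal sketch of the alternative, summing the per-partition {{records-lag}} gauges instead of {{records-lag-max}} (assuming the embedded consumers' fetch metrics exposed via streams.metrics() are what should be aggregated):

{code:java}
// Sketch only: sum the current per-partition "records-lag" gauges for inputTopic.
// `streams` is the KafkaStreams instance; `inputTopic` is the topic being monitored.
double totalLag = 0;
for (Metric m : streams.metrics().values()) {
    MetricName name = m.metricName();
    if (name.name().equals("records-lag") && inputTopic.equals(name.tags().get("topic"))) {
        double partitionLag = Double.parseDouble(m.metricValue().toString());
        if (!Double.isNaN(partitionLag)) {
            totalLag += partitionLag;
        }
    }
}
{code}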





[jira] [Created] (KAFKA-13958) Expose logdirs total and usable space via Kafka API

2022-06-03 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-13958:
--

 Summary: Expose logdirs total and usable space via Kafka API
 Key: KAFKA-13958
 URL: https://issues.apache.org/jira/browse/KAFKA-13958
 Project: Kafka
  Issue Type: Bug
Reporter: Mickael Maison
Assignee: Mickael Maison


JIRA for KIP-827: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-827%3A+Expose+logdirs+total+and+usable+space+via+Kafka+API





[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-06-03 Thread Aldan Brito (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545837#comment-17545837
 ] 

Aldan Brito commented on KAFKA-13953:
-

Hi [~junrao],

there is no evidence of data corruption in the Kafka broker logs.

Is this expected?

Is deleting the log segment the only way to recover the system? That would 
mean data loss, right?

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [2022-05-15 18:34:15,146] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}





[jira] [Commented] (KAFKA-13957) Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores

2022-06-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545833#comment-17545833
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13957:


Failed again

> Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-13957
> URL: https://issues.apache.org/jira/browse/KAFKA-13957
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
> Attachments: 
> StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores.rtf
>
>
> Failed on a local build so I have the full logs (attached)
> {code:java}
> java.lang.AssertionError: Unexpected exception thrown while getting the value 
> from store.
> Expected: is (a string containing "Cannot get state store source-table 
> because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string 
> containing "The state store, source-table, may have migrated to another 
> instance" or a string containing "Cannot get state store source-table because 
> the stream thread is STARTING, not RUNNING")
>  but: was "The specified partition 1 for store source-table does not 
> exist."
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.verifyRetrievableException(StoreQueryIntegrationTest.java:539)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:241)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:557)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:183)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833) {code}





[GitHub] [kafka] showuon commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-06-03 Thread GitBox


showuon commented on code in PR #12179:
URL: https://github.com/apache/kafka/pull/12179#discussion_r27862


##
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java:
##
@@ -679,10 +679,11 @@ private long calcCompletionTimesAndReturnSessionLifetimeMs() {
 else if (connectionsMaxReauthMs == null)
 retvalSessionLifetimeMs = zeroIfNegative(credentialExpirationMs - authenticationEndMs);
 else
-retvalSessionLifetimeMs = zeroIfNegative(
-Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs));
+retvalSessionLifetimeMs = zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs));
 
-sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs;
+if (connectionsMaxReauthMs != null) {

Review Comment:
   > While there is the Authenticator interface where comments suggest that 
#serverSessionExpirationTimeNanos should be left as null when re-authentication 
is "disabled"
   
   Is 
[this](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java#L90-L102)
 javadoc what you mean? From the javadoc, I don't see it saying we should return 
null when re-auth is disabled. I think it's OK to return the value when re-auth is 
disabled.
   
   > here are some clients or SASL mechanisms where we don't expect 
reauthentication to ever happen?
   
   Yes, it is. You can check 
[this](https://github.com/edenhill/librdkafka/issues/3304) for reference.
   
   Thanks.






[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-06-03 Thread GitBox


acsaki commented on code in PR #12179:
URL: https://github.com/apache/kafka/pull/12179#discussion_r22004


##
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##
@@ -976,8 +975,11 @@ public void testChannelCloseWhileProcessingReceives() throws Exception {
 SelectionKey selectionKey = mock(SelectionKey.class);
 when(channel.selectionKey()).thenReturn(selectionKey);
 when(selectionKey.isValid()).thenReturn(true);
+when(selectionKey.isReadable()).thenReturn(true);
 when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_READ);
-selectionKey.attach(channel);
+when(selectionKey.attachment())
+.thenReturn(channel)
+.thenReturn(null);

Review Comment:
   Same as above.






[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-06-03 Thread GitBox


acsaki commented on code in PR #12179:
URL: https://github.com/apache/kafka/pull/12179#discussion_r21809


##
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##
@@ -781,14 +781,13 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception {
 when(kafkaChannel.selectionKey()).thenReturn(selectionKey);
 when(selectionKey.channel()).thenReturn(SocketChannel.open());
 when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT);
+when(selectionKey.attachment()).thenReturn(kafkaChannel);
 
-selectionKey.attach(kafkaChannel);
 Set selectionKeys = Utils.mkSet(selectionKey);
 selector.pollSelectionKeys(selectionKeys, false, System.nanoTime());
 
 assertFalse(selector.connected().contains(kafkaChannel.id()));
 assertTrue(selector.disconnected().containsKey(kafkaChannel.id()));
-assertNull(selectionKey.attachment());

Review Comment:
   This test failed after my change to use mockitoInline to mock statics. The 
`selectionKey.attach` line called the mock's method directly (pointless?), while 
the actual intention seemed to be for `selectionKey.attachment()` to return the 
kafkaChannel; that's how it effectively worked. Calling the mock method directly 
in the test and later asserting that the attachment is null seemed confusing to 
me, and the assert actually fails too (with #attachment returning the 
kafkaChannel). Maybe this was sort of overbearing; is there a simpler way to fix 
the test?






[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-06-03 Thread GitBox


acsaki commented on code in PR #12179:
URL: https://github.com/apache/kafka/pull/12179#discussion_r17853


##
build.gradle:
##
@@ -1243,7 +1243,7 @@ project(':clients') {
 
 testImplementation libs.bcpkix
 testImplementation libs.junitJupiter
-testImplementation libs.mockitoCore
+testImplementation libs.mockitoInline

Review Comment:
   I needed this to mock out ChannelBuilders#createPrincipalBuilder and 
Sasl#createSaslServer. It's a pity these are static calls; they are hard to 
test.
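   
   For reviewers unfamiliar with it, this is the shape of what mockito-inline enables (illustrative only, not this PR's actual test code; `mockSaslServer` stands in for an ordinary Mockito mock):
   
   ```java
   // Illustrative only: mockito-inline allows mocking static factory methods
   // such as javax.security.sasl.Sasl#createSaslServer via MockedStatic.
   try (MockedStatic<Sasl> sasl = Mockito.mockStatic(Sasl.class)) {
       sasl.when(() -> Sasl.createSaslServer(any(), any(), any(), any(), any()))
           .thenReturn(mockSaslServer);
       // exercise SaslServerAuthenticator here; it receives the mocked SaslServer
   }
   ```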






[GitHub] [kafka] acsaki commented on pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-06-03 Thread GitBox


acsaki commented on PR #12179:
URL: https://github.com/apache/kafka/pull/12179#issuecomment-1145812750

   > Nice tests! Thank you. Left some comments. Also, I found there are many 
tests failed with the error:
   > 
   > ```
   > org.opentest4j.AssertionFailedError: Topic [__consumer_offsets] metadata 
not propagated after 6 ms
   > ```
   > 
   > ref: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12179/4/#showFailuresLink
   > 
   > I guess maybe it's because we use mock time, instead of system time now? 
Please help check them. Thank you.
   
   Thank you, I hadn't thought of MockTime. Indeed, a lot of tests are still 
failing. I've found out that in some tests `connections.max.reauth.ms` gets set 
to 0 instead of being left as null, so my current code eventually sets the 
session expiration and clients seem to get disconnected. 





[GitHub] [kafka] acsaki commented on a diff in pull request #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-06-03 Thread GitBox


acsaki commented on code in PR #12179:
URL: https://github.com/apache/kafka/pull/12179#discussion_r12184


##
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java:
##
@@ -679,10 +679,11 @@ private long 
calcCompletionTimesAndReturnSessionLifetimeMs() {
 else if (connectionsMaxReauthMs == null)
 retvalSessionLifetimeMs = 
zeroIfNegative(credentialExpirationMs - authenticationEndMs);
 else
-retvalSessionLifetimeMs = zeroIfNegative(
-Math.min(credentialExpirationMs - 
authenticationEndMs, connectionsMaxReauthMs));
+retvalSessionLifetimeMs = 
zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs, 
connectionsMaxReauthMs));
 
-sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 
1000 * retvalSessionLifetimeMs;
+if (connectionsMaxReauthMs != null) {

Review Comment:
   @showuon this is what I've also found rather confusing. I agree that after 
the token expires the connection should be closed sooner or later, which isn't 
going to happen when sessionExpirationTimeNanos is not set. On the other hand, 
there is the Authenticator interface, where comments suggest that 
#serverSessionExpirationTimeNanos should be left as null when re-authentication 
is "disabled". Does it make sense for reauth to be disabled? Or rather, are 
there some clients or SASL mechanisms where we don't expect reauthentication to 
ever happen?






[GitHub] [kafka] ableegoldman commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


ableegoldman commented on PR #12235:
URL: https://github.com/apache/kafka/pull/12235#issuecomment-1145785669

   @cadonna ok this is ready for another pass -- besides addressing your 
comments I made two changes worth noting:
   1. I moved the produced/consumed metrics to a new `TopicMetrics` class as 
they didn't exactly fit into ProcessorNodeMetrics since I realized that due to 
dynamic topic routing you could have more than one topic produced to by a given 
sink node.
   2. To address your (imo valid) concern about over-counting bytes that were 
sent to the producer but never made it to the topic, I moved the "-produced" 
sensor into `RecordCollectorImpl` and have it record inside the `send` callback 
after we've checked for errors
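   
   Roughly, the recording now happens like this (a paraphrase of the approach with assumed names, not the exact PR code):
   
   ```java
   // Paraphrased sketch: record bytes/records produced only once the broker has
   // acknowledged the send, so failed sends are not over-counted.
   producer.send(serializedRecord, (metadata, exception) -> {
       if (exception == null) {
           bytesProducedSensor.record(recordSizeInBytes, time.milliseconds());
           recordsProducedSensor.record(1, time.milliseconds());
       }
   });
   ```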
   
   The PR has definitely ballooned in size since you last reviewed it but 
that's almost all due to the new test coverage for the topic-level metrics. The 
actual logical changes are still relatively minor, so I'm hoping you can give 
this a +1 by now (and again, if so, please go ahead and merge it yourself)
   
   Thanks for the thorough reviews! :P 





[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


ableegoldman commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r888782190


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:
##
@@ -114,6 +116,63 @@ public void shouldGetProcessAtSourceSensor() {
 verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
 }
 
+@Test
+public void shouldGetRecordsAndBytesConsumedSensor() {
+final String recordsMetricNamePrefix = "records-consumed";
+final String bytesMetricNamePrefix = "bytes-consumed";
+final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
+final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
+when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
+.thenReturn(expectedSensor);
+when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
+.thenReturn(expectedSensor);
+when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
+
+final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
+consumedTagMap.put("topic", TOPIC_NAME);
+StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(

Review Comment:
   Ah, thanks for the explanation. I updated the test as suggested (now in 
`TopicMetricsTest`)






[GitHub] [kafka] showuon commented on pull request #12238: KIP-835: MetadataMaxIdleIntervalMs should be much bigger than brokerHeartbeatIntervalMs

2022-06-03 Thread GitBox


showuon commented on PR #12238:
URL: https://github.com/apache/kafka/pull/12238#issuecomment-1145703278

   > should we allow the broker to be behind by A records (or time) to unfence 
the broker?
   
   I had a similar solution in mind yesterday, but I don't think it is a 
good one, because it might break the expectation/assumption that when a 
broker is up, it has already caught up on all the metadata changes. There is 
a chance that the `A` records/time behind contain some important metadata 
update, right?
   
   > Another solution is to require the broker to only catch up to the last 
committed offset when they last sent the heartbeat. 
   
   I like this idea. We can make sure the broker has already caught up to the 
last committed offset from when the heartbeat was sent, which means the 
metadata changes from before broker startup are all caught up. 
   +1 for it!
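   
   To spell out my reading of the proposal as pseudocode (my own sketch, nothing from the PR):
   
   ```java
   // Sketch: unfence once the broker has replayed up to the offset that was
   // committed when its latest heartbeat was sent, rather than the current
   // high watermark, so a steady stream of new records can't keep it fenced.
   boolean shouldUnfence(long replayedMetadataOffset, long committedOffsetAtLastHeartbeat) {
       return replayedMetadataOffset >= committedOffsetAtLastHeartbeat;
   }
   ```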
   
   Thank you.





[GitHub] [kafka] showuon merged pull request #12243: MINOR: fix doc

2022-06-03 Thread GitBox


showuon merged PR #12243:
URL: https://github.com/apache/kafka/pull/12243





[GitHub] [kafka] cadonna merged pull request #12210: KAFKA-13930: Add 3.2.0 to core upgrade and compatibility system tests

2022-06-03 Thread GitBox


cadonna merged PR #12210:
URL: https://github.com/apache/kafka/pull/12210





[GitHub] [kafka] cadonna commented on pull request #12210: KAFKA-13930: Add 3.2.0 to core upgrade and compatibility system tests

2022-06-03 Thread GitBox


cadonna commented on PR #12210:
URL: https://github.com/apache/kafka/pull/12210#issuecomment-1145660378

   Build failures are unrelated.





[GitHub] [kafka] cadonna commented on pull request #12210: KAFKA-13930: Add 3.2.0 to core upgrade and compatibility system tests

2022-06-03 Thread GitBox


cadonna commented on PR #12210:
URL: https://github.com/apache/kafka/pull/12210#issuecomment-1145659280

   Passing system test run: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-06-02--001.system-test-kafka-branch-builder--1654200227--cadonna--AK13930-add_32_upgrade_comp_tests-core--7818cb8df6/report.html





[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


ableegoldman commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r888672094


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##
@@ -199,6 +201,7 @@ public <K, V> void send(final String topic,
 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
 }
 });
+return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);

Review Comment:
   Yes that's a good point -- I fixed this up by moving the sensors to the 
`RecordCollectorImpl` class and recording the metric in the callback of the 
Producer's `send` method only upon success. Thanks for clarifying






[GitHub] [kafka] ableegoldman commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics

2022-06-03 Thread GitBox


ableegoldman commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r888670820


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##
@@ -199,6 +201,7 @@ public <K, V> void send(final String topic,
 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
 }
 });
+return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);

Review Comment:
   I did it like this to avoid the extra/unnecessary null check for consumer 
records specifically, which, unlike producer records, already track their 
serialized size in bytes. Unfortunately they don't inherit from a common 
interface/class -- but I added separate middle-man methods to handle them and 
moved the null check for the producer case there, so this should be addressed 
now.






[GitHub] [kafka] AnGg98 opened a new pull request, #12247: replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER

2022-06-03 Thread GitBox


AnGg98 opened a new pull request, #12247:
URL: https://github.com/apache/kafka/pull/12247

   Replace ACL_AUTHORIZER attribute with ZK_ACL_AUTHORIZER in tests. Required 
after the changes merged with https://github.com/apache/kafka/pull/12190

