[jira] [Created] (KAFKA-10615) Authentication failure log detail

2020-10-14 Thread Jira
Gérald Quintana created KAFKA-10615:
---

 Summary: Authentication failure log detail
 Key: KAFKA-10615
 URL: https://issues.apache.org/jira/browse/KAFKA-10615
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 2.4.1
Reporter: Gérald Quintana


When using the PlainLoginModule and a client application is providing a wrong 
password, you get endless error logs telling:
{code:java}
[2020-10-15 07:00:05,263] INFO [SocketServer brokerId=4] Failed authentication 
with myhost.mycompany.fr/192.168.35.194 (Authentication failed: Invalid 
username or password) (org.apache.kafka.common.network.Selector)
[2020-10-15 07:00:06,400] INFO [SocketServer brokerId=4] Failed authentication 
with myhost.mycompany.fr/192.168.35.194 (Authentication failed: Invalid 
username or password) (org.apache.kafka.common.network.Selector){code}
 

When this client is running in Kubernetes the hostname and IP have no meaning 
because they represent the Kubernetes host. So it's very hard for us to find 
the misconfigured application.

I'd like to have the username in the error message so as to make it easier to 
find the source of the error.

>From a security a point view it may be interesting to know that a given user 
>is used to brute force a password or may have been pawned.

I seems easy to do it in 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java#L107]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10615) Plain authentication failure log detail

2020-10-14 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gérald Quintana updated KAFKA-10615:

Summary: Plain authentication failure log detail  (was: Authentication 
failure log detail)

> Plain authentication failure log detail
> ---
>
> Key: KAFKA-10615
> URL: https://issues.apache.org/jira/browse/KAFKA-10615
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.4.1
>Reporter: Gérald Quintana
>Priority: Major
>
> When using the PlainLoginModule and a client application is providing a wrong 
> password, you get endless error logs telling:
> {code:java}
> [2020-10-15 07:00:05,263] INFO [SocketServer brokerId=4] Failed 
> authentication with myhost.mycompany.fr/192.168.35.194 (Authentication 
> failed: Invalid username or password) 
> (org.apache.kafka.common.network.Selector)
> [2020-10-15 07:00:06,400] INFO [SocketServer brokerId=4] Failed 
> authentication with myhost.mycompany.fr/192.168.35.194 (Authentication 
> failed: Invalid username or password) 
> (org.apache.kafka.common.network.Selector){code}
>  
> When this client is running in Kubernetes the hostname and IP have no meaning 
> because they represent the Kubernetes host. So it's very hard for us to find 
> the misconfigured application.
> I'd like to have the username in the error message so as to make it easier to 
> find the source of the error.
> From a security a point view it may be interesting to know that a given user 
> is used to brute force a password or may have been pawned.
> I seems easy to do it in 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java#L107]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #9416: MINOR: more log4j entry on elect / resignation of coordinators

2020-10-14 Thread GitBox


guozhangwang commented on a change in pull request #9416:
URL: https://github.com/apache/kafka/pull/9416#discussion_r505170871



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -908,6 +908,7 @@ class GroupCoordinator(val brokerId: Int,
* @param offsetTopicPartitionId The partition we are now leading
*/
   def onElection(offsetTopicPartitionId: Int): Unit = {
+info(s"Becoming the group coordinator for partition 
$offsetTopicPartitionId")

Review comment:
   But during the loading process, we do have log entry as
   
   ```
   Loading / Unloading group metadata for ...
   ```
   
   per group.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9416: MINOR: more log4j entry on elect / resignation of coordinators

2020-10-14 Thread GitBox


guozhangwang commented on a change in pull request #9416:
URL: https://github.com/apache/kafka/pull/9416#discussion_r505164292



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -908,6 +908,7 @@ class GroupCoordinator(val brokerId: Int,
* @param offsetTopicPartitionId The partition we are now leading
*/
   def onElection(offsetTopicPartitionId: Int): Unit = {
+info(s"Becoming the group coordinator for partition 
$offsetTopicPartitionId")

Review comment:
   Yeah that cannot be known until loading is completed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10614) Group coordinator onElection/onResignation should guard against leader epoch

2020-10-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10614:
-

 Summary: Group coordinator onElection/onResignation should guard 
against leader epoch
 Key: KAFKA-10614
 URL: https://issues.apache.org/jira/browse/KAFKA-10614
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Guozhang Wang


When there are a sequence of LeaderAndISR or StopReplica requests sent from 
different controllers causing the group coordinator to elect / resign, we may 
re-order the events due to race condition. For example:

1) First LeaderAndISR request received from old controller to resign as the 
group coordinator.
2) Second LeaderAndISR request received from new controller to elect as the 
group coordinator.
3) Although threads handling the 1/2) requests are synchronized on the replica 
manager, their callback {{onLeadershipChange}} would trigger 
{{onElection/onResignation}} which would schedule the loading / unloading on 
background threads, and are not synchronized.
4) As a result, the {{onElection}} maybe triggered by the thread first, and 
then {{onResignation}}. As a result, the coordinator would not recognize it 
self as the coordinator and hence would respond any coordinator request with 
{{NOT_COORDINATOR}}.

Here are two proposals on top of my head:

1) Let the scheduled load / unload function to keep the passed in leader epoch, 
and also materialize the epoch in memory. Then when execute the unloading check 
against the leader epoch.

2) This may be a bit simpler: using a single background thread working on a 
FIFO queue of loading / unloading jobs, since the caller are actually 
synchronized on replica manager and order preserved, the enqueued loading / 
unloading job would be correctly ordered as well. In that case we would avoid 
the reordering. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9423: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogD…

2020-10-14 Thread GitBox


chia7712 commented on pull request #9423:
URL: https://github.com/apache/kafka/pull/9423#issuecomment-708886001


   @hachikuji @lbradstreet Could you take a look? ```maybeIncrementLeaderHW``` 
was called with holding lock 
(https://github.com/apache/kafka/commit/f3ded39a0556f9e43a66b238c89c6a60f3ef33df#diff-16590fdbe6661105c6faa87d68e39a553e66b34ee0a6d0b14a48d02e273bab4fL676)
 and 
https://github.com/apache/kafka/commit/f3ded39a0556f9e43a66b238c89c6a60f3ef33df 
changed it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-14 Thread Pushkar Deole (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214412#comment-17214412
 ] 

Pushkar Deole edited comment on KAFKA-8803 at 10/15/20, 3:56 AM:
-

[~guozhang] any update on above? we are on confluent 5.5.0 which internally use 
apache kafka 2.5.0. 

Do we need to upgrade to 2.5.1 version broker?

 

By the way, I could observe that the issue is only with 1 of the streams 
application, where we have 3 streams applications. The other 2 don't have the 
error.


was (Author: pdeole):
[~guozhang] any update on above? we are on confluent 5.5.0 which internally use 
apache kafka 2.5.0. 

Do we need to upgrade to 2.5.1 version broker?

> Stream will not start due to TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, 
> logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-14 Thread Pushkar Deole (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214412#comment-17214412
 ] 

Pushkar Deole commented on KAFKA-8803:
--

[~guozhang] any update on above? we are on confluent 5.5.0 which internally use 
apache kafka 2.5.0. 

Do we need to upgrade to 2.5.1 version broker?

> Stream will not start due to TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, 
> logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] AndreyBozhko edited a comment on pull request #9413: MINOR: Fix typos in DefaultSslEngineFactory javadoc

2020-10-14 Thread GitBox


AndreyBozhko edited a comment on pull request #9413:
URL: https://github.com/apache/kafka/pull/9413#issuecomment-70887


   I don't have write access to the repo, so please close this PR at your 
earliest convenience.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] AndreyBozhko commented on pull request #9413: MINOR: Fix typos in DefaultSslEngineFactory javadoc

2020-10-14 Thread GitBox


AndreyBozhko commented on pull request #9413:
URL: https://github.com/apache/kafka/pull/9413#issuecomment-70887


   I don't have write access to the repo, so please close at your earliest 
convenience.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10472) Consider migrating Topology methods to the Builder pattern

2020-10-14 Thread Huynh Quang Thao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214405#comment-17214405
 ] 

Huynh Quang Thao commented on KAFKA-10472:
--

My email is huynhquangt...@gmail.com

> Consider migrating Topology methods to the Builder pattern
> --
>
> Key: KAFKA-10472
> URL: https://issues.apache.org/jira/browse/KAFKA-10472
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Huynh Quang Thao
>Priority: Minor
>  Labels: need-kip
>
> During code review for KIP-478, I got this feedback from [~bbejeck] .
> In Topology, we have methods like this:
> {code:java}
> public synchronized  Topology addGlobalStore(
>   final StoreBuilder storeBuilder,
>   final String sourceName,
>   final TimestampExtractor timestampExtractor,
>   final Deserializer keyDeserializer,
>   final Deserializer valueDeserializer,
>   final String topic,
>   final String processorName,
>   final ProcessorSupplier stateUpdateSupplier){code}
> It would probably be better UX to preset a builder interface like:
> {code:java}
> public synchronized  Topology addGlobalStore(
>   AddGlobalStoreParameters.fromStoreBuilder(storeBuiler)
>   .withSourceName(sourceName)
>   .withSourceTopic(topic)
>   .withTimestampExtractor(timestampExtractor)
>   .withKeyDeserializer(keyDeserializer)
>   .withValueDeserializer(valueDeserializer)
>   .withProcessorName(processorName)
>   .withStateUpdateSupplier(stateUpdateSupplier)
> ){code}
>  
> Note: new API design proposals should take into account the proposed grammar: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-10-14 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-708870508


   Thanks for the comment, @kkonstantine , I've updated in this commit: 
https://github.com/apache/kafka/pull/9149/commits/094ee6acc772f0532f5513f9d29e5b46f165bd01.
 Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #9407: KAFKA-10611: Merge log error to avoid double error

2020-10-14 Thread GitBox


kkonstantine merged pull request #9407:
URL: https://github.com/apache/kafka/pull/9407


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10611) Merge log error to avoid double error

2020-10-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10611:
---
Fix Version/s: 2.7.0

> Merge log error to avoid double error
> -
>
> Key: KAFKA-10611
> URL: https://issues.apache.org/jira/browse/KAFKA-10611
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Benoit MAGGI
>Priority: Trivial
> Fix For: 2.7.0
>
>
> When using an error tracking system, 2 error logs means 2 different alerts.
> It's best to group the logs and have one error with all information.
> For example when using with [Sentry|https://sentry.io/welcome/], this [double 
> line|https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L187]
>  of log.error will create 2 different Issues.
> One can merge the issues but it will be simpler to have a single error log 
> line
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-10-14 Thread GitBox


kkonstantine commented on a change in pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#discussion_r505137462



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -409,6 +409,9 @@ private boolean sendRecords() {
 // RegexRouter) topic creation can not be batched for multiple topics
 private void maybeCreateTopic(String topic) {
 if (!topicCreation.isTopicCreationRequired(topic)) {
+log.trace("The topic creation setting is disabled or the topic 
name {} is already in the topic cache." +
+"If the topic doesn't exist, we'll rely on the 
auto.create.topics.enable setting in broker side " +
+"to see if the topic can be auto created or not", topic);

Review comment:
   ```suggestion
   "If auto.create.topics.enable is enabled on the broker, the 
topic will be created with  " +
   "default settings", topic);
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -409,6 +409,9 @@ private boolean sendRecords() {
 // RegexRouter) topic creation can not be batched for multiple topics
 private void maybeCreateTopic(String topic) {
 if (!topicCreation.isTopicCreationRequired(topic)) {
+log.trace("The topic creation setting is disabled or the topic 
name {} is already in the topic cache." +

Review comment:
   ```suggestion
   log.trace("Topic creation by the connector is disabled or the 
topic {} was previously created." +
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-14 Thread GitBox


feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r505133488



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself
-  // as a leader based on the returned message, since the new 
member.id won't match
-  // returned leader id, therefore no assignment will be performed.
-  leaderId = currentLeader,
-  error = Errors.NONE))
+// check if group's selectedProtocol of next generation will change, 
if not, simply store group to persist the
+// updated static member, if yes, rebalance should be triggered to let 
the group's assignment and selectProtocol consistent
+val selectedProtocolOfNextGeneration = group.selectProtocol
+if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+  info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
+  val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
+  groupManager.storeGroup(group, groupAssignment, error => {
+group.inLock {
+  if (error != Errors.NONE) {

Review comment:
   Revised the persistence failure handling logic, now it will revert the 
member id update in the groupMetaData if any persistence error encountered and 
call the responseCallback with the returned error.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-14 Thread GitBox


feyman2016 commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r505133488



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself
-  // as a leader based on the returned message, since the new 
member.id won't match
-  // returned leader id, therefore no assignment will be performed.
-  leaderId = currentLeader,
-  error = Errors.NONE))
+// check if group's selectedProtocol of next generation will change, 
if not, simply store group to persist the
+// updated static member, if yes, rebalance should be triggered to let 
the group's assignment and selectProtocol consistent
+val selectedProtocolOfNextGeneration = group.selectProtocol
+if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+  info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
+  val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
+  groupManager.storeGroup(group, groupAssignment, error => {
+group.inLock {
+  if (error != Errors.NONE) {

Review comment:
   @vvcephei  @abbccdda Revised the persistence failure handling logic, now 
it will revert the member id update in the groupMetaData if any persistence 
error encountered and call the responseCallback with the returned error.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-14 Thread GitBox


feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-708859895


   Hi, @vvcephei and @abbccdda ,sorry this PR takes a very long time,  just 
updated the PR, could you help to review? Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system

2020-10-14 Thread GitBox


junrao commented on a change in pull request #9409:
URL: https://github.com/apache/kafka/pull/9409#discussion_r505019265



##
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+val opts = new FeatureCommandOptions(args)
+val featureApis = new FeatureApis(opts)
+var exitCode = 0
+try {
+  featureApis.execute()
+} catch {
+  case e: IllegalArgumentException =>
+printException(e)
+opts.parser.printHelpOn(System.err)
+exitCode = 1
+  case _: UpdateFeaturesException =>
+exitCode = 1
+  case e: Throwable =>
+printException(e)
+exitCode = 1
+} finally {
+  featureApis.close()
+  Exit.exit(exitCode)
+}
+  }
+
+  private def printException(exception: Throwable): Unit = {
+System.err.println("\nError encountered when executing command: " + 
Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends 
RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature 
APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = 
BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: 
Features[SupportedVersionRange]): Unit = {
+supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): 
FeatureMetadata = {
+val options = new 
DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller 
CLI option
+   * is provided, then the request is issued only to the controller, otherwise 
the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+val result = describeFeatures(opts.hasFromControllerOption)
+val features = result.supportedFeatures.asScala.keys.toSet ++ 
result.finalizedFeatures.asScala.keys.toSet
+
+features.toList.sorted.foreach {
+  feature =>
+val output = new StringBuilder()
+output.append(s"Feature: $feature")
+
+val (supportedMinVersion, supportedMaxVersion) = {
+  val supportedVersionRange = result.supportedFeatures.get(feature)
+  if (supportedVersionRange == null) {
+("-", "-")
+  } else {
+(supportedVersionRange.minVersion, 
supportedVersionRange.maxVersion)
+  }
+}
+output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+  val finalizedVersionRange = result.finalizedFeatures.get(feature)
+  if (finalizedVersionRange == null) {
+("-", "-")
+  } else {
+

[GitHub] [kafka] xvrl opened a new pull request #9439: KAFKA-10587 MirrorMaker CLI change for KIP-629

2020-10-14 Thread GitBox


xvrl opened a new pull request #9439:
URL: https://github.com/apache/kafka/pull/9439


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xvrl closed pull request #9427: backport KAFKA-10571

2020-10-14 Thread GitBox


xvrl closed pull request #9427:
URL: https://github.com/apache/kafka/pull/9427


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xvrl commented on pull request #9427: backport KAFKA-10571

2020-10-14 Thread GitBox


xvrl commented on pull request #9427:
URL: https://github.com/apache/kafka/pull/9427#issuecomment-708718493


   merged to 2.7 as part of 
https://github.com/apache/kafka/commit/ff1e2271f9cf148c513207a0a87c22e647940d0b



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 edited a comment on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-14 Thread GitBox


lct45 edited a comment on pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#issuecomment-708708449


   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4228/ 
system tests part 2



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 edited a comment on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-14 Thread GitBox


lct45 edited a comment on pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#issuecomment-708708449


   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4227/ 
system tests part 2



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-14 Thread GitBox


lct45 commented on pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#issuecomment-708708449


   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4226/ 
system tests part 2



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9419: KAFKA-10343: Add IBP based ApiVersion tests

2020-10-14 Thread GitBox


abbccdda commented on pull request #9419:
URL: https://github.com/apache/kafka/pull/9419#issuecomment-708706701


   Merged with https://github.com/apache/kafka/pull/9103



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda closed pull request #9419: KAFKA-10343: Add IBP based ApiVersion tests

2020-10-14 Thread GitBox


abbccdda closed pull request #9419:
URL: https://github.com/apache/kafka/pull/9419


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation

2020-10-14 Thread GitBox


jsancio commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r505036063



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -19,33 +19,51 @@
 import org.apache.kafka.common.record.Records;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient {
+
+interface Listener {
+/**
+ * Callback which is invoked when records written through {@link 
#scheduleAppend(int, List)}
+ * become committed.
+ *
+ * Note that there is not a one-to-one correspondence between writes 
through
+ * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
+ * is free to batch together the records from multiple append calls 
provided
+ * that batch boundaries are respected. This means that each batch 
specified
+ * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
+ * a batch passed to {@link #handleCommit(int, long, List)}.
+ *
+ * @param epoch the epoch in which the write was accepted
+ * @param lastOffset the offset of the last record in the record list
+ * @param records the set of records that were committed
+ */
+void handleCommit(int epoch, long lastOffset, List records);
+}
 
 /**
  * Initialize the client. This should only be called once and it must be
  * called before any of the other APIs can be invoked.
  *
  * @throws IOException For any IO errors during initialization
  */
-void initialize() throws IOException;
+void initialize(Listener listener) throws IOException;
 
 /**
- * Append a new entry to the log. The client must be in the leader state to
- * accept an append: it is up to the state machine implementation
- * to ensure this using {@link #currentLeaderAndEpoch()}.
- *
- * TODO: One improvement we can make here is to allow the caller to specify
- * the current leader epoch in the record set. That would ensure that each
- * leader change must be "observed" by the state machine before new appends
- * are accepted.
+ * Append a list of records to the log. The write will be scheduled for 
some time
+ * in the future. There is no guarantee that appended records will be 
written to
+ * the log and eventually committed. However, it is guaranteed that if any 
of the
+ * records become committed, then all of them will be.
  *
- * @param records The records to append to the log
- * @param timeoutMs Maximum time to wait for the append to complete
- * @return A future containing the last offset and epoch of the appended 
records (if successful)
+ * @param epoch the current leader epoch
+ * @param records the list of records to append
+ * @return the offset within the current epoch that the log entries will 
be appended,
+ * or null if the leader was unable to accept the write (e.g. due 
to memory
+ * being reached).
  */
-CompletableFuture append(Records records, AckMode ackMode, 
long timeoutMs);
+Long scheduleAppend(int epoch, List records);

Review comment:
   How about either 
   ```
   OptionalLong scheduleAppend(int epoch, List records);
   ```
   or
   ```
   void scheduleAppend(int epoch, List records) throws BusyException;
   ```
   
   I am okay with either solution but I am wondering why did you decide to 
return a `null` for this case instead of throwing some exception?

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.RecordSerde;
+

[GitHub] [kafka] Lincong commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request

2020-10-14 Thread GitBox


Lincong commented on pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#issuecomment-708681626


   @ijuma  Will do! Thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9428: MINOR: fix a bug in removing elements from an ImplicitLinkedHashColle…

2020-10-14 Thread GitBox


cmccabe commented on pull request #9428:
URL: https://github.com/apache/kafka/pull/9428#issuecomment-708681157


   Backported to 2.7 as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #9428: MINOR: fix a bug in removing elements from an ImplicitLinkedHashColle…

2020-10-14 Thread GitBox


cmccabe merged pull request #9428:
URL: https://github.com/apache/kafka/pull/9428


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9438: KAFKA-10613: Only set leader epoch when list-offset version >= 4

2020-10-14 Thread GitBox


guozhangwang commented on pull request #9438:
URL: https://github.com/apache/kafka/pull/9438#issuecomment-708678537


   System tests on `StreamsUpgradeTest.test_metadata_upgrade`:
   
   trunk failed: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4220/
   this branch succeeded: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4222/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-14 Thread GitBox


mjsax commented on pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#issuecomment-708678450


   Address comments. @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9438: KAFKA-10613: Only set leader epoch when list-offset version >= 4

2020-10-14 Thread GitBox


ijuma commented on pull request #9438:
URL: https://github.com/apache/kafka/pull/9438#issuecomment-708677767


   Can we add a test please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10613) Broker should not set leader epoch if the list-offset request version < 4

2020-10-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214294#comment-17214294
 ] 

Ismael Juma commented on KAFKA-10613:
-

When did we regress here?

> Broker should not set leader epoch if the list-offset request version < 4
> -
>
> Key: KAFKA-10613
> URL: https://issues.apache.org/jira/browse/KAFKA-10613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> The list-offset response added a new field in version 4:
> {code}
> { "name": "LeaderEpoch", "type": "int32", "versions": "4+" }
> {code}
> And the compiled code would throw UnsupportedVersionException if that field 
> is not default (-1) with version < 4. However, on the broker side we forget 
> to add the logic to not setting this field based on the request version. This 
> would cause old versioned clients' list-offset call to always get 
> UnsupportedVersionException and an empty result would be returned.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang opened a new pull request #9438: KAFKA-10613: Only set leader epoch when list-offset version >= 4

2020-10-14 Thread GitBox


guozhangwang opened a new pull request #9438:
URL: https://github.com/apache/kafka/pull/9438


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10613) Broker should not set leader epoch if the list-offset request version < 4

2020-10-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10613:
-

 Summary: Broker should not set leader epoch if the list-offset 
request version < 4
 Key: KAFKA-10613
 URL: https://issues.apache.org/jira/browse/KAFKA-10613
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang


The list-offset response added a new field in version 4:

{code}
{ "name": "LeaderEpoch", "type": "int32", "versions": "4+" }
{code}

And the compiled code would throw UnsupportedVersionException if that field is 
not default (-1) with version < 4. However, on the broker side we forget to add 
the logic to not setting this field based on the request version. This would 
cause old versioned clients' list-offset call to always get 
UnsupportedVersionException and an empty result would be returned.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request

2020-10-14 Thread GitBox


ijuma commented on pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#issuecomment-708671677


   `KafkaApisTest` has a few tests like that. Could we add one there? The main 
concern is that we will regress here if we don't have tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-14 Thread GitBox


mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504910261



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -900,9 +900,7 @@
 
 // These are not settable in the main Streams config; they are set by 
the StreamThread to pass internal
 // state into the assignor.
-public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = 
"__task.manager.instance__";
-public static final String 
STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = 
"__streams.metadata.state.instance__";
-public static final String STREAMS_ADMIN_CLIENT = 
"__streams.admin.client.instance__";
+public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = 
"__reference.container.instance__";
 public static final String ASSIGNMENT_ERROR_CODE = 
"__assignment.error.code__";
 public static final String NEXT_SCHEDULED_REBALANCE_MS = 
"__next.probing.rebalance.ms__";
 public static final String TIME = "__time__";

Review comment:
   Ah. Ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cyrusv commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect

2020-10-14 Thread GitBox


cyrusv commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r504963920



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -492,7 +492,7 @@ public boolean commitOffsets() {
 // to persistent storage
 
 // Next we need to wait for all outstanding messages to finish 
sending
-log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
+log.debug("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());

Review comment:
   Sounds good, updated





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Lincong commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request

2020-10-14 Thread GitBox


Lincong commented on pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#issuecomment-708650004


   @ijuma One way to test it could be to mock objects passed to the `KafkaApis` 
class. The `MetadataCache` object should be mocked in a way that upon the first 
invocation on the method `getAllTopics` to simulate the scenario in which the 
metadata cache got updated "concurrently" with `handleTopicMetadataRequest`.
   
   However, IMO, this change is pretty minor. So, I am wondering how necessary 
it is to add tests for it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r504954049



##
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##
@@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long,
 try {
   if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
 val partition = 
replicaManager.getPartitionOrException(topicPartition)
+
+// Case H: Return diverging epoch in response to trigger truncation
+if (fetchStatus.hasDivergingEpoch) {

Review comment:
   @hachikuji Thanks for the review. Makes sense, I have added a new check 
at the end instead of this one, not sure if there is a better way to check.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r504952903



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig,
   fetchInfos.foreach { case (topicPartition, partitionData) =>
 logReadResultMap.get(topicPartition).foreach(logReadResult => {
   val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-  fetchPartitionStatus += (topicPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
+  fetchPartitionStatus += (topicPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData, 
logReadResult.divergingEpoch.nonEmpty))

Review comment:
   ah, yes, so we don't need to check the original result in DelayedFetch, 
we return immediately here. Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-14 Thread GitBox


bbejeck commented on pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#issuecomment-708635800


   Java 8 failed 
[StreamTableJoinTopologyOptimizationIntegrationTest/](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9237/17/testReport/junit/org.apache.kafka.streams.integration/StreamTableJoinTopologyOptimizationIntegrationTest/Build___JDK_8___shouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed)
 seems like a flaky test failure as it was due directory clean-up.
   
   Java 11 and Java 15 passed. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] belugabehr opened a new pull request #9437: KAFKA-10612: Log When SSL Authentication is in Unexpected State

2020-10-14 Thread GitBox


belugabehr opened a new pull request #9437:
URL: https://github.com/apache/kafka/pull/9437


   Additional logging, no functional changes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10612) Log When SSL Authentication is in Unexpected State

2020-10-14 Thread David Mollitor (Jira)
David Mollitor created KAFKA-10612:
--

 Summary: Log When SSL Authentication is in Unexpected State
 Key: KAFKA-10612
 URL: https://issues.apache.org/jira/browse/KAFKA-10612
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mollitor


Recently got into some deep troubleshooting of Kafka SSL client authentication. 
 I was looking at a lot of SSL debug logging and seeing that the client was 
correctly passing its client credentials but the client would not authorize 
correctly with Apache Sentry.

I finally discovered that the issue was simply that {{ssl.client.auth}} was set 
to {{none}}. D'oh.  

It would have been helpful to get some broker logging indicating that the 
client is doing SSL authentication but that none is required by the server.  I 
doubt many environments would bother setting it up if it wasn't going to be 
used.

https://kafka.apache.org/documentation/#ssl.client.auth



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-14 Thread GitBox


vvcephei commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r504936524



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public interface WrappingNullableSerde extends Serde {
+@SuppressWarnings({"unchecked", "rawtypes"})
+default void setIfUnset(final Serde defaultKeySerde, final 
Serde defaultValueSerde) {
+final Serializer serializer = this.serializer();

Review comment:
   No worries, I think this is actually always ok in practice, since I 
think that all the Wrapping implementations keep their de/serializers in fields 
anyway. But then again, we've had _so_ many bugs with serdes that I feel a bit 
twitchy about just ignoring the potential for a future bug that I happened to 
notice here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request

2020-10-14 Thread GitBox


ijuma commented on pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#issuecomment-708617750


   Thanks for the PR. Thoughts on how to test this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thake commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-14 Thread GitBox


thake commented on a change in pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#discussion_r504924920



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+public interface WrappingNullableSerde extends Serde {
+@SuppressWarnings({"unchecked", "rawtypes"})
+default void setIfUnset(final Serde defaultKeySerde, final 
Serde defaultValueSerde) {
+final Serializer serializer = this.serializer();

Review comment:
   Ah, my bad. I didn't get the API right. I propably have worked too much 
on kotlin code with a lot of immutables :) The abstract class is a good idea to 
circumvent this problem. I'll check the PR and comment on it directly.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect

2020-10-14 Thread GitBox


rhauch commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r504920005



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -492,7 +492,7 @@ public boolean commitOffsets() {
 // to persistent storage
 
 // Next we need to wait for all outstanding messages to finish 
sending
-log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
+log.debug("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());

Review comment:
   Yeah, I think the log messages other than this one are fine as DEBUG.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10611) Merge log error to avoid double error

2020-10-14 Thread Benoit MAGGI (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benoit MAGGI updated KAFKA-10611:
-
External issue URL: https://github.com/apache/kafka/pull/9407

> Merge log error to avoid double error
> -
>
> Key: KAFKA-10611
> URL: https://issues.apache.org/jira/browse/KAFKA-10611
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Benoit MAGGI
>Priority: Trivial
>
> When using an error tracking system, 2 error logs means 2 different alerts.
> It's best to group the logs and have one error with all information.
> For example when using with [Sentry|https://sentry.io/welcome/], this [double 
> line|https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L187]
>  of log.error will create 2 different Issues.
> One can merge the issues but it will be simpler to have a single error log 
> line
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10472) Consider migrating Topology methods to the Builder pattern

2020-10-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214221#comment-17214221
 ] 

Matthias J. Sax commented on KAFKA-10472:
-

Just create an account and share your account name, so I can grant permissions.

> Consider migrating Topology methods to the Builder pattern
> --
>
> Key: KAFKA-10472
> URL: https://issues.apache.org/jira/browse/KAFKA-10472
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Huynh Quang Thao
>Priority: Minor
>  Labels: need-kip
>
> During code review for KIP-478, I got this feedback from [~bbejeck] .
> In Topology, we have methods like this:
> {code:java}
> public synchronized  Topology addGlobalStore(
>   final StoreBuilder storeBuilder,
>   final String sourceName,
>   final TimestampExtractor timestampExtractor,
>   final Deserializer keyDeserializer,
>   final Deserializer valueDeserializer,
>   final String topic,
>   final String processorName,
>   final ProcessorSupplier stateUpdateSupplier){code}
> It would probably be better UX to preset a builder interface like:
> {code:java}
> public synchronized  Topology addGlobalStore(
>   AddGlobalStoreParameters.fromStoreBuilder(storeBuiler)
>   .withSourceName(sourceName)
>   .withSourceTopic(topic)
>   .withTimestampExtractor(timestampExtractor)
>   .withKeyDeserializer(keyDeserializer)
>   .withValueDeserializer(valueDeserializer)
>   .withProcessorName(processorName)
>   .withStateUpdateSupplier(stateUpdateSupplier)
> ){code}
>  
> Note: new API design proposals should take into account the proposed grammar: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bmaggi commented on pull request #9407: MINOR: Merge log error to avoid double error

2020-10-14 Thread GitBox


bmaggi commented on pull request #9407:
URL: https://github.com/apache/kafka/pull/9407#issuecomment-708611073


   Thanks for the comments.
   
   Following suggestions: 
   * I created a JIRA ticket 
[KAFKA-10611](https://issues.apache.org/jira/browse/KAFKA-10611) on the subject 
   * I also added a new commit using the suggested text (closer to the old 
version) 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cyrusv commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect

2020-10-14 Thread GitBox


cyrusv commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r504916758



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -492,7 +492,7 @@ public boolean commitOffsets() {
 // to persistent storage
 
 // Next we need to wait for all outstanding messages to finish 
sending
-log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
+log.debug("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());

Review comment:
   I do not feel strongly about this particular message -- it represents 
about 2% of the connect log volume in my deployment, so not too bad. Are you 
still onboard with the other log levels I've updated in the PR? In which case, 
I will revert this one?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system

2020-10-14 Thread GitBox


kowshik commented on pull request #9409:
URL: https://github.com/apache/kafka/pull/9409#issuecomment-708609368


   @abbccdda Thanks for the review! I've addressed the most recent comment.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system

2020-10-14 Thread GitBox


kowshik commented on a change in pull request #9409:
URL: https://github.com/apache/kafka/pull/9409#discussion_r504915393



##
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+val opts = new FeatureCommandOptions(args)
+val featureApis = new FeatureApis(opts)
+var exitCode = 0
+try {
+  featureApis.execute()
+} catch {
+  case e: IllegalArgumentException =>
+printException(e)
+opts.parser.printHelpOn(System.err)
+exitCode = 1
+  case _: UpdateFeaturesException =>
+exitCode = 1
+  case e: Throwable =>
+printException(e)
+exitCode = 1
+} finally {
+  featureApis.close()
+  Exit.exit(exitCode)
+}
+  }
+
+  private def printException(exception: Throwable): Unit = {
+System.err.println("\nError encountered when executing command: " + 
Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends 
RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature 
APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = 
BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {

Review comment:
   Done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10472) Consider migrating Topology methods to the Builder pattern

2020-10-14 Thread Huynh Quang Thao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214210#comment-17214210
 ] 

Huynh Quang Thao commented on KAFKA-10472:
--

Hi [~mjsax],

I followed on 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 
|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

However, I don't have permission to create a KIP document. What is the next 
step I should do to have the permission? Or I should create somewhere else 
first? Thanks.

> Consider migrating Topology methods to the Builder pattern
> --
>
> Key: KAFKA-10472
> URL: https://issues.apache.org/jira/browse/KAFKA-10472
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Huynh Quang Thao
>Priority: Minor
>  Labels: need-kip
>
> During code review for KIP-478, I got this feedback from [~bbejeck] .
> In Topology, we have methods like this:
> {code:java}
> public synchronized  Topology addGlobalStore(
>   final StoreBuilder storeBuilder,
>   final String sourceName,
>   final TimestampExtractor timestampExtractor,
>   final Deserializer keyDeserializer,
>   final Deserializer valueDeserializer,
>   final String topic,
>   final String processorName,
>   final ProcessorSupplier stateUpdateSupplier){code}
> It would probably be better UX to preset a builder interface like:
> {code:java}
> public synchronized  Topology addGlobalStore(
>   AddGlobalStoreParameters.fromStoreBuilder(storeBuiler)
>   .withSourceName(sourceName)
>   .withSourceTopic(topic)
>   .withTimestampExtractor(timestampExtractor)
>   .withKeyDeserializer(keyDeserializer)
>   .withValueDeserializer(valueDeserializer)
>   .withProcessorName(processorName)
>   .withStateUpdateSupplier(stateUpdateSupplier)
> ){code}
>  
> Note: new API design proposals should take into account the proposed grammar: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10611) Merge log error to avoid double error

2020-10-14 Thread Benoit MAGGI (Jira)
Benoit MAGGI created KAFKA-10611:


 Summary: Merge log error to avoid double error
 Key: KAFKA-10611
 URL: https://issues.apache.org/jira/browse/KAFKA-10611
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: Benoit MAGGI


When using an error tracking system, 2 error logs means 2 different alerts.
It's best to group the logs and have one error with all information.

For example when using with [Sentry|https://sentry.io/welcome/], this [double 
line|https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L187]
 of log.error will create 2 different Issues.
One can merge the issues but it will be simpler to have a single error log line

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-14 Thread GitBox


mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504910261



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -900,9 +900,7 @@
 
 // These are not settable in the main Streams config; they are set by 
the StreamThread to pass internal
 // state into the assignor.
-public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = 
"__task.manager.instance__";
-public static final String 
STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = 
"__streams.metadata.state.instance__";
-public static final String STREAMS_ADMIN_CLIENT = 
"__streams.admin.client.instance__";
+public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = 
"__reference.container.instance__";
 public static final String ASSIGNMENT_ERROR_CODE = 
"__assignment.error.code__";
 public static final String NEXT_SCHEDULED_REBALANCE_MS = 
"__next.probing.rebalance.ms__";
 public static final String TIME = "__time__";

Review comment:
   Ah. Ack.
   
   Actually, we can also pull `INTERNAL_TASK_ASSIGNOR_CLASS` and 
`ASSIGNMENT_LISTENER` into the reference container.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-14 Thread GitBox


mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504909676



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -192,12 +191,11 @@ public String toString() {
  */
 @Override
 public void configure(final Map configs) {
-final AssignorConfiguration assignorConfiguration = new 
AssignorConfiguration(configs);
+assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
   I never intended to drop it. Maybe I miss understand your comment?
   
   We could replace the field `StreamsParttionAssignor#taskManager` etc with 
`StreamsPartitionAssigner#referenceContainer` but it just make the code lines 
longer each time we need to access the TM (etc). Thus, it seems to make the 
code more readable if we just "extract" those field from the reference 
container once?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9436: MINOR: Check for active controller in UpdateFeatures request processing logic

2020-10-14 Thread GitBox


kowshik commented on pull request #9436:
URL: https://github.com/apache/kafka/pull/9436#issuecomment-708598843


   cc @junrao @abbccdda 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik opened a new pull request #9436: MINOR: Check for active controller in UpdateFeatures request processing logic

2020-10-14 Thread GitBox


kowshik opened a new pull request #9436:
URL: https://github.com/apache/kafka/pull/9436


   Tuned the code a bit to check for active controller upfront in 
UpdateFeatures request processing logic, before the event is queued.
   
   **Tests:**
   
   Relying on existing test, particularly: 
`UpdateFeaturesTest.testShouldFailRequestIfNotController`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9407: MINOR: Merge log error to avoid double error

2020-10-14 Thread GitBox


rhauch commented on a change in pull request #9407:
URL: https://github.com/apache/kafka/pull/9407#discussion_r504901806



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -184,8 +184,7 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception", 
this, t);
-log.error("{} Task is being killed and will not recover until 
manually restarted", this);
+log.error("{} Task threw an uncaught and unrecoverable exception, 
task is being killed and will not recover until manually restarted", this, t);

Review comment:
   I also like @kkonstantine's suggested format.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect

2020-10-14 Thread GitBox


rhauch commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r504899011



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -492,7 +492,7 @@ public boolean commitOffsets() {
 // to persistent storage
 
 // Next we need to wait for all outstanding messages to finish 
sending
-log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
+log.debug("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());

Review comment:
   I'm not convinced that changing this to DEBUG is worth it. The number of 
source records output in this log message can be instrumental in some cases.
   
   For example, if the producer does not keep up with the source task (for 
whatever reason), this currently INFO-level message appears shortly before the 
following ERROR-level message:
   ```
   ... ERROR WorkerSourceTask{id=...} Failed to commit offsets 
(org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
   ```
   The number of outstanding source records reported by this line is an 
important factor in determining how to tune the producer and 
`offset.flush.timeout.ms` value.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7061) Enhanced log compaction

2020-10-14 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214188#comment-17214188
 ] 

Bill Bejeck commented on KAFKA-7061:


PR is still open, and feature freeze for 2.7 passed on 10/7.  So I'm moving 
this out of the expected KIP list for the 2.7 release.

> Enhanced log compaction
> ---
>
> Key: KAFKA-7061
> URL: https://issues.apache.org/jira/browse/KAFKA-7061
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Luis Cabral
>Assignee: Senthilnathan Muthusamy
>Priority: Major
>  Labels: kip
>
> Enhance log compaction to support more than just offset comparison, so the 
> insertion order isn't dictating which records to keep.
> Default behavior is kept as it was, with the enhanced approached having to be 
> purposely activated.
>  The enhanced compaction is done either via the record timestamp, by settings 
> the new configuration as "timestamp" or via the record headers by setting 
> this configuration to anything other than the default "offset" or the 
> reserved "timestamp".
> See 
> [KIP-280|https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction]
>  for more details.
> +From Guozhang:+ We should emphasize on the WIKI that the newly introduced 
> config yields to the existing "log.cleanup.policy", i.e. if the latter's 
> value is `delete` not `compact`, then the previous config would be ignored.
> +From Jun Rao:+ With the timestamp/header strategy, the behavior of the 
> application may need to change. In particular, the application can't just 
> blindly take the record with a larger offset and assuming that it's the value 
> to keep. It needs to check the timestamp or the header now. So, it would be 
> useful to at least document this. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Lincong Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214130#comment-17214130
 ] 

Lincong Li commented on KAFKA-10606:


Hi [~tombentley] [~ijuma] I just created a PR. Let me know if you have any 
feedback or suggestions. Thanks!

https://github.com/apache/kafka/pull/9435

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Lincong Li
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Lincong opened a new pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request

2020-10-14 Thread GitBox


Lincong opened a new pull request #9435:
URL: https://github.com/apache/kafka/pull/9435


   
   There is a bug that causes fetch-all-topic-metadata requests triggering
   auto topic creation. Details are described in KAFKA-10606. This is the
   simplest way to fix this bug on the broker side.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9915) Throttle Create Topic, Create Partition and Delete Topic Operations

2020-10-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214121#comment-17214121
 ] 

Ismael Juma commented on KAFKA-9915:


[~dajac] Sounds good, I missed it when I was looking before.

> Throttle Create Topic, Create Partition and Delete Topic Operations
> ---
>
> Key: KAFKA-9915
> URL: https://issues.apache.org/jira/browse/KAFKA-9915
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> This tracks the completion of the KIP-599: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations].
>  If/when the KIP is approved by the community, we will create smaller 
> sub-tasks to track overall progress.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9915) Throttle Create Topic, Create Partition and Delete Topic Operations

2020-10-14 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214116#comment-17214116
 ] 

Bill Bejeck commented on KAFKA-9915:


[~dajac] can we resolve this issue? Looks like it's completed.

> Throttle Create Topic, Create Partition and Delete Topic Operations
> ---
>
> Key: KAFKA-9915
> URL: https://issues.apache.org/jira/browse/KAFKA-9915
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> This tracks the completion of the KIP-599: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations].
>  If/when the KIP is approved by the community, we will create smaller 
> sub-tasks to track overall progress.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9929) Support reverse iterator on WindowStore

2020-10-14 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-9929.

Resolution: Fixed

Resolved via 
!https://github.com/favicon.ico|width=16,height=16!  [GitHub Pull Request 
#9137|https://github.com/apache/kafka/pull/9137]
!https://github.com/favicon.ico|width=16,height=16!  [GitHub Pull Request 
#9138|https://github.com/apache/kafka/pull/9138]
!https://github.com/favicon.ico|width=16,height=16!  [GitHub Pull Request 
#9139|https://github.com/apache/kafka/pull/9139]
!https://github.com/favicon.ico|width=16,height=16!  [GitHub Pull Request 
#9321|https://github.com/apache/kafka/pull/9321]

> Support reverse iterator on WindowStore
> ---
>
> Key: KAFKA-9929
> URL: https://issues.apache.org/jira/browse/KAFKA-9929
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: needs-kip
>
> Currently, WindowStore fetch operations return an iterator sorted from 
> earliest to latest result:
> ```
> * For each key, the iterator guarantees ordering of windows, starting from 
> the oldest/earliest
> * available window to the newest/latest window.
> ```
>  
> We have a use-case where traces are stored in a WindowStore 
> and use Kafka Streams to create a materialized view of traces. A query 
> request comes with a time range (e.g. now-1h, now) and want to return the 
> most recent results, i.e. fetch from this period of time, iterate and pattern 
> match latest/most recent traces, and if enough results, then reply without 
> moving further on the iterator.
> Same store is used to search for previous traces. In this case, it search a 
> key for the last day, if found traces, we would also like to iterate from the 
> most recent.
> RocksDb seems to support iterating backward and forward: 
> [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]
>  
> For reference: This in some way extracts some bits from this previous issue: 
> https://issues.apache.org/jira/browse/KAFKA-4212:
>  
> > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via 
> > segment dropping, but it stores multiple items per key, based on their 
> > timestamp. But this store can be repurposed as a cache by fetching the 
> > items in reverse chronological order and returning the first item found.
>  
> Would like to know if there is any impediment on RocksDb or  WindowStore to 
> support this.
> Adding an argument to reverse in current fetch methods would be great:
> ```
> WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig

2020-10-14 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-10044.
-
Resolution: Fixed

Resolved via https://github.com/apache/kafka/pull/9013

> Deprecate ConsumerConfig#addDeserializerToConfig and 
> ProducerConfig#addSerializerToConfig
> -
>
> Key: KAFKA-10044
> URL: https://issues.apache.org/jira/browse/KAFKA-10044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> from [~ijuma] suggestion 
> (https://github.com/apache/kafka/pull/8605#discussion_r430431086)
> {quote}
> I think you could submit a KIP for the deprecation of the two methods in this 
> class, but we can merge the other changes in the meantime.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-10606:
---

Assignee: Lincong Li  (was: Tom Bentley)

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Lincong Li
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system

2020-10-14 Thread GitBox


abbccdda commented on a change in pull request #9409:
URL: https://github.com/apache/kafka/pull/9409#discussion_r504831412



##
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+val opts = new FeatureCommandOptions(args)
+val featureApis = new FeatureApis(opts)
+var exitCode = 0
+try {
+  featureApis.execute()
+} catch {
+  case e: IllegalArgumentException =>
+printException(e)
+opts.parser.printHelpOn(System.err)
+exitCode = 1
+  case _: UpdateFeaturesException =>
+exitCode = 1
+  case e: Throwable =>
+printException(e)
+exitCode = 1
+} finally {
+  featureApis.close()
+  Exit.exit(exitCode)
+}
+  }
+
+  private def printException(exception: Throwable): Unit = {
+System.err.println("\nError encountered when executing command: " + 
Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends 
RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature 
APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = 
BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit 
= {

Review comment:
   nit: we could make these functions package private





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Lincong Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214088#comment-17214088
 ] 

Lincong Li commented on KAFKA-10606:


Hi [~tombentley]. Thanks for your comment. I will open a PR with the same 
change in 
https://github.com/linkedin/kafka/pull/94/commits/95ad9add181db980914a13a6ffe1a88cd5636a6d

Thanks for your comment as well [~ijuma], I prefer broker-side fix instead of 
client-side fix since the client-side fix requires Kafka users' client version 
upgrade and it is much more difficult to do compared to deploy a new version of 
Kafka server. Let me know whether you have any particular concerns regarding 
the broker-side fix.

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Tom Bentley
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5235) GetOffsetShell: support for multiple topics and consumer configuration override

2020-10-14 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214081#comment-17214081
 ] 

Ron Dagostino commented on KAFKA-5235:
--

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override

> GetOffsetShell: support for multiple topics and consumer configuration 
> override
> ---
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Assignee: Daniel Urban
>Priority: Major
>  Labels: kip, tool
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, GetOffsetShell only allows fetching the offsets of a single topic 
> with an optional list of which partitions to describe. Besides that, it does 
> not allow consumer properties to be overridden. The tool does not have a 
> dedicated script under bin either.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214080#comment-17214080
 ] 

Tom Bentley commented on KAFKA-10606:
-

[~andrewlinc...@gmail.com] agreed that a broker-side fix is necessary. Are you 
going to open a PR for Apache Kafka too? I'd assigned this to myself, since you 
didn't assign it to yourself, but if you're intending to open a PR then go 
ahead.

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Tom Bentley
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214077#comment-17214077
 ] 

Ismael Juma commented on KAFKA-10606:
-

Seems like the way to fix this is to pass `false` as the 
`allowAutoTopicCreation` parameter of `getTopicMetadata` when it's a all topics 
request.

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Tom Bentley
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214077#comment-17214077
 ] 

Ismael Juma edited comment on KAFKA-10606 at 10/14/20, 5:19 PM:


Seems like the way to fix this is to pass `false` as the 
`allowAutoTopicCreation` parameter of `getTopicMetadata` when it's an all 
topics request.


was (Author: ijuma):
Seems like the way to fix this is to pass `false` as the 
`allowAutoTopicCreation` parameter of `getTopicMetadata` when it's a all topics 
request.

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Tom Bentley
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

2020-10-14 Thread GitBox


hachikuji commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r504839142



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig,
   fetchInfos.foreach { case (topicPartition, partitionData) =>
 logReadResultMap.get(topicPartition).foreach(logReadResult => {
   val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-  fetchPartitionStatus += (topicPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
+  fetchPartitionStatus += (topicPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData, 
logReadResult.divergingEpoch.nonEmpty))

Review comment:
   Hmm.. If the `LogReadResult` has a diverging epoch, wouldn't we want to 
respond immediately?

##
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##
@@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long,
 try {
   if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
 val partition = 
replicaManager.getPartitionOrException(topicPartition)
+
+// Case H: Return diverging epoch in response to trigger truncation
+if (fetchStatus.hasDivergingEpoch) {

Review comment:
   Here we are using the status from the original fetch. I am wondering if 
we need to recheck below since it is possible to get a truncation while a fetch 
is in purgatory.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Lincong Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214051#comment-17214051
 ] 

Lincong Li commented on KAFKA-10606:


[~huxi_2b] Thanks for your comment. However, clients set this to true 
explicitly on all paths, and even if they didn't this breaks behavior when 
getting MD for particular topics (where auto creation may be expected because 
the topics were named. 

For example, 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37

Also changing default values for RPC objects changes the behavior for older 
clients. Thus it is a breaking API change. So even if we want to change the 
default value of that field (for hypothetical clients that don't explicitly set 
it), it needs to be in a new RPC version.

A simple way to fix it on the server side is:

https://github.com/linkedin/kafka/pull/94/commits/95ad9add181db980914a13a6ffe1a88cd5636a6d

I prefer the change that does not require Kafka client version upgrade. Because 
in some cases (e.g. LinkedIn), client version upgrade means hundreds or even 
thousands of Kafka users will have to bump Kafka client version in their 
project dependency and it makes this approach less feasible compared to the 
server side fix.

There are definitely other options with more complex (or fancier) way to fix 
this issue on the server side as well.


> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Tom Bentley
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10610) Integration tests for each CLI command to make sure it continues working with existing minimal authorizations

2020-10-14 Thread Chris Johnson (Jira)
Chris Johnson created KAFKA-10610:
-

 Summary: Integration tests for each CLI command to make sure it 
continues working with existing minimal authorizations
 Key: KAFKA-10610
 URL: https://issues.apache.org/jira/browse/KAFKA-10610
 Project: Kafka
  Issue Type: Test
  Components: tools
Reporter: Chris Johnson


It would be nice to have test coverage of all CLI commands (kafka-topics, 
kafka-acls, kafka-configs, kafka-console-consumer, etc) to ensure that they 
work for a user given the minimal permissions expected for each command.

This will help to catch regressions where a change to an existing command's 
functionality unwittingly requires expanded permissions.

An example regression these kinds of tests would have caught: 
https://issues.apache.org/jira/browse/KAFKA-10212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504803409



##
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##
@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
* Case E: This broker is the leader, but the requested epoch is now fenced
* Case F: The fetch offset locates not on the last segment of the log
* Case G: The accumulated bytes from all the fetching partitions exceeds 
the minimum bytes
+   * Case H: A diverging epoch was found, return response to trigger truncation

Review comment:
   PR with the changes in DelayedFetch and FetchSession: 
https://github.com/apache/kafka/pull/9434

##
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##
@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
* Case E: This broker is the leader, but the requested epoch is now fenced
* Case F: The fetch offset locates not on the last segment of the log
* Case G: The accumulated bytes from all the fetching partitions exceeds 
the minimum bytes
+   * Case H: A diverging epoch was found, return response to trigger truncation

Review comment:
   PR with the changes in DelayedFetch and FetchSession: 
https://github.com/apache/kafka/pull/9434





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch

2020-10-14 Thread GitBox


rajinisivaram opened a new pull request #9434:
URL: https://github.com/apache/kafka/pull/9434


   In 2.7, we added lastFetchedEpoch to fetch requests and divergingEpoch to 
fetch responses. We are not using these for truncation yet, but in order to use 
these for truncation with IBP 2.7 onwards in the next release, we should make 
sure that we handle these in all the supporting classes even in 2.7.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

2020-10-14 Thread GitBox


vvcephei commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r504792364



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -482,8 +486,10 @@ boolean tryToCompleteRestoration() {
 if (restored.containsAll(task.changelogPartitions())) {
 try {
 task.completeRestoration();
-} catch (final TimeoutException e) {
-log.debug("Could not complete restoration for {} due 
to {}; will retry", task.id(), e);
+task.clearTaskTimeout();
+} catch (final TimeoutException timeoutException) {
+task.maybeInitTaskTimeoutOrThrow(now, 
timeoutException);
+log.debug("Could not complete restoration for {} due 
to {}; will retry", task.id(), timeoutException);

Review comment:
   ```suggestion
   log.debug(String.format("Could not complete 
restoration for %s; will retry", task.id()), timeoutException);
   ```
   
   It might be a good idea to add tests for the log messages so we can tell if 
they're actually properly formatted or not. Hopefully, the log4j upgrade makes 
it easier to detect these logging bugs.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -147,9 +147,12 @@ public void update(final Set 
topicPartitions, final Map 
task-timeout-deadline}
+ */
 void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
- final TimeoutException timeoutException,
- final Logger log) throws StreamsException 
{
+ final Exception cause,
+ final Logger log) {

Review comment:
   It seems like we ought to just define `log` at the AbstractTask level 
and avoid having two almost identical `maybeInitTaskTimeoutOrThrow` method 
definitions.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -148,7 +148,7 @@ public void update(final Set 
topicPartitions, final Map activeTasks = new LinkedList<>();
 for (final Task task : tasks.values()) {
 try {
 task.initializeIfNeeded();
-} catch (final LockException | TimeoutException e) {
+task.clearTaskTimeout();
+} catch (final LockException retriableException) {
 // it is possible that if there are multiple threads within 
the instance that one thread
 // trying to grab the task from the other, while the other has 
not released the lock since
 // it did not participate in the rebalance. In this case we 
can just retry in the next iteration
-log.debug("Could not initialize {} due to the following 
exception; will retry", task.id(), e);
+log.debug("Could not initialize {} due to the following 
exception; will retry", task.id(), retriableException);

Review comment:
   ```suggestion
   log.debug(String.format("Could not initialize %s due to the 
following exception; will retry", task.id()), retriableException);
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-14 Thread GitBox


vvcephei commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504775858



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -192,12 +191,11 @@ public String toString() {
  */
 @Override
 public void configure(final Map configs) {
-final AssignorConfiguration assignorConfiguration = new 
AssignorConfiguration(configs);
+assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
   Thanks for the update! Are you still planning to drop the 
`assignorConfiguration` field in favor of a `referenceContainer` field?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-10606:
---

Assignee: Tom Bentley

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Assignee: Tom Bentley
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor

2020-10-14 Thread GitBox


vvcephei commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504763968



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -900,9 +900,7 @@
 
 // These are not settable in the main Streams config; they are set by 
the StreamThread to pass internal
 // state into the assignor.
-public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = 
"__task.manager.instance__";
-public static final String 
STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = 
"__streams.metadata.state.instance__";
-public static final String STREAMS_ADMIN_CLIENT = 
"__streams.admin.client.instance__";
+public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = 
"__reference.container.instance__";
 public static final String ASSIGNMENT_ERROR_CODE = 
"__assignment.error.code__";
 public static final String NEXT_SCHEDULED_REBALANCE_MS = 
"__next.probing.rebalance.ms__";
 public static final String TIME = "__time__";

Review comment:
   Sorry, @mjsax , I was referring to all three of "assignment error code", 
"next scheduled rebalance ms", and "time".





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

2020-10-14 Thread GitBox


vvcephei commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-708466216


   Oh, sorry, @thake , I didn't see your replies when I submitted my review 
last night.
   
   Thanks both for your initial feedback and your reply. I'd certainly never 
pass up the opportunity to hear such concerns or ideas, especially before a 
release, and I really appreciate that you took the time to raise them.
   
   Also, thanks for your contribution in this PR!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-14 Thread GitBox


rajinisivaram commented on pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#issuecomment-708446946


   @hachikuji Thanks for the review, have addressed the comments so far.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504731267



##
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##
@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
* Case E: This broker is the leader, but the requested epoch is now fenced
* Case F: The fetch offset locates not on the last segment of the log
* Case G: The accumulated bytes from all the fetching partitions exceeds 
the minimum bytes
+   * Case H: A diverging epoch was found, return response to trigger truncation

Review comment:
   Makes sense, will submit another PR with just those changes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504729807



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
   failedPartitions.removeAll(initialFetchStates.keySet)
 
   initialFetchStates.forKeyValue { (tp, initialFetchState) =>
-// We can skip the truncation step iff the leader epoch matches the 
existing epoch
+// For IBP 2.7 onwards, we can rely on truncation based on diverging 
data returned in fetch responses.
+// For older versions, we can skip the truncation step iff the leader 
epoch matches the existing epoch
 val currentState = partitionStates.stateValue(tp)
-val updatedState = if (currentState != null && 
currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
+val updatedState = if (initialFetchState.offset >= 0 && 
isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {
+  if (currentState != null)
+currentState

Review comment:
   I refactored this code a bit and added a check for Fetching state. Not 
sure if I have missed something though. I think we can continue to fetch 
without truncating if currentState is Fetching when `lastFetchedEpoch` is 
known. If we need to truncate, we will do that later when we get told about 
diverging epochs. Does that make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504727381



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -408,9 +428,12 @@ abstract class AbstractFetcherThread(name: String,
   def markPartitionsForTruncation(topicPartition: TopicPartition, 
truncationOffset: Long): Unit = {
 partitionMapLock.lockInterruptibly()
 try {
+  // It is safe to reset `lastFetchedEpoch` here since we don't expect 
diverging offsets

Review comment:
   Updated. The method is still there for older versions, but it is now 
disabled with IBP 2.7.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-14 Thread GitBox


rajinisivaram commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504726150



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -813,8 +852,9 @@ case class OffsetTruncationState(offset: Long, 
truncationCompleted: Boolean) {
   override def toString: String = 
"offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
 }
 
-case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
+case class OffsetAndEpoch(offset: Long, leaderEpoch: Int, lastFetchedEpoch: 
Option[Int] = None) {

Review comment:
   I had initially added another class because I didn't want to change 
`OffsetAndEpoch`, but I removed that because it looked like too many similar 
classes. Your suggestion to use InitialFetchState sounds much better, updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-10-14 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r503489280



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##
@@ -52,10 +52,10 @@ public void testClusterConfigProperties() {
 "replication.factor", "4"));
 Map connectorProps = 
mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
 MirrorSourceConnector.class);
-assertEquals("source.cluster.bootstrap.servers is set",
-"servers-one", 
connectorProps.get("source.cluster.bootstrap.servers"));
-assertEquals("target.cluster.bootstrap.servers is set",
-"servers-two", 
connectorProps.get("target.cluster.bootstrap.servers"));
+assertEquals("source.bootstrap.servers is set",

Review comment:
   I think I have achieved what we want. Explicitly setting "source." 
prefix for props starting with consumer|producer|admin and setting 
"source.cluster." otherwise. All tests passed. The code runs fine on my infra.
   
   @mimaison please take a look whenever you get some time :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-14 Thread Pushkar Deole (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213887#comment-17213887
 ] 

Pushkar Deole edited comment on KAFKA-8803 at 10/14/20, 1:03 PM:
-

[~guozhang] I recently faced this issue on our lab. The weird thing is some of 
the events are getting processed by the stream while other are getting this 
error (probably some events might be consumed by different streams app) 

We use following versions:

Kafka broker 2.5.0 

Kafka streams and client 2.5.0

So do i need to upgrade to 2.5.1 version for both i.e. broker as well as 
clients ?


was (Author: pdeole):
[~guozhang] I recently faced this issue on our lab. We use following versions:

Kafka broker 2.5.0 

Kafka streams and client 2.5.0

So do i need to upgrade to 2.5.1 version for both i.e. broker as well as 
clients ?

> Stream will not start due to TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, 
> logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-14 Thread Pushkar Deole (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213887#comment-17213887
 ] 

Pushkar Deole edited comment on KAFKA-8803 at 10/14/20, 12:59 PM:
--

[~guozhang] I recently faced this issue on our lab. We use following versions:

Kafka broker 2.5.0 

Kafka streams and client 2.5.0

So do i need to upgrade to 2.5.1 version for both i.e. broker as well as 
clients ?


was (Author: pdeole):
[~guozhang] I recently faced this issue on our lab. We use following versions:

Kafka broker 2.5.0 

Kafka streams and client 2.5.0

So do i need to upgrade to 2.5.1 version for both ?

> Stream will not start due to TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, 
> logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-10-14 Thread Pushkar Deole (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213887#comment-17213887
 ] 

Pushkar Deole commented on KAFKA-8803:
--

[~guozhang] I recently faced this issue on our lab. We use following versions:

Kafka broker 2.5.0 

Kafka streams and client 2.5.0

So do i need to upgrade to 2.5.1 version for both ?

> Stream will not start due to TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, 
> logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-10-14 Thread GitBox


tombentley commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-708340163


   @abbccdda please could you take a look since you opened the JIRA? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-10-14 Thread GitBox


tombentley opened a new pull request #9433:
URL: https://github.com/apache/kafka/pull/9433


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics

2020-10-14 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17213813#comment-17213813
 ] 

huxihx edited comment on KAFKA-10606 at 10/14/20, 10:45 AM:


I am wondering if we could change the default value for 
_AllowAutoTopicCreation_ to _false_ in MetadataRequest.json. In doing so could 
we have ALL_TOPICS_REQUEST_DATA actually disable auto-creation.


was (Author: huxi_2b):
I am wondering if we could change the default value for 
_AllowAutoTopicCreation_ to _false_ in MetadataRequest.json.

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lincong Li
>Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to be true for the 
> fetch-all-topic metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the below code, annotation claims that "*This never causes 
> auto-creation*". It it NOT true and auto topic creation still gets triggered 
> under some circumstances. So, this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could be manifested in the below situation:
> A topic T is being deleted and a request to fetch metadata for all topics 
> gets sent to one broker. The broker reads names of all topics from its 
> metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics and makes sure that they are allowed to 
> be described. Then the broker tries to get metadata for every authorized 
> topic by reading the metadata cache again, once for every topic (show below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was 
> authorizing all topics and topic T and its metadata no longer exist in the 
> cache since the topic got deleted and metadata update requests eventually got 
> propagated from the controller to all brokers. So, at this point, when the 
> broker tries to get metadata for topic T from its cache, it realizes that it 
> does not exist and the broker tries to "auto create" topic T since the 
> allow-auto-topic-creation flag was set to true in all the fetch-all-topic 
> metadata requests.
> I think this bug exists since "*metadataRequest.allowAutoTopicCreation*" was 
> introduced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >