[GitHub] [kafka] andrewchoi5 commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-07-03 Thread GitBox


andrewchoi5 commented on a change in pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#discussion_r448505615



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -499,7 +500,15 @@ class Partition(val topicPartition: TopicPartition,
 addingReplicas = addingReplicas,
 removingReplicas = removingReplicas
   )
-  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints)
+  try {
+this.createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints)

Review comment:
   @junrao -- My understanding was that we wanted to catch ZooKeeperClientException when performing createLogIfNotExists within the makeLeader and makeFollower methods. Did we want to place this somewhere else?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] andrewchoi5 commented on pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-07-03 Thread GitBox


andrewchoi5 commented on pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-653725987


   @junrao -- let me know if anything else needs your attention. Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8975: MINOR: Document that max.block.ms affects some transaction methods

2020-07-03 Thread GitBox


abbccdda commented on pull request #8975:
URL: https://github.com/apache/kafka/pull/8975#issuecomment-653723481


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10228) producer: NETWORK_EXCEPTION is thrown instead of a request timeout

2020-07-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17151148#comment-17151148
 ] 

Chia-Ping Tsai commented on KAFKA-10228:


{quote}
it might make sense to change the log line from debug to info or something more 
severe to give a clue about the cause:
{quote}

+1

[~hachikuji] WDYT? Adding a new DEBUG-level message helps users trace the correct issue.
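
As a small illustration (SLF4J with a hypothetical class, not the actual client code) of why the severity matters: with the root logger at the usual INFO level, the first call below is dropped silently and only the second one reaches the logs.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisconnectLogging {
    private static final Logger log = LoggerFactory.getLogger(DisconnectLogging.class);

    public static void main(String[] args) {
        int nodeId = 3;
        // Suppressed under a typical production configuration (root level INFO or above).
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        // Visible under the same configuration, giving a clue about the cause.
        log.info("Disconnecting from node {} due to request timeout.", nodeId);
    }
}
{code}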

> producer: NETWORK_EXCEPTION is thrown instead of a request timeout
> --
>
> Key: KAFKA-10228
> URL: https://issues.apache.org/jira/browse/KAFKA-10228
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.3.1
>Reporter: Christian Becker
>Priority: Major
>
> We're currently seeing an issue with the java client (producer), when message 
> producing runs into a timeout. Namely a NETWORK_EXCEPTION is thrown instead 
> of a timeout exception.
> *Situation and relevant code:*
> Config
> {code:java}
> request.timeout.ms: 200
> retries: 3
> acks: all{code}
> {code:java}
> for (UnpublishedEvent event : unpublishedEvents) {
> ListenableFuture> future;
> future = kafkaTemplate.send(new ProducerRecord<>(event.getTopic(), 
> event.getKafkaKey(), event.getPayload()));
> futures.add(future.completable());
> }
> CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();{code}
> We're using the KafkaTemplate from SpringBoot here, but it shouldn't matter, 
> as it's merely a wrapper. There we put in batches of messages to be sent.
> 200ms later, we can see the following in the logs: (not sure about the order, 
> they've arrived in the same ms, so our logging system might not display them 
> in the right order)
> {code:java}
> [Producer clientId=producer-1] Received invalid metadata error in produce 
> request on partition events-6 due to 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.. Going to request metadata update now
> [Producer clientId=producer-1] Got error produce response with correlation id 
> 3094 on topic-partition events-6, retrying (2 attempts left). Error: 
> NETWORK_EXCEPTION {code}
> There is also a corresponding error on the broker (within a few ms):
> {code:java}
> Attempting to send response via channel for which there is no open 
> connection, connection id XXX (kafka.network.Processor) {code}
> This was somewhat unexpected and sent us for a hunt across the infrastructure 
> for possible connection issues, but we've found none.
> Side note: In some cases the retries worked and the messages were 
> successfully produced.
> Only after many hours of heavy debugging, we've noticed, that the error might 
> be related to the low timeout setting. We've removed that setting now, as it 
> was a remnant from the past and no longer valid for our use-case. However in 
> order to avoid other people having that issue again and to simplify future 
> debugging, some form of timeout exception should be thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-03 Thread GitBox


chia7712 commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-653707341


   @ableegoldman thanks for your reviews. I have addressed your comment. Could 
you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

2020-07-03 Thread GitBox


chia7712 commented on a change in pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#discussion_r449728663



##
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, 
Deserializer {
 public static AtomicInteger initCount = new AtomicInteger(0);
-public static AtomicInteger closeCount = new AtomicInteger(0);
 public static AtomicReference clusterMeta = new 
AtomicReference<>();
 public static ClusterResource noClusterId = new 
ClusterResource("no_cluster_id");
 public static AtomicReference clusterIdBeforeDeserialize 
= new AtomicReference<>(noClusterId);
 
 public boolean isKey;
 public Map configs;
+public boolean isClosed = false;

Review comment:
   I reverted all changes to ```MockDeserializer```, as the concern I described does not exist yet.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishbellapu commented on a change in pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-07-03 Thread GitBox


satishbellapu commented on a change in pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#discussion_r449727554



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -229,8 +235,8 @@ Duration adminTimeout() {
 props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
 props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
 props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
-props.put("enable.auto.commit", "false");
-props.put("auto.offset.reset", "earliest");
+props.put(ENABLE_AUTO_COMMIT, "false");
+props.put(AUTH_OFFSET_RESET, CONSUMER_AUTO_OFFSET_RESET);

Review comment:
   ACK 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8944: KAFKA-10209: Fix connect_rest_test.py after the introduction of new connector configs

2020-07-03 Thread GitBox


chia7712 commented on pull request #8944:
URL: https://github.com/apache/kafka/pull/8944#issuecomment-653702134


   > Also, note this is not flaky. It's failing consistently.
   
   Got it. I will give a more accurate title next time!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #2312: Kafka 960 metrics upgrade

2020-07-03 Thread GitBox


ijuma edited a comment on pull request #2312:
URL: https://github.com/apache/kafka/pull/2312#issuecomment-653672453


   #7121 is also relevant; it refers to KIP-510.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #2312: Kafka 960 metrics upgrade

2020-07-03 Thread GitBox


ijuma commented on pull request #2312:
URL: https://github.com/apache/kafka/pull/2312#issuecomment-653672141


   @daniellwu I agree it would be good to upgrade to the latest version of the 
library. We need someone to submit a KIP detailing the compatibility impact. We 
have a major release approaching due to KIP-500, so it would be a great time to 
do this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-03 Thread GitBox


ijuma edited a comment on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-653671572


   I think it makes sense to do this, but we also need to figure out a way to:
   1. Make it clear that this is retriable for non-Java clients via a KIP
   2. Consider not returning this for older protocol versions.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-03 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-653671572


   I think it makes sense to do this, but we also need to figure out a way to:



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10234) The key/value deserializer used by ConsoleConsumer is not closed

2020-07-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10234:
--

 Summary: The key/value deserializer used by ConsoleConsumer is not 
closed
 Key: KAFKA-10234
 URL: https://issues.apache.org/jira/browse/KAFKA-10234
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We instantiate, configure, and use them, but they are never closed.
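
For illustration, a minimal standalone sketch (not the ConsoleConsumer code itself; the class name and inputs are made up) of the lifecycle the ticket asks for: whatever is instantiated and configured should eventually have close() called on it.
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DeserializerLifecycle {
    public static void main(String[] args) {
        Deserializer<String> keyDeserializer = new StringDeserializer();
        Deserializer<String> valueDeserializer = new StringDeserializer();
        try {
            keyDeserializer.configure(Collections.emptyMap(), true);
            valueDeserializer.configure(Collections.emptyMap(), false);
            String key = keyDeserializer.deserialize("topic", "k".getBytes(StandardCharsets.UTF_8));
            String value = valueDeserializer.deserialize("topic", "v".getBytes(StandardCharsets.UTF_8));
            System.out.println(key + "=" + value);
        } finally {
            // The point of KAFKA-10234: close() must eventually be called on both.
            keyDeserializer.close();
            valueDeserializer.close();
        }
    }
}
{code}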



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10232) MirrorMaker2 internal topics Formatters

2020-07-03 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10232.

Fix Version/s: 2.7.0
   Resolution: Fixed

> MirrorMaker2 internal topics Formatters
> ---
>
> Key: KAFKA-10232
> URL: https://issues.apache.org/jira/browse/KAFKA-10232
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 2.7.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-597%3A+MirrorMaker2+internal+topics+Formatters



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10232) MirrorMaker2 internal topics Formatters

2020-07-03 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-10232:
--

 Summary: MirrorMaker2 internal topics Formatters
 Key: KAFKA-10232
 URL: https://issues.apache.org/jira/browse/KAFKA-10232
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, mirrormaker
Reporter: Mickael Maison
Assignee: Mickael Maison


https://cwiki.apache.org/confluence/display/KAFKA/KIP-597%3A+MirrorMaker2+internal+topics+Formatters



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10227) Enforce cleanup policy to only contain compact or delete once

2020-07-03 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-10227:
--

Assignee: huxihx

> Enforce cleanup policy to only contain compact or delete once
> -
>
> Key: KAFKA-10227
> URL: https://issues.apache.org/jira/browse/KAFKA-10227
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Mickael Maison
>Assignee: huxihx
>Priority: Minor
>
> When creating or altering a topic, it's possible to set cleanup.policy to 
> values like "compact,compact,delete".
> For example:
>  {{./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact,compact,delete}}
> {{./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe}}
>  {{Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,compact,delete,segment.bytes=1073741824}}
>  
>  We should prevent this and enforce that the cleanup policy contains each value only once.
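
For illustration only, a hypothetical standalone validation (not the broker's actual config code; class and method names are made up) of the rule being proposed: only compact and delete are accepted, and each may appear at most once. Wired into topic creation or alteration, this would reject the compact,compact,delete example above.
{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CleanupPolicyValidator {
    private static final Set<String> ALLOWED = new HashSet<>(Arrays.asList("compact", "delete"));

    public static List<String> parse(String cleanupPolicy) {
        List<String> policies = Arrays.asList(cleanupPolicy.split(","));
        Set<String> distinct = new HashSet<>(policies);
        if (!ALLOWED.containsAll(distinct))
            throw new IllegalArgumentException("Unknown value in cleanup.policy: " + cleanupPolicy);
        if (distinct.size() != policies.size())
            throw new IllegalArgumentException("cleanup.policy must not repeat values: " + cleanupPolicy);
        return policies;
    }

    public static void main(String[] args) {
        System.out.println(parse("compact,delete"));          // accepted
        System.out.println(parse("compact,compact,delete"));  // rejected with an exception
    }
}
{code}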



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10209) Fix connect_rest_test.py after the introduction of new connector configs

2020-07-03 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10209:
---
Fix Version/s: 2.6.0

> Fix connect_rest_test.py after the introduction of new connector configs
> 
>
> Key: KAFKA-10209
> URL: https://issues.apache.org/jira/browse/KAFKA-10209
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.6.0
>
>
> There are two new configs introduced by 
> [371f14c3c12d2e341ac96bd52393b43a10acfa84|https://github.com/apache/kafka/commit/371f14c3c12d2e341ac96bd52393b43a10acfa84]
>  and 
> [1c4eb1a5757df611735cfac9b709e0d80d0da4b3|https://github.com/apache/kafka/commit/1c4eb1a5757df611735cfac9b709e0d80d0da4b3]
>  so we have to update the expected configs in connect_rest_test.py



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10209) Fix connect_rest_test.py after the introduction of new connector configs

2020-07-03 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-10209.

Resolution: Fixed

> Fix connect_rest_test.py after the introduction of new connector configs
> 
>
> Key: KAFKA-10209
> URL: https://issues.apache.org/jira/browse/KAFKA-10209
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.6.0
>
>
> There are two new configs introduced by 
> [371f14c3c12d2e341ac96bd52393b43a10acfa84|https://github.com/apache/kafka/commit/371f14c3c12d2e341ac96bd52393b43a10acfa84]
>  and 
> [1c4eb1a5757df611735cfac9b709e0d80d0da4b3|https://github.com/apache/kafka/commit/1c4eb1a5757df611735cfac9b709e0d80d0da4b3]
>  so we have to update the expected configs in connect_rest_test.py



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10228) producer: NETWORK_EXCEPTION is thrown instead of a request timeout

2020-07-03 Thread Christian Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17150855#comment-17150855
 ] 

Christian Becker edited comment on KAFKA-10228 at 7/3/20, 9:21 AM:
---

If you don't want to change the exception type to ensure compatibility, it 
might make sense to change the log line from debug to info or something more 
severe to give a clue about the cause:
{code:java}
log.debug("Disconnecting from node {} due to request timeout.", nodeId); {code}
By default most production services are not run on this log level, so the 
message would never be emitted.


was (Author: tgbeck):
If you don't want to change the exception type to ensure compatibility, it 
might make sense to change the log line from debug to info or something more 
severe to give a clue about the cause:
{code:java}
log.debug("Disconnecting from node {} due to request timeout.", nodeId); {code}

> producer: NETWORK_EXCEPTION is thrown instead of a request timeout
> --
>
> Key: KAFKA-10228
> URL: https://issues.apache.org/jira/browse/KAFKA-10228
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.3.1
>Reporter: Christian Becker
>Priority: Major
>
> We're currently seeing an issue with the java client (producer), when message 
> producing runs into a timeout. Namely a NETWORK_EXCEPTION is thrown instead 
> of a timeout exception.
> *Situation and relevant code:*
> Config
> {code:java}
> request.timeout.ms: 200
> retries: 3
> acks: all{code}
> {code:java}
> for (UnpublishedEvent event : unpublishedEvents) {
> ListenableFuture> future;
> future = kafkaTemplate.send(new ProducerRecord<>(event.getTopic(), 
> event.getKafkaKey(), event.getPayload()));
> futures.add(future.completable());
> }
> CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();{code}
> We're using the KafkaTemplate from SpringBoot here, but it shouldn't matter, 
> as it's merely a wrapper. There we put in batches of messages to be sent.
> 200ms later, we can see the following in the logs: (not sure about the order, 
> they've arrived in the same ms, so our logging system might not display them 
> in the right order)
> {code:java}
> [Producer clientId=producer-1] Received invalid metadata error in produce 
> request on partition events-6 due to 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.. Going to request metadata update now
> [Producer clientId=producer-1] Got error produce response with correlation id 
> 3094 on topic-partition events-6, retrying (2 attempts left). Error: 
> NETWORK_EXCEPTION {code}
> There is also a corresponding error on the broker (within a few ms):
> {code:java}
> Attempting to send response via channel for which there is no open 
> connection, connection id XXX (kafka.network.Processor) {code}
> This was somewhat unexpected and sent us for a hunt across the infrastructure 
> for possible connection issues, but we've found none.
> Side note: In some cases the retries worked and the messages were 
> successfully produced.
> Only after many hours of heavy debugging, we've noticed, that the error might 
> be related to the low timeout setting. We've removed that setting now, as it 
> was a remnant from the past and no longer valid for our use-case. However in 
> order to avoid other people having that issue again and to simplify future 
> debugging, some form of timeout exception should be thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10228) producer: NETWORK_EXCEPTION is thrown instead of a request timeout

2020-07-03 Thread Christian Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17150855#comment-17150855
 ] 

Christian Becker commented on KAFKA-10228:
--

If you don't want to change the exception type to ensure compatibility, it 
might make sense to change the log line from debug to info or something more 
severe to give a clue about the cause:
{code:java}
log.debug("Disconnecting from node {} due to request timeout.", nodeId); {code}

> producer: NETWORK_EXCEPTION is thrown instead of a request timeout
> --
>
> Key: KAFKA-10228
> URL: https://issues.apache.org/jira/browse/KAFKA-10228
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.3.1
>Reporter: Christian Becker
>Priority: Major
>
> We're currently seeing an issue with the java client (producer), when message 
> producing runs into a timeout. Namely a NETWORK_EXCEPTION is thrown instead 
> of a timeout exception.
> *Situation and relevant code:*
> Config
> {code:java}
> request.timeout.ms: 200
> retries: 3
> acks: all{code}
> {code:java}
> for (UnpublishedEvent event : unpublishedEvents) {
> ListenableFuture> future;
> future = kafkaTemplate.send(new ProducerRecord<>(event.getTopic(), 
> event.getKafkaKey(), event.getPayload()));
> futures.add(future.completable());
> }
> CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();{code}
> We're using the KafkaTemplate from SpringBoot here, but it shouldn't matter, 
> as it's merely a wrapper. There we put in batches of messages to be sent.
> 200ms later, we can see the following in the logs: (not sure about the order, 
> they've arrived in the same ms, so our logging system might not display them 
> in the right order)
> {code:java}
> [Producer clientId=producer-1] Received invalid metadata error in produce 
> request on partition events-6 due to 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.. Going to request metadata update now
> [Producer clientId=producer-1] Got error produce response with correlation id 
> 3094 on topic-partition events-6, retrying (2 attempts left). Error: 
> NETWORK_EXCEPTION {code}
> There is also a corresponding error on the broker (within a few ms):
> {code:java}
> Attempting to send response via channel for which there is no open 
> connection, connection id XXX (kafka.network.Processor) {code}
> This was somewhat unexpected and sent us for a hunt across the infrastructure 
> for possible connection issues, but we've found none.
> Side note: In some cases the retries worked and the messages were 
> successfully produced.
> Only after many hours of heavy debugging, we've noticed, that the error might 
> be related to the low timeout setting. We've removed that setting now, as it 
> was a remnant from the past and no longer valid for our use-case. However in 
> order to avoid other people having that issue again and to simplify future 
> debugging, some form of timeout exception should be thrown.
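
For readers reproducing this outside Spring, here is a minimal sketch (plain Java producer; the topic name and bootstrap address are placeholders, and this is not the reporter's code) of how the settings above map onto ProducerConfig and where the resulting error surfaces:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LowTimeoutProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The reporter's settings: a very low request timeout combined with retries and acks=all.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "200");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // With a timeout this low, the failure typically surfaces as a
                    // NetworkException ("The server disconnected before a response was
                    // received") rather than an explicit request-timeout error.
                    exception.printStackTrace();
                }
            });
            producer.flush();
        }
    }
}
{code}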



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10231) Broken Kafka Connect node to node communication if invalid hostname is in place

2020-07-03 Thread Pere Urbon-Bayes (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pere Urbon-Bayes updated KAFKA-10231:
-
Description: 
As a Kafka Connect operator, I would expect a more definitive error when the internal node-to-node communication can't happen.

If the hostname contains an invalid character according to [RFC 1123 section 2.1|https://tools.ietf.org/html/rfc1123#page-13], the error raised by the Kafka Connect worker node looks like:

 
{quote}{{[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to 
/connectors 
(org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)}}{{java.lang.IllegalArgumentException:
 Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)}}{{at 
org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)}}{{
at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)}}{{  
  at 
org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)}}{{
at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)}}{{  
  at 
org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)}}{{
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)}}{{
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)}}
{quote}
 

It would be much nicer for operators if such situations were detected in a similar, or improved, way to what the JVM does in the [IDN class|https://github.com/JetBrains/jdk8u_jdk/blob/master/src/share/classes/java/net/IDN.java#L291].
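
As an illustration only (not the Connect code; the class name and regex are assumptions), a small sketch of the kind of up-front check being asked for, rejecting an RFC 1123-invalid host such as kafka_connect-0.dev-2 before the URI ever reaches Jetty:
{code:java}
import java.util.regex.Pattern;

public class HostnameCheck {
    // RFC 1123 labels: alphanumerics and hyphens, not starting or ending with a hyphen.
    private static final Pattern LABEL = Pattern.compile("[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?");

    public static boolean isValidHostname(String host) {
        if (host == null || host.isEmpty() || host.length() > 253) {
            return false;
        }
        for (String label : host.split("\\.", -1)) {
            if (!LABEL.matcher(label).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidHostname("kafka-connect-0.dev-2")); // true
        System.out.println(isValidHostname("kafka_connect-0.dev-2")); // false: '_' is not allowed
    }
}
{code}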

 

  was:
As a Kafka Connect operator I would expect a more definitive error when the 
internal node to node communication can't happen.

If the hostname contains an invalid character according to the [RFC1123 section 
2.1|[https://tools.ietf.org/html/rfc1123#page-13]], the error raised by the 
Kafka Connect worker node look like:

 
{quote}{{[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to 
/connectors 
(org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)}}{{java.lang.IllegalArgumentException:
 Invalid URI host: null (authority: kafka_connect-0.cdh-dev-2:8083)}}{{
at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)}}{{   
 at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)}}{{ 
   at 
org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)}}{{
at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)}}{{  
  at 
org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)}}{{
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)}}{{
at 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)}}
{quote}
 

it would be much nicer for operators that such situations are detected in 
similar, or improved, version as the JVM is doing in the [IDN 
class|[https://github.com/JetBrains/jdk8u_jdk/blob/master/src/share/classes/java/net/IDN.java#L291]].

 


> Broken Kafka Connect node to node communication if invalid hostname is in 
> place
> ---
>
> Key: KAFKA-10231
> URL: https://issues.apache.org/jira/browse/KAFKA-10231
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Pere Urbon-Bayes
>Priority: Minor
>
> As a Kafka Connect operator, I would expect a more definitive error when the internal node-to-node communication can't happen.
> If the hostname contains an invalid character according to [RFC 1123 section 2.1|https://tools.ietf.org/html/rfc1123#page-13], the error raised by the Kafka Connect worker node looks like:
>  
> {quote}{{[2020-06-30 10:38:49,990] ERROR Uncaught exception in REST call to 
> /connectors 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)}}{{java.lang.IllegalArgumentException:
>  Invalid URI host: null (authority: kafka_connect-0.dev-2:8083)}}{{at 
> org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:506)}}{{
> at 
> org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:491)}}{{   
>  at 
> org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:449)}}{{   
>  at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:438)}}{{   
>  at 
> org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:83)}}{{
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:309)}}{{
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:138)}}
> {quote}
>  

[GitHub] [kafka] guozhangwang commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-07-03 Thread GitBox


guozhangwang commented on pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#issuecomment-653654543


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8971: MINOR: prune the metadata upgrade test matrix

2020-07-03 Thread GitBox


guozhangwang commented on a change in pull request #8971:
URL: https://github.com/apache/kafka/pull/8971#discussion_r449685553



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -189,9 +185,8 @@ def test_upgrade_downgrade_brokers(self, from_version, 
to_version):
 processor.stop()
 processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" 
% processor.STDOUT_FILE, allow_fail=False)
 
-@matrix(from_version=metadata_1_versions, 
to_version=backward_compatible_metadata_2_versions)
-@matrix(from_version=metadata_1_versions, 
to_version=metadata_3_or_higher_versions)
-@matrix(from_version=metadata_2_versions, 
to_version=metadata_3_or_higher_versions)
+@matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
+@matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])

Review comment:
   I was originally thinking that `streams_application_upgrade_test.py` and `streams_upgrade_test.py` could be re-consolidated so that the latter would cover broker-client compatibility (including rolling upgrades of the brokers) and would for now be ignored, while the former would be used for upgrading/downgrading clients along different paths (one rolling bounce including version probing, two rolling bounces, full shutdown and restart). In that case, this test and test_version_probing_upgrade below could be moved into the other file. WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-07-03 Thread GitBox


guozhangwang commented on pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#issuecomment-653654599


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

2020-07-03 Thread GitBox


guozhangwang commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r449684173



##
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), 
str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+"""
+Test upgrading Kafka Streams (all version combination)
+If metadata was changed, upgrade is more difficult
+Metadata version was bumped in 0.10.1.0 and
+subsequently bumped in 2.0.0
+"""
+
+def __init__(self, test_context):
+super(StreamsUpgradeTest, self).__init__(test_context)
+self.topics = {
+'echo' : { 'partitions': 5 },
+'data' : { 'partitions': 5 },
+}
+
+processed_msg = "processed [0-9]* records"
+base_version_number = str(DEV_VERSION).split("-")[0]
+
+def perform_broker_upgrade(self, to_version):
+self.logger.info("First pass bounce - rolling broker upgrade")
+for node in self.kafka.nodes:
+self.kafka.stop_node(node)
+node.version = KafkaVersion(to_version)
+self.kafka.start_node(node)
+
+@matrix(from_version=smoke_test_versions, to_version=dev_version, 
bounce_type=["full"])

Review comment:
   Sounds great, thank you!!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

2020-07-03 Thread GitBox


chia7712 commented on a change in pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#discussion_r449675093



##
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, 
Deserializer {
 public static AtomicInteger initCount = new AtomicInteger(0);
-public static AtomicInteger closeCount = new AtomicInteger(0);
 public static AtomicReference clusterMeta = new 
AtomicReference<>();
 public static ClusterResource noClusterId = new 
ClusterResource("no_cluster_id");
 public static AtomicReference clusterIdBeforeDeserialize 
= new AtomicReference<>(noClusterId);
 
 public boolean isKey;
 public Map configs;
+public boolean isClosed = false;

Review comment:
   I did not reuse the atomic count since the count is a static variable. JUnit, by default, can run different tests on the same JVM, so the static variable may be changed by other tests.
   
   That said, your point makes sense that the atomic integer is good enough. I will revert the boolean and remove the static modifier to prevent the case I described above.
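
A small illustrative sketch (hypothetical test class, unrelated to the Kafka code) of the concern above: a static counter is shared by every test that runs in the same JVM, while per-instance state is recreated for each test and only reflects that test's activity.
```java
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class StaticStateExample {
    // Shared by everything loaded in this JVM: another test that also increments it
    // can break an exact-count assertion here.
    static int staticCloseCount = 0;

    // Re-created for each test method, so assertions only see this test's activity.
    int instanceCloseCount = 0;

    @Test
    public void closesExactlyOnce() {
        staticCloseCount++;
        instanceCloseCount++;
        assertEquals(1, instanceCloseCount);   // always safe
        // assertEquals(1, staticCloseCount);  // only safe if nothing else touched it
    }
}
```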





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8944: KAFKA-10209: Fix connect_rest_test.py after the introduction of new connector configs

2020-07-03 Thread GitBox


kkonstantine commented on pull request #8944:
URL: https://github.com/apache/kafka/pull/8944#issuecomment-653633933


   Merged to `trunk` and cherry-picked to `2.6`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #8944: KAFKA-10209: Fix connect_rest_test.py after the introduction of new connector configs

2020-07-03 Thread GitBox


kkonstantine merged pull request #8944:
URL: https://github.com/apache/kafka/pull/8944


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8944: KAFKA-10209 fix flaky connect_rest_test.py

2020-07-03 Thread GitBox


kkonstantine commented on pull request #8944:
URL: https://github.com/apache/kafka/pull/8944#issuecomment-653632091


   Also, note this is not flaky. It's failing consistently. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8944: KAFKA-10209 fix flaky connect_rest_test.py

2020-07-03 Thread GitBox


kkonstantine commented on pull request #8944:
URL: https://github.com/apache/kafka/pull/8944#issuecomment-653631904


   What I don't like too much about this test is that it fails on optional configs. On the other hand, I guess it's good to have a test for the complete listing; it's just that this one is failing after a merge. Thanks for the fix. I confirmed by running this system test locally. Unit/integration tests are not relevant here.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

2020-07-03 Thread GitBox


kkonstantine commented on a change in pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#discussion_r449657463



##
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, 
Deserializer {
 public static AtomicInteger initCount = new AtomicInteger(0);
-public static AtomicInteger closeCount = new AtomicInteger(0);
 public static AtomicReference clusterMeta = new 
AtomicReference<>();
 public static ClusterResource noClusterId = new 
ClusterResource("no_cluster_id");
 public static AtomicReference clusterIdBeforeDeserialize 
= new AtomicReference<>(noClusterId);
 
 public boolean isKey;
 public Map configs;
+public boolean isClosed = false;

Review comment:
   This needs to be `volatile` or otherwise accessed with synchronization.
   
   On multicore machines (and multiprocessors in general), changing the value of a variable like this is not guaranteed to be visible to other threads unless there is a memory barrier associated with the change that enforces a happens-before relationship for writes to this variable.
   
   I'm saying the above without really checking whether this code is accessed by multiple threads, but the presence of several atomic variables tells me that it is.
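
A minimal sketch (hypothetical class, unrelated to the Kafka code) of the visibility issue being described: marking the flag `volatile` establishes the happens-before edge that makes the writer's update visible to the reader thread.
```java
public class VisibilityExample {
    // Without 'volatile' the reader thread may never observe the update and spin forever;
    // 'volatile' guarantees the write happens-before subsequent reads.
    private volatile boolean closed = false;

    public void close() {
        closed = true;
    }

    public static void main(String[] args) throws InterruptedException {
        VisibilityExample example = new VisibilityExample();
        Thread reader = new Thread(() -> {
            while (!example.closed) {
                // busy-wait until the writer's update becomes visible
            }
            System.out.println("observed close()");
        });
        reader.start();
        Thread.sleep(100);
        example.close();
        reader.join();
    }
}
```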

##
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, 
Deserializer {
 public static AtomicInteger initCount = new AtomicInteger(0);
-public static AtomicInteger closeCount = new AtomicInteger(0);
 public static AtomicReference clusterMeta = new 
AtomicReference<>();
 public static ClusterResource noClusterId = new 
ClusterResource("no_cluster_id");
 public static AtomicReference clusterIdBeforeDeserialize 
= new AtomicReference<>(noClusterId);
 
 public boolean isKey;
 public Map configs;
+public boolean isClosed = false;

Review comment:
   On another note, why are we switching from `integer` to `boolean`? I agree that there's no use of `closeCount` in the code base and that this is not a public interface. But this changes the symmetry with `initCount` (which remains an integer), and it could still be used by a third-party package that uses these tests (by incorrectly depending on a non-public interface). My point is, if we just do this for an assertion, we could use
   
   `closeCount.get() > 0` or `closeCount.get() == 0` for the equivalent of `true` or `false`
   
   and be on the safe side (preserving symmetry too).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-03 Thread GitBox


rajinisivaram opened a new pull request #8979:
URL: https://github.com/apache/kafka/pull/8979


   Since `ReplicaNotAvailableException` may be thrown during reassignments, it 
should be a retriable InvalidMetadataException. It looks like the consumer 
handles this error correctly by processing the error codes. But for 
consistency, we should make it an InvalidMetadataException.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

2020-07-03 Thread GitBox


chia7712 opened a new pull request #8978:
URL: https://github.com/apache/kafka/pull/8978


   We instantiate, configure, and use them, but they are never closed.
   
   issue: https://issues.apache.org/jira/browse/KAFKA-10234
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r449633467



##
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##
@@ -209,4 +212,139 @@ public void shouldReturnPresenceOfMetrics() {
 
 assertThat(sensor.hasMetrics(), is(true));
 }
-}
\ No newline at end of file
+
+@Test
+public void testStrictQuotaEnforcement() {
+final Time time = new MockTime(0, 0, 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(10))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0s to bring the avg rate to 9. Value 
is accepted
+// because the quota is not exhausted yet.
+sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a second value at T+1s to bring the avg rate to 18. Value 
is accepted
+// because the quota is not exhausted yet.
+time.sleep(1000);
+sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a third value at T+2s is rejected immediately and rate is 
not updated
+// because the quota is exhausted.
+time.sleep(1000);
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.STRICT));
+assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+metrics.close();
+}
+
+@Test
+public void testPermissiveQuotaEnforcement() {
+final Time time = new MockTime(0, 0, 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(10))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0s to bring the avg rate to 9. Value 
is accepted
+// because the quota is not exhausted yet.
+sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE);
+assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a second value at T+1s to bring the avg rate to 18. Value 
is accepted
+// and rate is updated even though the quota is exhausted.
+time.sleep(1000);
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE));
+assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a second value at T+1s to bring the avg rate to 27. Value 
is accepted
+// and rate is updated even though the quota is exhausted.
+time.sleep(1000);
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE));
+assertEquals(27, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+metrics.close();
+}
+
+@Test
+public void testStrictQuotaEnforcementWithDefaultRate() {
+final Time time = new MockTime(0, 0, 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(2))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0 to bring the avg rate to 3 which is 
already
+// above the quota.
+sensor.record(30, time.milliseconds(), QuotaEnforcementType.STRICT);
+assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Theoretically, we should wait 5s to bring back the avg rate to the 
defined quota:
+// ((30 / 10) - 2) / 2 * 10 = 5s
+time.sleep(5000);
+
+// But, recording a second value is rejected because the avg rate is 
still equal
+// to 3 after 5s.
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(30, time.milliseconds(), 

[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r44964



##
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##
@@ -209,4 +212,139 @@ public void shouldReturnPresenceOfMetrics() {
 
 assertThat(sensor.hasMetrics(), is(true));
 }
-}
\ No newline at end of file
+
+@Test
+public void testStrictQuotaEnforcement() {
+final Time time = new MockTime(0, 0, 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(10))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0s to bring the avg rate to 9. Value 
is accepted
+// because the quota is not exhausted yet.
+sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a second value at T+1s to bring the avg rate to 18. Value 
is accepted
+// because the quota is not exhausted yet.
+time.sleep(1000);
+sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a third value at T+2s is rejected immediately and rate is 
not updated
+// because the quota is exhausted.
+time.sleep(1000);
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.STRICT));
+assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+metrics.close();
+}
+
+@Test
+public void testPermissiveQuotaEnforcement() {
+final Time time = new MockTime(0, 0, 0);
+final Metrics metrics = new Metrics(time);
+final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+.quota(Quota.upperBound(10))
+.timeWindow(1, TimeUnit.SECONDS)
+.samples(11));
+final MetricName metricName = metrics.metricName("rate", "test-group");
+assertTrue(sensor.add(metricName, new Rate()));
+final KafkaMetric rateMetric = metrics.metric(metricName);
+
+// Recording a first value at T+0s to bring the avg rate to 9. Value 
is accepted
+// because the quota is not exhausted yet.
+sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE);
+assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a second value at T+1s to bring the avg rate to 18. Value 
is accepted
+// and rate is updated even though the quota is exhausted.
+time.sleep(1000);
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE));
+assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+// Recording a second value at T+1s to bring the avg rate to 27. Value 
is accepted
+// and rate is updated even though the quota is exhausted.
+time.sleep(1000);
+assertThrows(QuotaViolationException.class,
+() -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE));
+assertEquals(27, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+metrics.close();
+}
+
+@Test
+public void testStrictQuotaEnforcementWithDefaultRate() {

Review comment:
   This test illustrates the problem that we are trying to resolve.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r449603684



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##
@@ -156,145 +66,98 @@ private Builder(short oldestAllowedVersion,
 int replicaId,
 IsolationLevel isolationLevel) {
 super(ApiKeys.LIST_OFFSETS, oldestAllowedVersion, 
latestAllowedVersion);
-this.replicaId = replicaId;
-this.isolationLevel = isolationLevel;
+data = new ListOffsetRequestData()
+.setIsolationLevel(isolationLevel.id())
+.setReplicaId(replicaId);
 }
 
-public Builder setTargetTimes(Map 
partitionTimestamps) {
-this.partitionTimestamps = partitionTimestamps;
+public Builder setTargetTimes(List topics) {
+data.setTopics(topics);
 return this;
 }
 
 @Override
 public ListOffsetRequest build(short version) {
-return new ListOffsetRequest(replicaId, partitionTimestamps, 
isolationLevel, version);
+return new ListOffsetRequest(version, data);
 }
 
 @Override
 public String toString() {
-StringBuilder bld = new StringBuilder();
-bld.append("(type=ListOffsetRequest")
-   .append(", replicaId=").append(replicaId);
-if (partitionTimestamps != null) {
-bld.append(", 
partitionTimestamps=").append(partitionTimestamps);
-}
-bld.append(", isolationLevel=").append(isolationLevel);
-bld.append(")");
-return bld.toString();
-}
-}
-
-public static final class PartitionData {
-public final long timestamp;
-public final int maxNumOffsets; // only supported in v0
-public final Optional currentLeaderEpoch;
-
-private PartitionData(long timestamp, int maxNumOffsets, 
Optional currentLeaderEpoch) {
-this.timestamp = timestamp;
-this.maxNumOffsets = maxNumOffsets;
-this.currentLeaderEpoch = currentLeaderEpoch;
-}
-
-// For V0
-public PartitionData(long timestamp, int maxNumOffsets) {
-this(timestamp, maxNumOffsets, Optional.empty());
-}
-
-public PartitionData(long timestamp, Optional 
currentLeaderEpoch) {
-this(timestamp, 1, currentLeaderEpoch);
-}
-
-@Override
-public boolean equals(Object obj) {
-if (!(obj instanceof PartitionData)) return false;
-PartitionData other = (PartitionData) obj;
-return this.timestamp == other.timestamp &&
-this.currentLeaderEpoch.equals(other.currentLeaderEpoch);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(timestamp, currentLeaderEpoch);
-}
-
-@Override
-public String toString() {
-StringBuilder bld = new StringBuilder();
-bld.append("{timestamp: ").append(timestamp).
-append(", maxNumOffsets: ").append(maxNumOffsets).
-append(", currentLeaderEpoch: 
").append(currentLeaderEpoch).
-append("}");
-return bld.toString();
+return data.toString();
 }
 }
 
 /**
  * Private constructor with a specified version.
  */
-private ListOffsetRequest(int replicaId,
-  Map targetTimes,
-  IsolationLevel isolationLevel,
-  short version) {
+private ListOffsetRequest(short version, ListOffsetRequestData data) {
 super(ApiKeys.LIST_OFFSETS, version);
-this.replicaId = replicaId;
-this.isolationLevel = isolationLevel;
-this.partitionTimestamps = targetTimes;
+this.data = data;
 this.duplicatePartitions = Collections.emptySet();
 }
 
 public ListOffsetRequest(Struct struct, short version) {
 super(ApiKeys.LIST_OFFSETS, version);
-Set duplicatePartitions = new HashSet<>();
-replicaId = struct.get(REPLICA_ID);
-isolationLevel = struct.hasField(ISOLATION_LEVEL) ?
-IsolationLevel.forId(struct.get(ISOLATION_LEVEL)) :
-IsolationLevel.READ_UNCOMMITTED;
-partitionTimestamps = new HashMap<>();
-for (Object topicResponseObj : struct.get(TOPICS)) {
-Struct topicResponse = (Struct) topicResponseObj;
-String topic = topicResponse.get(TOPIC_NAME);
-for (Object partitionResponseObj : topicResponse.get(PARTITIONS)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-int partition = partitionResponse.get(PARTITION_ID);
-long timestamp = partitionResponse.get(TIMESTAMP);
-

[GitHub] [kafka] tombentley commented on pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-03 Thread GitBox


tombentley commented on pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#issuecomment-653582517


   @dajac fixed those. I do agree about having smaller tests, but I'd prefer to 
do that in another PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8808:
URL: https://github.com/apache/kafka/pull/8808#discussion_r449554332



##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -127,25 +131,53 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 testAclCli(adminArgs)
   }
 
-  private def createServer(): Unit = {
+  private def createServer(commandConfig: File = null): Unit = {

Review comment:
   nit: It would be better to use an `Option` and default to `None` here.
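   A minimal sketch of what that could look like (purely illustrative, not the final change; it reuses the test class' existing `servers`, `brokerProps` and `adminArgs` members):
```scala
// Sketch: pass the optional command config as an Option instead of a nullable default.
private def createServer(commandConfig: Option[File] = None): Unit = {
  servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
  val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
  val baseArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
  // Only append --command-config when a file was actually supplied.
  adminArgs = commandConfig match {
    case Some(file) => baseArgs ++ Array("--command-config", file.getAbsolutePath)
    case None       => baseArgs
  }
}
```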

##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -127,25 +131,53 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 testAclCli(adminArgs)
   }
 
-  private def createServer(): Unit = {
+  private def createServer(commandConfig: File = null): Unit = {
 servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
+
+var adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
+if (commandConfig != null) {
+  adminArgs ++= Array("--command-config", commandConfig.getAbsolutePath)
+}
+this.adminArgs = adminArgs
+  }
+
+  private def callMain(args: Array[String]): (String, String) = {
+TestUtils.grabConsoleOutputAndError(AclCommand.main(args))
   }
 
   private def testAclCli(cmdArgs: Array[String]): Unit = {
 for ((resources, resourceCmd) <- ResourceToCommand) {
   for (permissionType <- Set(ALLOW, DENY)) {
 val operationToCmd = ResourceToOperations(resources)
 val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-  AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
-  for (resource <- resources) {
-withAuthorizer() { authorizer =>
-  TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
-}
+val (addOut, addErr) = callMain(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
+assertOutputContains("Adding ACLs", resources, resourceCmd, addOut)
+assertOutputContains("Current ACLs", resources, resourceCmd, addOut)
+
+Assert.assertEquals("", addErr)

Review comment:
   nit: Could you move the empty line above this line to below this line?

##
File path: core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
##
@@ -130,22 +132,61 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private def createServer(): Unit = {
 servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
+
+val tmp = File.createTempFile(classOf[AclCommandTest].getName, "createServer")
+tmp.deleteOnExit()
+val pw = new PrintWriter(tmp)
+pw.println("client.id=my-client")
+pw.close()
+
+adminArgs = Array("--bootstrap-server", 
TestUtils.bootstrapServers(servers, listenerName),
+  "--command-config", tmp.getAbsolutePath)
+  }
+
+  private def callMain(args: Array[String]): (String, String) = {
+val appender = LogCaptureAppender.createAndRegister()
+val previousLevel = LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], Level.WARN)
+val outErr = TestUtils.grabConsoleOutputAndError(AclCommand.main(args))
+LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], previousLevel)
+LogCaptureAppender.unregister(appender)
+Assert.assertEquals("Command should execute without logging errors or 
warnings",
+  "",
+  appender.getMessages.map{event => s"${event.getLevel} 
${event.getMessage}" }.mkString("\n"))
+outErr
   }
 
   private def testAclCli(cmdArgs: Array[String]): Unit = {
+
 for ((resources, resourceCmd) <- ResourceToCommand) {
   for (permissionType <- Set(ALLOW, DENY)) {
 val operationToCmd = ResourceToOperations(resources)
 val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
-  AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
-  for (resource <- resources) {
-withAuthorizer() { authorizer =>
-  TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
-}
+val (addOut, addErr) = callMain(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
+assertOutputContains("Adding ACLs", resources, resourceCmd, addOut)

Review comment:
   @tombentley Yes, I meant something along these lines. I am not a huge 
fan of having one test that verifies everything so I wish that we could move 
towards having smaller and simpler test cases in the future. I do agree that 
this is 

[GitHub] [kafka] tombentley commented on pull request #8449: KAFKA-7613: Enable -Xlint:try, fixing warnings

2020-07-03 Thread GitBox


tombentley commented on pull request #8449:
URL: https://github.com/apache/kafka/pull/8449#issuecomment-653523853


   Rebased for conflict. @ijuma could you re-run the tests and maybe take 
another look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-07-03 Thread GitBox


dajac commented on pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#issuecomment-653514573


   @hachikuji or @junrao Could one of you two help me with getting this merged?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8909: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter (rebased)

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8909:
URL: https://github.com/apache/kafka/pull/8909#discussion_r449546534



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -460,46 +466,31 @@ class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printValue = true
   var printPartition = false
-  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
-  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
+  var printOffset = false
+  var printHeaders = false
+  var keySeparator = utfBytes("\t")
+  var lineSeparator = utfBytes("\n")
+  var headersSeparator = utfBytes(",")
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
+  var headersDeserializer: Option[Deserializer[_]] = None
 
   override def init(props: Properties): Unit = {

Review comment:
   @badaiaqrandista FYI, the `init` method has been deprecated as part of KIP-597 (https://github.com/apache/kafka/pull/8604), so you will have to adapt your PR to implement the new `configure` method instead.
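   For reference, a rough sketch of what a formatter looks like against the new entry point (assuming the relocated `org.apache.kafka.common.MessageFormatter` interface from KIP-597; the `line.separator` key here is just an example property, not an official option):
```scala
import java.io.PrintStream
import java.nio.charset.StandardCharsets
import java.util
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.MessageFormatter

// Sketch only: reads its options from configure(Map) instead of the deprecated init(Properties).
class ExampleFormatter extends MessageFormatter {
  private var lineSeparator: Array[Byte] = "\n".getBytes(StandardCharsets.UTF_8)

  override def configure(configs: util.Map[String, _]): Unit = {
    // Keys that used to be read from the Properties passed to init are now read from the map.
    Option(configs.get("line.separator")).foreach { sep =>
      lineSeparator = sep.toString.getBytes(StandardCharsets.UTF_8)
    }
  }

  override def writeTo(record: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
    output.write(record.value())
    output.write(lineSeparator)
  }
}
```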





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r449545905



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
 
-  override def init(props: Properties): Unit = {
+  override def configure(configs: Map[String, _]): Unit = {
+val props = new java.util.Properties()
+configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }

Review comment:
   That's a good point. Probably, the correct way to do this in the future 
would be to define a `ConfigDef` to parse the Map. We can tackle this as part 
of https://github.com/apache/kafka/pull/8909.
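   A rough sketch of what a `ConfigDef`-based approach could look like (illustrative only; the option names simply mirror the existing formatter properties):
```scala
import java.util
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

// Sketch: declare the formatter options once and let AbstractConfig do the parsing and typing.
object FormatterConfig {
  val Definition: ConfigDef = new ConfigDef()
    .define("print.key", Type.BOOLEAN, false, Importance.LOW, "Whether to print the record key.")
    .define("key.separator", Type.STRING, "\t", Importance.LOW, "Separator printed between key and value.")
}

class FormatterConfig(configs: util.Map[String, _]) extends AbstractConfig(FormatterConfig.Definition, configs)

// Inside configure(configs) one could then write, for example:
//   val parsed = new FormatterConfig(configs)
//   printKey = parsed.getBoolean("print.key")
//   keySeparator = parsed.getString("key.separator").getBytes(StandardCharsets.UTF_8)
```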





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

2020-07-03 Thread GitBox


dajac commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r449544703



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
   
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
 }
 
-formatter.init(formatterArgs)
+formatter.configure(formatterArgs.asScala.asJava)

Review comment:
   I was thinking about adding a second utility method, `CommandLineUtils.parseKeyValueArgsAsMap`, and keeping the existing one unchanged. We can do this as a follow-up for sure.
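   Something along these lines, perhaps (the method name follows the suggestion above; the body is only a sketch, not committed API):
```scala
// Hypothetical Map-returning counterpart to the existing Properties-based CommandLineUtils.parseKeyValueArgs.
object CommandLineUtilsSketch {
  def parseKeyValueArgsAsMap(args: Iterable[String]): Map[String, String] =
    args.map(_.split("=", 2)).collect {
      case Array(key, value) => key -> value
      case Array(key)        => key -> ""  // tolerate a missing value, like the Properties-based helper
    }.toMap
}

// e.g. parseKeyValueArgsAsMap(Seq("print.key=true", "key.separator=|"))
//   == Map("print.key" -> "true", "key.separator" -> "|")
```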





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-03 Thread GitBox


dajac opened a new pull request #8977:
URL: https://github.com/apache/kafka/pull/8977


   This PR does two things:
   
   1. It introduces `QuotaEnforcementType`, which allows choosing when quota enforcement is done within the `Sensor`. The default `Permissive` mode works as before: it enforces the quota after having accounted for the new value. The new `Strict` mode enforces the quota before recording and thus does not account for the value if the quota is violated.
   
   2. It introduces a new `SampledStat` named `TokenBucket`. Our current quota enforcement mechanism does not cope well with bursty workloads, especially when the burst is significantly larger than the quota. The reason is that a large recorded burst remains in the samples until it gets past the window; during this time, the computed rate stays the same until the sample is dropped. The `TokenBucket` sampled stat alleviates this issue by decreasing each sample by quota * the time elapsed between it and the previous sample. This mimics the token bucket algorithm, which gives credits back every second (a rough sketch of the idea follows below).
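   For illustration only, here is a minimal token-bucket sketch of the behaviour described above; this is not the actual `TokenBucket` `SampledStat` implementation, just the idea it is based on:
```scala
// Sketch of the token bucket idea: credits refill at the quota rate and a recorded
// burst consumes them, so a single large burst is paid back gradually instead of
// pinning the measured rate until its whole sample expires.
class TokenBucketSketch(quotaPerSec: Double, burstCapacity: Double) {
  private var credits = burstCapacity
  private var lastUpdateMs = System.currentTimeMillis()

  private def refill(nowMs: Long): Unit = {
    credits = math.min(burstCapacity, credits + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0)
    lastUpdateMs = nowMs
  }

  // "Strict" style: check before accounting and reject the value if the quota would be violated.
  def tryRecord(value: Double, nowMs: Long): Boolean = {
    refill(nowMs)
    if (value <= credits) { credits -= value; true } else false
  }
}
```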
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

2020-07-03 Thread GitBox


mimaison merged pull request #8604:
URL: https://github.com/apache/kafka/pull/8604


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

2020-07-03 Thread GitBox


mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-653436024


   All failures were Streams, merging.
   
   - JDK 11 and Scala 2.13:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

   - JDK 14 and Scala 2.13:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

   - JDK 8 and Scala 2.12:
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8975: MINOR: Document that max.block.ms affects some transaction methods

2020-07-03 Thread GitBox


tombentley commented on pull request #8975:
URL: https://github.com/apache/kafka/pull/8975#issuecomment-653416263


   Thanks @abbccdda, I reworded it slightly, hope that's OK.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-03 Thread GitBox


heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r449440511



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -132,7 +132,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-.filter(x -> x.downstreamOffset() > 0)  // ignore offsets we cannot translate accurately
+.filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately

Review comment:
   Then, this filter matches everything, doesn't it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org