[GitHub] [kafka] satishbellapu commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-06-23 Thread GitBox


satishbellapu commented on pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#issuecomment-648602105


   @omkreddy cc




[GitHub] [kafka] skaundinya15 commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-06-23 Thread GitBox


skaundinya15 commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r444631685



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -84,6 +84,9 @@
 public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
 public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to 
wait before attempting to retry a failed request to a given topic partition. 
This avoids repeatedly sending requests in a tight loop under some failure 
scenarios.";
 
+public static final String RETRY_BACKOFF_MAX_MS_CONFIG = 
"retry.backoff.max.ms";
+public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount 
of time in milliseconds to wait when retrying a request to the broker that has 
repeatedly failed. If provided, the backoff per client will increase 
exponentially for each failed request, up to this maximum. To prevent all 
clients from being synchronized upon retry, a randomization factor of 0.2 will 
be applied to the backoff, resulting in a random range between 20% below and 
20% above the computed value. If retry.backoff.ms is set to be higher than 
retry.backoff.max.ms, then retry.backoff.max.ms will be used as a constant 
backoff from the beginning without any exponential increase";

Review comment:
   Nit: Can we change the wording to the following?
   ```suggestion
   public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum 
amount of time in milliseconds to wait when retrying a request to the broker 
that has repeatedly failed. If provided, the backoff per client will increase 
exponentially for each failed request, up to this maximum. To prevent all 
clients from being synchronized upon retry, a randomized jitter with a 
factor of 0.2 will be applied to the backoff, resulting in the backoff falling 
within a range between 20% below and 20% above the computed value. If 
retry.backoff.ms is set to be higher than retry.backoff.max.ms, then 
retry.backoff.max.ms will be used as a constant backoff from the beginning 
without any exponential increase";
   ```

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A util class for exponential backoff, timeout, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be provided.
+ * This class is thread-safe.
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;

Review comment:
   It seems that the refactoring and renaming to `ExponentialBackoff` has 
already happened in patch #8683. Could you refactor this patch on top of that so 
it's more in line with what will be going into `trunk`?
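
   For reference, here is a minimal, self-contained sketch of the bounded 
exponential backoff with jitter that the config doc and the Javadoc formula 
above describe (class and method names are illustrative, a doubling ratio is 
assumed; this is not the actual patch):

   ```java
   import java.util.concurrent.ThreadLocalRandom;

   // Sketch of Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio^n,
   // capped at the maximum -- the behavior retry.backoff.max.ms is meant to bound.
   public class BackoffSketch {
       private static final double JITTER = 0.2;

       static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int attempts) {
           if (retryBackoffMs >= retryBackoffMaxMs)
               return retryBackoffMaxMs; // constant backoff, no exponential increase
           double exp = retryBackoffMs * Math.pow(2, attempts); // assumed ratio of 2
           double term = Math.min(exp, retryBackoffMaxMs);
           // random factor in [0.8, 1.2): keeps clients from retrying in lockstep
           double rand = ThreadLocalRandom.current().nextDouble(1 - JITTER, 1 + JITTER);
           return (long) (rand * term);
       }

       public static void main(String[] args) {
           for (int attempts = 0; attempts < 6; attempts++)
               System.out.println(backoffMs(100, 1000, attempts)); // ~100, ~200, ~400, ~800, ~1000, ~1000
       }
   }
   ```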









[jira] [Comment Edited] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-23 Thread Sanjana Kaundinya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124340#comment-17124340
 ] 

Sanjana Kaundinya edited comment on KAFKA-9800 at 6/24/20, 4:24 AM:


Here are my thoughts:
 1) There should be no changes to the underlying NetworkClient interface. More 
details on which interfaces should change are outlined in the KIP here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients. 
As [~d8tltanc] mentioned, the key difference between the AdminClient and the 
other clients is that the AdminClient supports a per-request timeout, whereas 
the Producer and Consumer use the NetworkClient to handle timeouts. In our 
implementation we should add per-request timeout support for all clients and 
reuse the existing code in CallRetryContext to do this.

2) I agree with [~ijuma]; I don't really see much of a benefit in doing a 
different backoff strategy per client, and that kind of improvement goes 
beyond the scope of this KIP anyway.

 

[~d8tltanc] could you shed more light on how to refactor the AdminClient to 
take out the redundant logic? I want to understand how much this would change 
and if all this change would still be in the scope of the KIP.


was (Author: skaundinya):
Here are my thoughts:
 1) There should be no changes to the underlying NetworkClient interface. More 
details on what interfaces should change are outlined in the KIP 
[here|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients|https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients).]].
 As [~d8tltanc] mentioned, the key difference between the AdminClient and the 
other clients is that the AdminClient supports a per request timeout whereas 
the Producer and Consumer are using the NetworkClient to handle the timeouts. 
In our implementation we should add a per-request support for all clients and 
reuse the code existing in CallRetryContext to apply to all clients to do this.

2) I agree with [~ijuma], I don't see really much of a benefit for doing a 
different backoff strategy per client, plus that kind of improvement goes 
beyond the scope of this KIP.

 

[~d8tltanc] could you shed more light on how to refactor the AdminClient to 
take out the redundant logic? I want to understand how much this would change 
and if all this change would still be in the scope of the KIP.

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempts. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for the retry backoff ms.
>  # Async retries. In each poll, retries that have not yet met the backoff will 
> be filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated (i.e. a set contains only one 
> initial request and only its retries).
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, I already wrapped the exponential backoff/timeout util class in 
> my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of attempts and returns the backoff/timeout value at 
> the corresponding level. Thus, we can add a new class property to those 
> classes containing retriable data in order to record the number of failed 
> attempts.
>  
> Changes:
> KafkaProducer:
>  # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in Accumulator, which already has an attribute attempts 
> recording the number of failed attempts. So we can let the Accumulator 
> calculate the new retry backoff for each batch when it enqueues them, to avoid 
> instantiating the util class multiple times.
>  # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new 
> class property of type `Long` to record the number of attempts.
> KafkaConsumer:
>  # Some synchronous retry use cases. Record the failed attempts in the 
> blocking loop.
>  # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
> Though the actual requests are packed for each node, the current 
> implementation is applying backoff to each topic partition, where the backoff 
> value is kept 


[jira] [Resolved] (KAFKA-9678) Introduce bounded exponential backoff in clients

2020-06-23 Thread Sanjana Kaundinya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sanjana Kaundinya resolved KAFKA-9678.
--
Resolution: Duplicate

> Introduce bounded exponential backoff in clients
> 
>
> Key: KAFKA-9678
> URL: https://issues.apache.org/jira/browse/KAFKA-9678
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, consumer, producer 
>Reporter: Guozhang Wang
>Assignee: Sanjana Kaundinya
>Priority: Major
>  Labels: needs-kip
>
> In all clients (consumer, producer, admin, and streams) we have retry 
> mechanisms with fixed backoff to handle transient connection issues with 
> brokers. However, with a small backoff (many default to 100ms) we could send 
> tens of requests per second to the broker, and if the connection issue is 
> prolonged it means a huge overhead.
> We should consider introducing upper-bounded exponential backoff universally 
> in those clients to reduce the number of retry requests during the period of 
> connection partitioning.





[jira] [Commented] (KAFKA-9678) Introduce bounded exponential backoff in clients

2020-06-23 Thread Sanjana Kaundinya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143505#comment-17143505
 ] 

Sanjana Kaundinya commented on KAFKA-9678:
--

The work for this is being done by [~d8tltanc] in KAFKA-9800, so I will close 
this ticket out as he has a PR in progress against that ticket.

> Introduce bounded exponential backoff in clients
> 
>
> Key: KAFKA-9678
> URL: https://issues.apache.org/jira/browse/KAFKA-9678
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, consumer, producer 
>Reporter: Guozhang Wang
>Assignee: Sanjana Kaundinya
>Priority: Major
>  Labels: needs-kip
>
> In all clients (consumer, producer, admin, and streams) we have retry 
> mechanisms with fixed backoff to handle transient connection issues with 
> brokers. However, with a small backoff (many default to 100ms) we could send 
> tens of requests per second to the broker, and if the connection issue is 
> prolonged it means a huge overhead.
> We should consider introducing upper-bounded exponential backoff universally 
> in those clients to reduce the number of retry requests during the period of 
> connection partitioning.





[jira] [Updated] (KAFKA-10177) Replace/improve Percentiles metrics

2020-06-23 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10177:

Description: 
There's an existing – but seemingly unused – implementation of percentile 
metrics that we attempted to use for end-to-end latency metrics in Streams. 
Unfortunately a number of limitations became apparent, and we ultimately pulled 
the metrics from the 2.6 release pending further investigation/improvement.

The problems we encountered were
 # Need to set a static upper/lower limit for the values
 # Not well suited to a distribution with a long tail, ie setting the max value 
too high caused the accuracy to plummet
 # Required a lot of memory per metric for reasonable accuracy and caused us to 
hit OOM (unclear if there was actually a memory leak, or it was just gobbling 
up unnecessarily large amounts in general)

Since the Percentiles class is part of the public API, we may need to create a 
new class altogether and possibly deprecate/remove the old one. Alternatively 
we can consider just re-implementing the existing class from scratch, and just 
deprecating the current constructors and associated implementation (eg the 
constructor accepts a max)

  was:
There's an existing – but unused – implementation of percentile metrics that we 
attempted to use for end-to-end latency metrics in Streams. Unfortunately a 
number of limitations became apparent, and we ultimately pulled the metrics 
from the 2.6 release pending further investigation/improvement.

The problems we encountered were
 # Need to set a static upper/lower limit for the values
 # For reasonable accuracy over a large range of possible values (see above), a 
lot of memory per metric was required.
 # Possible memory leak, which combined with issue #2 caused us to hit OOM and 
die

Since the Percentiles class is part of the public API, we may need to create a 
new class altogether and possibly deprecate/remove the old one. Alternatively 
we can consider just re-implementing the existing class from scratch, and just 
deprecating the current constructors and associated implementation (eg the 
constructor accepts a max)


> Replace/improve Percentiles metrics
> ---
>
> Key: KAFKA-10177
> URL: https://issues.apache.org/jira/browse/KAFKA-10177
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> There's an existing – but seemingly unused – implementation of percentile 
> metrics that we attempted to use for end-to-end latency metrics in Streams. 
> Unfortunately a number of limitations became apparent, and we ultimately 
> pulled the metrics from the 2.6 release pending further 
> investigation/improvement.
> The problems we encountered were
>  # Need to set a static upper/lower limit for the values
>  # Not well suited to a distribution with a long tail, ie setting the max 
> value too high caused the accuracy to plummet
>  # Required a lot of memory per metric for reasonable accuracy and caused us 
> to hit OOM (unclear if there was actually a memory leak, or it was just 
> gobbling up unnecessarily large amounts in general)
> Since the Percentiles class is part of the public API, we may need to create 
> a new class altogether and possibly deprecate/remove the old one. 
> Alternatively we can consider just re-implementing the existing class from 
> scratch, and just deprecating the current constructors and associated 
> implementation (eg the constructor accepts a max)
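
For context, a rough sketch of how the existing `Percentiles` stat is wired up, 
showing the static value range and fixed buffer size the ticket refers to 
(metric names and sizes here are illustrative):

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

public class PercentilesSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("e2e-latency");
        // Limitation 1: a static min/max must be chosen up front.
        double maxLatencyMs = 10 * 60 * 1000d;
        // Limitation 3: accuracy is tied to this per-metric buffer size.
        int sizeInBytes = 4_000;
        sensor.add(new Percentiles(sizeInBytes, 0, maxLatencyMs, BucketSizing.LINEAR,
                new Percentile(metrics.metricName("p90-latency", "example-group"), 90),
                new Percentile(metrics.metricName("p99-latency", "example-group"), 99)));
        sensor.record(42.0);
    }
}
```

Values past the chosen max all land in the last bucket, which is why a 
long-tailed distribution (limitation 2) wrecks the accuracy.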





[jira] [Updated] (KAFKA-10195) Move offset management codes from ConsumerCoordinator to a new class

2020-06-23 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-10195:
---
Description: 
ConsumerCoordinator has 2 main functions:
 # partitions assignment
 # offset management

We are adding some new features to it; for example, KAFKA-9657 adds a field 
`throwOnFetchStableOffsetsUnsupported` which is only used in offset management.

The 2 functions almost don't interact with each other, so it's not wise to 
put this code in one single class; can we try to move the offset management 
code to a new class?

For example, the fields below are only used in offset management:

```
// can be moved to another class directly
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final ConsumerInterceptors interceptors;
private final AtomicInteger pendingAsyncCommits;
private final ConcurrentLinkedQueue completedOffsetCommits;
private AtomicBoolean asyncCommitFenced;
private final boolean throwOnFetchStableOffsetsUnsupported;
private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;

// used in `onJoinComplete` but can also be moved out.
private final boolean autoCommitEnabled;
private final int autoCommitIntervalMs;
private Timer nextAutoCommitTimer;
```

So we can just create a new class `OffsetManageCoordinator` and move the 
related code into it. Similarly, a new class `SubscribeManager` can also be 
created. Here is the UML class diagram:

!image-2020-06-24-10-47-44-733.png!

 

The above shows the current design, in which KafkaConsumer interacts with the 
coordinator directly. Below is the new design: we add a `ConsumerCoordinatorFacade` 
in which we put an `OffsetCoordinator` and a `SubscribeCoordinator` to manage 
offsets and assignment respectively. Both `OffsetCoordinator` and 
`SubscribeCoordinator` need an `AbstractCoordinator` because they will interact 
with each other (even if rarely).
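
As an illustration, a minimal sketch of the proposed split; all of these class 
names are hypothetical and simply follow the description above:

```java
// Shared group-membership logic (join/sync/heartbeat); stands in for AbstractCoordinator.
class SharedCoordinator { }

class OffsetCoordinator {
    private final SharedCoordinator coordinator;
    OffsetCoordinator(SharedCoordinator coordinator) { this.coordinator = coordinator; }
    // defaultOffsetCommitCallback, pendingAsyncCommits, asyncCommitFenced, ... would live here
    void commitOffsetsAsync() { }
}

class SubscribeCoordinator {
    private final SharedCoordinator coordinator;
    SubscribeCoordinator(SharedCoordinator coordinator) { this.coordinator = coordinator; }
    // partition-assignment logic (onJoinComplete etc.) would live here
    void onJoinComplete() { }
}

// KafkaConsumer would talk to this facade instead of ConsumerCoordinator directly.
class ConsumerCoordinatorFacade {
    private final OffsetCoordinator offsets;
    private final SubscribeCoordinator subscriptions;

    ConsumerCoordinatorFacade(SharedCoordinator shared) {
        this.offsets = new OffsetCoordinator(shared);
        this.subscriptions = new SubscribeCoordinator(shared);
    }
}
```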

 

  was:
ConsumerCoordinator has 2 main functions:
 # partitions assignment
 # offset management

We are adding some new features in it, for example KAFKA-9657 add a field 
`throwOnFetchStableOffsetsUnsupported` which only used in offset management.

And the 2 functions almost don't interact with each other, so it's not wise to 
put these code in one single class, can we try to move offset management code 
to a new class.

For example, the below fields only used in offset management:
 ```

// can be move to another class directly
 private final OffsetCommitCallback defaultOffsetCommitCallback;
 private final ConsumerInterceptors interceptors;
 private final AtomicInteger pendingAsyncCommits;
 private final ConcurrentLinkedQueue 
completedOffsetCommits;
 private AtomicBoolean asyncCommitFenced;
 private final boolean throwOnFetchStableOffsetsUnsupported;
 private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;

 

// used in `onJoinComplete` but can also be moved out.

private final boolean autoCommitEnabled;
 private final int autoCommitIntervalMs;
 private Timer nextAutoCommitTimer;
 ```

So we can just create a new class `OffsetManageCoordinator` and move the 
related codes into it. Similarly, a new class `SubscribeManager` can also be 
created. here is the UML class diagram:

!image-2020-06-24-10-47-44-733.png!

 

The above is the current design in which KafkaConsumer interact with Consumer 
directly. the below is the new design, we add a `ConsumerCoordinatorFacade` in 
which we put `OffsetCoordinator` and `SubscribeCoordinator` to manage offset 
and assigning respectively. both `OffsetCoordinator` and `SubscribeCoordinator` 
need a `AbstractCoordinator` cause they will interact with each other(even 
rarely).

 


> Move offset management codes from ConsumerCoordinator to a new class
> 
>
> Key: KAFKA-10195
> URL: https://issues.apache.org/jira/browse/KAFKA-10195
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
> Attachments: image-2020-06-24-10-47-44-733.png
>
>
> ConsumerCoordinator has 2 main functions:
>  # partitions assignment
>  # offset management
> We are adding some new features to it; for example, KAFKA-9657 adds a field 
> `throwOnFetchStableOffsetsUnsupported` which is only used in offset management.
> The 2 functions almost don't interact with each other, so it's not wise 
> to put this code in one single class; can we try to move the offset management 
> code to a new class?
> For example, the fields below are only used in offset management:
>  ```
> // can be moved to another class directly
>  private final OffsetCommitCallback defaultOffsetCommitCallback;
>  private final ConsumerInterceptors interceptors;
>  private final AtomicInteger pendingAsyncCommits;
>  private final 


[jira] [Updated] (KAFKA-10177) Replace/improve Percentiles metrics

2020-06-23 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10177:

Summary: Replace/improve Percentiles metrics  (was: Replace/improve/remove 
Percentiles metric)

> Replace/improve Percentiles metrics
> ---
>
> Key: KAFKA-10177
> URL: https://issues.apache.org/jira/browse/KAFKA-10177
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> There's an existing – but unused – implementation of percentile metrics that 
> we attempted to use for end-to-end latency metrics in Streams. Unfortunately 
> a number of limitations became apparent, and we ultimately pulled the metrics 
> from the 2.6 release pending further investigation/improvement.
> The problems we encountered were
>  # Need to set a static upper/lower limit for the values
>  # For reasonable accuracy over a large range of possible values (see above), 
> a lot of memory per metric was required.
>  # Possible memory leak, which combined with issue #2 caused us to hit OOM 
> and die
> Since the Percentiles class is part of the public API, we may need to create 
> a new class altogether and possibly deprecate/remove the old one. 
> Alternatively we can consider just re-implementing the existing class from 
> scratch, and just deprecating the current constructors and associated 
> implementation (eg the constructor accepts a max)





[jira] [Updated] (KAFKA-10195) Move offset management codes from ConsumerCoordinator to a new class

2020-06-23 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-10195:
---
Attachment: image-2020-06-24-10-47-44-733.png

> Move offset management codes from ConsumerCoordinator to a new class
> 
>
> Key: KAFKA-10195
> URL: https://issues.apache.org/jira/browse/KAFKA-10195
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
> Attachments: image-2020-06-24-10-47-44-733.png
>
>
> ConsumerCoordinator has 2 main functions:
>  # partitions assignment
>  # offset management
> We are adding some new features to it; for example, KAFKA-9657 adds a field 
> `throwOnFetchStableOffsetsUnsupported` which is only used in offset management.
> The 2 functions almost don't interact with each other, so it's not wise 
> to put this code in one single class; can we try to move the offset management 
> code to a new class?
> For example, the fields below are only used in offset management:
>  ```
> // can be moved to another class directly
>  private final OffsetCommitCallback defaultOffsetCommitCallback;
>  private final ConsumerInterceptors interceptors;
>  private final AtomicInteger pendingAsyncCommits;
>  private final ConcurrentLinkedQueue 
> completedOffsetCommits;
>  private AtomicBoolean asyncCommitFenced;
>  private final boolean throwOnFetchStableOffsetsUnsupported;
>  private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
>  
> // used in `onJoinComplete` but can also be moved out.
> private final boolean autoCommitEnabled;
> private final int autoCommitIntervalMs;
> private Timer nextAutoCommitTimer;
>  ```
> So we can just create a new class `OffsetManageCoordinator` and move the 
> related code into it.
>  





[jira] [Created] (KAFKA-10196) Add missing '--version' option to Kafka command producer-performance

2020-06-23 Thread jiamei xie (Jira)
jiamei xie created KAFKA-10196:
--

 Summary: Add missing '--version' option to Kafka command 
producer-performance
 Key: KAFKA-10196
 URL: https://issues.apache.org/jira/browse/KAFKA-10196
 Project: Kafka
  Issue Type: Bug
  Components: producer , tools
Reporter: jiamei xie
Assignee: jiamei xie


Option '--version' is missing from the Kafka command producer-performance.





[jira] [Commented] (KAFKA-10175) MetadataCache::getClusterMetadata returns null for offline replicas

2020-06-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143453#comment-17143453
 ] 

Chia-Ping Tsai commented on KAFKA-10175:


Just curious, does it cause an exception in production?

> MetadataCache::getClusterMetadata returns null for offline replicas
> ---
>
> Key: KAFKA-10175
> URL: https://issues.apache.org/jira/browse/KAFKA-10175
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Priority: Major
>
> This line in the code always returns null:
> [MetadataCache::getClusterMetadata|https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/server/MetadataCache.scala#L272]
> The reason is that the `map(node)` part uses `aliveNodes` to create the `Node` 
> object, otherwise defaulting to `null`. Offline replicas thus always end up as 
> `null`.
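
For illustration, one possible fix pattern (a sketch, not the actual patch): 
fall back to a placeholder node instead of `null` when the broker id is not in 
`aliveNodes`:

```java
import org.apache.kafka.common.Node;
import java.util.Map;

public class OfflineReplicaLookup {
    // Offline replica ids are absent from aliveNodes, so a plain lookup yields null.
    // Substituting Node.noNode() keeps the returned Cluster free of null entries.
    static Node nodeFor(int brokerId, Map<Integer, Node> aliveNodes) {
        return aliveNodes.getOrDefault(brokerId, Node.noNode());
    }
}
```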





[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-06-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143449#comment-17143449
 ] 

Chia-Ping Tsai commented on KAFKA-10140:


It seems to me users should be aware of the type of the config they are trying to 
append to or subtract from. If the config is NOT a list, it is nice to produce a 
quick failure. If we can't get the type of the config, we should do what users 
want to do rather than obstruct them from doing anything.

Separately, there are two minor things we can address before truly resolving this one.

1. Add a comment to Admin#incrementalAlterConfigs to remind users that both 
APPEND and SUBTRACT don't work for plugin components' configs.
2. Replace NoSuchElementException with InvalidConfigurationException; the latter 
is what we should throw.

[~ijuma] WDYT?
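
For reference, a sketch of the call path that hits this error (broker address 
and id are illustrative). APPEND only makes sense for list-typed configs, and 
per the discussion above it currently fails for plugin configs such as the 
JmxReporter's `metrics.jmx.blacklist`:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class IncrementalAlterSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Collection<AlterConfigOp> ops = Collections.singletonList(
                new AlterConfigOp(new ConfigEntry("metrics.jmx.blacklist", "kafka.consumer:*"),
                                  AlterConfigOp.OpType.APPEND));
            // Fails today because the broker cannot determine the type of plugin configs.
            admin.incrementalAlterConfigs(Map.of(broker, ops)).all().get();
        }
    }
}
```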

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.





[GitHub] [kafka] mjsax commented on pull request #8919: HOTFIX: use .equals to compare HostInfo in StreamsMetadataState

2020-06-23 Thread GitBox


mjsax commented on pull request #8919:
URL: https://github.com/apache/kafka/pull/8919#issuecomment-648534515


   Merged to `2.5` and cherry-picked to `2.4`.
   
   It's fixed in `trunk` and `2.6` via #8900
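
   For context, a minimal illustration of the reference-vs-value equality 
distinction behind this hotfix:

   ```java
   import org.apache.kafka.streams.state.HostInfo;

   public class HostInfoEquality {
       public static void main(String[] args) {
           HostInfo a = new HostInfo("localhost", 8080);
           HostInfo b = new HostInfo("localhost", 8080);
           System.out.println(a == b);      // false: two distinct instances
           System.out.println(a.equals(b)); // true: value equality, what the fix uses
       }
   }
   ```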







[GitHub] [kafka] mjsax merged pull request #8919: HOTFIX: use .equals to compare HostInfo in StreamsMetadataState

2020-06-23 Thread GitBox


mjsax merged pull request #8919:
URL: https://github.com/apache/kafka/pull/8919


   







[jira] [Commented] (KAFKA-9846) Race condition can lead to severe lag underestimate for active tasks

2020-06-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143437#comment-17143437
 ] 

Sophie Blee-Goldman commented on KAFKA-9846:


I think we can just leave it open and maybe someone from the community will 
pick it up

> Race condition can lead to severe lag underestimate for active tasks
> 
>
> Key: KAFKA-9846
> URL: https://issues.apache.org/jira/browse/KAFKA-9846
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Assignee: Vinoth Chandar
>Priority: Critical
> Fix For: 2.6.0
>
>
> In KIP-535 we added the ability to query still-restoring and standby tasks. 
> To give users control over how out of date the data they fetch can be, we 
> added an API to KafkaStreams that fetches the end offsets for all changelog 
> partitions and computes the lag for each local state store.
> During this lag computation, we check whether an active task is in RESTORING 
> and calculate the actual lag if so. If not, we assume it's in RUNNING and 
> return a lag of zero. However, tasks may be in other states besides running 
> and restoring; notably they first pass through the CREATED state before 
> getting to RESTORING. A CREATED task may happen to be caught-up to the end 
> offset, but in many cases it is likely to be lagging or even completely 
> uninitialized.
> This introduces a race condition where users may be led to believe that a 
> task has zero lag and is "safe" to query even with the strictest correctness 
> guarantees, while the task is actually lagging by some unknown amount.  
> During transfer of ownership of the task between different threads on the 
> same machine, tasks can actually spend a while in CREATED while the new owner 
> waits to acquire the task directory lock. So, this race condition may not be 
> particularly rare in multi-threaded Streams applications
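
For context, a sketch of the KIP-535 lag API that this race affects (`streams` 
is assumed to be an already-started KafkaStreams instance):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.LagInfo;
import java.util.Map;

public class LagCheck {
    static void printLags(KafkaStreams streams) {
        Map<String, Map<Integer, LagInfo>> lags = streams.allLocalStorePartitionLags();
        lags.forEach((store, partitions) ->
            partitions.forEach((partition, lag) ->
                // A task still in CREATED may report zero lag here even though its
                // store is not caught up -- the underestimate described above.
                System.out.println(store + "-" + partition + " lag=" + lag.offsetLag())));
    }
}
```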





[GitHub] [kafka] ableegoldman commented on pull request #8919: HOTFIX: use .equals to compare HostInfo in StreamsMetadataState

2020-06-23 Thread GitBox


ableegoldman commented on pull request #8919:
URL: https://github.com/apache/kafka/pull/8919#issuecomment-648526072


   Wow, all tests passed on the first try! cc @mjsax 







[GitHub] [kafka] guozhangwang commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


guozhangwang commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648525527


   Merged to trunk and cherry-picked to 2.6







[GitHub] [kafka] guozhangwang merged pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


guozhangwang merged pull request #8900:
URL: https://github.com/apache/kafka/pull/8900


   







[GitHub] [kafka] satishbellapu commented on a change in pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-06-23 Thread GitBox


satishbellapu commented on a change in pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#discussion_r444586705



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -123,6 +125,10 @@
 private static final String CONSUMER_POLL_TIMEOUT_MILLIS_DOC = "Timeout 
when polling source cluster.";
 public static final long CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT = 1000L;
 
+public static final String CONSUMER_AUTO_OFFSET_RESET = 
"consumer.auto.offset.reset";
+private static final String CONSUMER_AUTO_OFFSET_RESET_DOC = "Consumer 
Auto offset reset, defaults to earliest unless specify.";

Review comment:
   Updated "Consumer Auto offset reset, default to earliest unless 
specified."









[GitHub] [kafka] ryannedolan commented on a change in pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-06-23 Thread GitBox


ryannedolan commented on a change in pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#discussion_r444580176



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -123,6 +125,10 @@
 private static final String CONSUMER_POLL_TIMEOUT_MILLIS_DOC = "Timeout 
when polling source cluster.";
 public static final long CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT = 1000L;
 
+public static final String CONSUMER_AUTO_OFFSET_RESET = 
"consumer.auto.offset.reset";
+private static final String CONSUMER_AUTO_OFFSET_RESET_DOC = "Consumer 
Auto offset reset, defaults to earliest unless specify.";

Review comment:
   can leave off "unless specified" -- is redundant with "default".









[jira] [Commented] (KAFKA-9935) Kafka not releasing member from Consumer Group

2020-06-23 Thread Steve Kecskes (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143428#comment-17143428
 ] 

Steve Kecskes commented on KAFKA-9935:
--

Agreed, it does look similar to KAFKA-10105.
I see further related issues here:
https://github.com/apache/kafka/pull/7753
https://issues.apache.org/jira/browse/KAFKA-9752

I will try to reproduce against 2.5.0

> Kafka not releasing member from Consumer Group
> --
>
> Key: KAFKA-9935
> URL: https://issues.apache.org/jira/browse/KAFKA-9935
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.1
> Environment: Linux
>Reporter: Steve Kecskes
>Priority: Major
>
> Hello. I am experiencing an issue where Kafka is not releasing members from a 
> consumer group when the member crashes. The consumer group is then stuck in 
> rebalancing state indefinitely.
> In this consumer group, there is only 1 client. This client has the following 
> related settings:
> {code:java}
> auto.commit.interval.ms = 5000
>  auto.offset.reset = earliest
>  bootstrap.servers = [austgkafka01.hk.eclipseoptions.com:9092]
>  check.crcs = true
>  client.dns.lookup = default
>  client.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427-0
>  connections.max.idle.ms = 54
>  default.api.timeout.ms = 6
>  enable.auto.commit = true
>  exclude.internal.topics = true
>  fetch.max.bytes = 52428800
>  fetch.max.wait.ms = 500
>  fetch.min.bytes = 1
>  group.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427
>  heartbeat.interval.ms = 3000
>  interceptor.classes = []
>  internal.leave.group.on.close = true
>  isolation.level = read_uncommitted
>  key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>  max.partition.fetch.bytes = 1048576
>  max.poll.interval.ms = 30
>  max.poll.records = 1
>  metadata.max.age.ms = 30
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 3
>  partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>  receive.buffer.bytes = 16777216
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 3
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 6
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  send.buffer.bytes = 131072
>  session.timeout.ms = 1
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>  ssl.endpoint.identification.algorithm = https
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLS
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
>  value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}
> If the client crashes (not a graceful exit from group) the group remains in 
> the following state indefinitely.
> {code}
> Warning: Consumer group 'TraderAutomationViewServer_workaround_stuck_rebalance' is rebalancing.
> GROUP                                                  TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> TraderAutomationViewServer_workaround_stuck_rebalance  EventAdjustedVols  10         6984061         7839599         855538    -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  VolMetrics         8          128459531       143736443       15276912  -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  EventAdjustedVols  12         7216495         8106030         889535    -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  VolMetrics         6          122921729       137377358       14455629  -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  EventAdjustedVols  14         5457618         6171142         713524    -            -     -
> 

[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration

2020-06-23 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143427#comment-17143427
 ] 

sats commented on KAFKA-10160:
--

Created a pull request [https://github.com/apache/kafka/pull/8921]

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
>
> https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51
> According to this, producer/consumer-level properties should be configured as, 
> e.g., somesource->sometarget.consumer.client.id. I try to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch the earliest. Not sure if this is a 
> bug or my misconfiguration, but then at least some update to the docs would be 
> useful.
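
For reference, the per-flow consumer override the reporter is attempting looks 
like this in an MM2 properties file (cluster aliases and hosts are illustrative):

```
clusters = somesource, sometarget
somesource.bootstrap.servers = source-host:9092
sometarget.bootstrap.servers = target-host:9092
somesource->sometarget.enabled = true
# per-flow consumer property, following MirrorMakerConfig's prefix scheme
somesource->sometarget.consumer.auto.offset.reset = latest
```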





[GitHub] [kafka] satishbellapu opened a new pull request #8921: Update MirrorConnectorConfig.java

2020-06-23 Thread GitBox


satishbellapu opened a new pull request #8921:
URL: https://github.com/apache/kafka/pull/8921


   KAFKA-10160: Removed hardcoded auto.offset.reset in MM2 consumer 
configuration, retained default as earliest unless specified.
   
   ### Committer Checklist (excluded from commit message)
   - [*] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mjsax commented on a change in pull request #8920: DOCS-4446: document timestamped state stores

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8920:
URL: https://github.com/apache/kafka/pull/8920#discussion_r444554053



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -49,6 +49,7 @@
 Defining and creating a 
State Store
 Fault-tolerant State 
Stores
 Enable or Disable Fault Tolerance of State Stores (Store 
Changelogs)
+Timestamped State Stores

Review comment:
   Should we also extend `Defining and creating a State Store`? -- It seems 
the section you added below focuses on their usage.
   
   

##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -343,6 +344,38 @@ 
 
 
 
+
+Timestamped State Stores
+
+Starting in Kafka Streams 2.3, you can store record 
timestamps in KTables.

Review comment:
   > Starting in Kafka Streams 2.3
   
   Do we need this?
   
   > can store
   
   `KTables` always store timestamps by default. Should we rephrase?

##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -343,6 +344,38 @@ 
 
 
 
+
+Timestamped State Stores
+
+Starting in Kafka Streams 2.3, you can store record 
timestamps in KTables.
+A timestamped state store improves stream processing 
semantics and enables
+handling out-of-order data in source KTables, detecting 
out-of-order joins and aggregations,
+and getting the timestamp of the latest update in an 
Interactive Query.
+
+You can query timestamped state stores both with and 
without a timestamp.
+
+Use TimestampedKeyValueStore
+when you need a key-(value/timestamp) store that supports 
put/get/delete and range queries. 
+
+
+Use TimestampedWindowStore
+when you need to store windowedKey-(value/timestamp) 
pairs.
+
+The Stores
+class provides corresponding factory methods for 
timestamped stores.
+

Review comment:
   Seems this part belongs to the section `Defining and creating a State Store`?

##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -343,6 +344,38 @@ 
 
 
 
+
+Timestamped State Stores
+
+Starting in Kafka Streams 2.3, you can store record 
timestamps in KTables.
+A timestamped state store improves stream processing 
semantics and enables
+handling out-of-order data in source KTables, detecting 
out-of-order joins and aggregations,
+and getting the timestamp of the latest update in an 
Interactive Query.
+
+You can query timestamped state stores both with and 
without a timestamp.
+
+Use TimestampedKeyValueStore
+when you need a key-(value/timestamp) store that supports 
put/get/delete and range queries. 
+
+
+Use TimestampedWindowStore
+when you need to store windowedKey-(value/timestamp) 
pairs.
+
+The Stores
+class provides corresponding factory methods for 
timestamped stores.
+
+Upgrade note: All users upgrade with a single 
rolling bounce per instance.
+
+For Platform API users, nothing changes in 
existing applications, and you

Review comment:
   `Platform API` ? Do you mean `Processor API` ?
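
   For reference, a minimal sketch of how the timestamped interfaces fit 
together (assumes Kafka Streams 2.3+; store name and serdes are placeholders):

   ```java
   // build a persistent timestamped key-value store via the Stores factory methods
   StoreBuilder<TimestampedKeyValueStore<String, Long>> builder =
       Stores.timestampedKeyValueStoreBuilder(
           Stores.persistentTimestampedKeyValueStore("my-ts-store"),
           Serdes.String(),
           Serdes.Long());

   // inside Processor#init, once the store has been attached to the topology:
   TimestampedKeyValueStore<String, Long> store =
       (TimestampedKeyValueStore<String, Long>) context.getStateStore("my-ts-store");
   store.put("key", ValueAndTimestamp.make(42L, System.currentTimeMillis()));
   ValueAndTimestamp<Long> valueAndTs = store.get("key"); // value plus its record timestamp
   ```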





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r444555724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 int remainingRetries = retries;
 Set<String> topicsNotReady = new HashSet<>(topics.keySet());
 final Set<String> newlyCreatedTopics = new HashSet<>();
+final HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
   Can we give this a more descriptive name? It might be obvious to you, 
but I think someone just looking at this code for the first time would not get 
that this actually means topics that may or may not already exist.
   That said, I'm struggling to think of a good alternative...maybe 
`possiblyCreatedTopics` or `unknownTopics`...any better ideas?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult 
describeTopics(Collection topic
 future.completeExceptionally(new 
UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
 topicDescriptions.put(requestedTopic, future);
 }
+// try to simulate the leader not available situation when topic 
name is "LeaderNotAvailableTopic"
+if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new 
LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not 
available."));

Review comment:
   Is it possible to use `EasyMock` instead of adding this to the actual 
`MockAdminClient`? I know it's kind of a pain to set up but I think it'll make 
the test a lot more clear. I did something similar in 
StreamsPartitionAssignorTest to mock the results of the `listOffsets` request
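
   A hypothetical sketch of what that could look like for `describeTopics` 
(topic name and wiring are assumptions, not the PR's actual code):

   ```java
   Admin adminClient = EasyMock.createMock(Admin.class);
   DescribeTopicsResult describeResult = EasyMock.createMock(DescribeTopicsResult.class);

   KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
   future.completeExceptionally(new LeaderNotAvailableException("Leader is not available."));

   EasyMock.expect(describeResult.values())
           .andReturn(Collections.singletonMap("some-topic", future));
   EasyMock.expect(adminClient.describeTopics(EasyMock.<Collection<String>>anyObject()))
           .andReturn(describeResult);
   EasyMock.replay(adminClient, describeResult);
   ```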

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
 }
 
 
-if (!topicsNotReady.isEmpty()) {
-log.info("Topics {} can not be made ready with {} retries 
left", topicsNotReady, retries);
+if (isNeedRetry(topicsNotReady)) {
+log.info("Topics {} can not be made ready with {} retries 
left", topicsNotReady, remainingRetries);

Review comment:
   Good catch

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -242,11 +256,16 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
 log.error(errorMsg);
 throw new StreamsException(errorMsg);
 }
-} else {
+} else if (!leaderNotAvailableTopics.contains(topicName)) {
 topicsToCreate.add(topicName);
 }
 }
 
 return topicsToCreate;
 }
+
+private boolean shouldRetry(final Set<String> topicsNotReady, final 
HashSet<String> leaderNotAvailableTopics) {
+// If there's a topic with LeaderNotAvailableException, we still need to 
retry
+return !topicsNotReady.isEmpty() || leaderNotAvailableTopics.size() > 
0;

Review comment:
   Can we just use `!isEmpty` for both sets?
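
   ie something like this (a sketch of the suggestion, using the names from 
the diff above):

   ```java
   private boolean shouldRetry(final Set<String> topicsNotReady,
                               final Set<String> leaderNotAvailableTopics) {
       // retry while any topic is still not ready or hit a LeaderNotAvailableException
       return !topicsNotReady.isEmpty() || !leaderNotAvailableTopics.isEmpty();
   }
   ```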





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


guozhangwang commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648472153


   LGTM! I will merge after green jenkins.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444552897



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorStateManager.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+
+public abstract class AbstractProcessorStateManager implements StateManager {

Review comment:
   Do we really need this? It seems like overkill to add this abstract 
class just for one trivial shared method. I'm also wondering if we really need 
`changelogFor` at all -- can't we just use `changelogTopicPartitionFor` and get 
the topic name from that?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn opened a new pull request #8920: DOCS-4446: document timestamped state stores

2020-06-23 Thread GitBox


JimGalasyn opened a new pull request #8920:
URL: https://github.com/apache/kafka/pull/8920


   Add a section on the timestamped state store interfaces, per 
[KIP-258](https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444541033



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -402,4 +402,9 @@ public TaskType taskType() {
 public Map<TopicPartition, Long> changelogOffsets() {
 return Collections.unmodifiableMap(checkpointFileCache);
 }
+
+@Override
+public TopicPartition changelogTopicPartitionFor(final String storeName) {
+return null;

Review comment:
   Should we better throw an exception as this method should never be 
called for global state stores?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -118,16 +111,24 @@ public void logChange(final String storeName,
   final byte[] value,
   final long timestamp) {
 throwUnsupportedOperationExceptionIfStandby("logChange");
+
+final TopicPartition changelogPartition = 
stateManager().changelogTopicPartitionFor(storeName);
+if (changelogPartition == null) {
+throw new IllegalStateException("Sending records to state store " 
+ storeName +
+" which has not been registered.");

Review comment:
   I think the store could still be registered but just does not have a 
changelog topic? Wondering if the error message might be misleading?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -578,4 +577,10 @@ private StateStoreMetadata findStore(final TopicPartition 
changelogPartition) {
 
 return found.isEmpty() ? null : found.get(0);
 }
+
+@Override
+public TopicPartition changelogTopicPartitionFor(final String storeName) {
+final StateStoreMetadata storeMetadata = stores.get(storeName);
+return storeMetadata == null ? null : storeMetadata.changelogPartition;

Review comment:
   Cf. my comment from above: we could raise the exception if 
`storeMetadata == null`
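
   ie roughly this shape (a sketch of the suggestion, not the merged code):

   ```java
   @Override
   public TopicPartition changelogTopicPartitionFor(final String storeName) {
       final StateStoreMetadata storeMetadata = stores.get(storeName);
       if (storeMetadata == null) {
           // fail fast instead of returning null for an unknown store
           throw new IllegalStateException("State store " + storeName
               + " is not registered with this state manager.");
       }
       return storeMetadata.changelogPartition;
   }
   ```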

##
File path: 
streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
##
@@ -88,4 +88,14 @@ public StateStore getGlobalStore(final String name) {
 public TaskType taskType() {
 return TaskType.GLOBAL;
 }
+
+@Override
+public String changelogFor(final String storeName) {
+return null;
+}
+
+@Override
+public TopicPartition changelogTopicPartitionFor(final String storeName) {
+return null;

Review comment:
   As above: should we throw here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


mjsax commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-648462369


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8585: KAFKA-9938; Debug consumer should be able to fetch from followers

2020-06-23 Thread GitBox


abbccdda commented on pull request #8585:
URL: https://github.com/apache/kafka/pull/8585#issuecomment-648460501


   Only known flaky test failures:
   ```
   
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8107: MINOR: Remove Diamond and code code Alignment

2020-06-23 Thread GitBox


abbccdda commented on pull request #8107:
URL: https://github.com/apache/kafka/pull/8107#issuecomment-648459539


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #6737: KAFKA-8338: consumer offset expiration should consider subscription.

2020-06-23 Thread GitBox


abbccdda commented on pull request #6737:
URL: https://github.com/apache/kafka/pull/6737#issuecomment-648459250


   Closed according to @dajac's comment.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda closed pull request #6737: KAFKA-8338: consumer offset expiration should consider subscription.

2020-06-23 Thread GitBox


abbccdda closed pull request #6737:
URL: https://github.com/apache/kafka/pull/6737


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648457282


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648456919


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444538711



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -474,6 +492,7 @@ public void 
shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
 " indicating the task may be migrated out; it means all tasks 
belonging to this thread should be migrated.")
 );
 
+collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);

Review comment:
   Not 100% sure how `OnSubsequentCall` is meant either. But what you say 
seems to make sense, and thus it should be a different test. Thanks for going 
the extra mile and splitting them up!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8919: HOTFIX: use .equals to compare HostInfo in StreamsMetadataState

2020-06-23 Thread GitBox


mjsax commented on pull request #8919:
URL: https://github.com/apache/kafka/pull/8919#issuecomment-648456709


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8919: HOTFIX: use .equals to compare HostInfo in StreamsMetadataState

2020-06-23 Thread GitBox


ableegoldman commented on pull request #8919:
URL: https://github.com/apache/kafka/pull/8919#issuecomment-648455230


   cc @mjsax 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444537297



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -152,9 +152,9 @@ public StreamsMetadata getLocalMetadata() {
 }
 
 if (globalStores.contains(storeName)) {
-// global stores are on every node. if we dont' have the host info
+// global stores are on every node. if we don't have the host info
 // for this host then just pick the first metadata
-if (thisHost == UNKNOWN_HOST) {
+if (thisHost.equals(UNKNOWN_HOST)) {

Review comment:
   https://github.com/apache/kafka/pull/8919





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #8919: HOTFIX: use .equals to compare HostInfo in StreamsMetadataState

2020-06-23 Thread GitBox


ableegoldman opened a new pull request #8919:
URL: https://github.com/apache/kafka/pull/8919


   Hotfix for patching this in older branches



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #8845: KAFKA-10126:Add a warning message for ConsumerPerformance

2020-06-23 Thread GitBox


abbccdda merged pull request #8845:
URL: https://github.com/apache/kafka/pull/8845


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-06-23 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143314#comment-17143314
 ] 

Boyang Chen commented on KAFKA-9509:


Failed again:
h3. Stacktrace

java.lang.RuntimeException: Could not find enough records. found 0, expected 
100 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:222)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)

 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7083/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]

> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sanjana Kaundinya
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix for this would make it 
> such that when the connectors are started up, to wait until the REST endpoint 
> returns a positive number of tasks to be confident that we can start testing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #8845: KAFKA-10126:Add a warning message for ConsumerPerformance

2020-06-23 Thread GitBox


abbccdda commented on pull request #8845:
URL: https://github.com/apache/kafka/pull/8845#issuecomment-648449655


   Having tested 3 * 3 times, only flaky tests failed, such as this one:
   https://issues.apache.org/jira/browse/KAFKA-9509
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444530988



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -474,6 +492,7 @@ public void 
shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
 " indicating the task may be migrated out; it means all tasks 
belonging to this thread should be migrated.")
 );
 
+collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);

Review comment:
   I guess I should just break these up into different tests then, huh. 
Will do





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444528622



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -474,6 +492,7 @@ public void 
shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
 " indicating the task may be migrated out; it means all tasks 
belonging to this thread should be migrated.")
 );
 
+collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);

Review comment:
   Maybe I misinterpreted this, but I took the `OnSubsequentCall` in the 
name to mean that it would throw on the next (ie subsequent) call after the 
_send_, not that it would continue to throw on all subsequent calls. ie I think 
it should actually be several different tests (one for each "call" that should 
throw) that got mashed into just one.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444526844



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -474,6 +492,7 @@ public void 
shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
 " indicating the task may be migrated out; it means all tasks 
belonging to this thread should be migrated.")
 );
 
+collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);

Review comment:
   For those particular tests, considering their names, it seems the tests 
are void now?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444526844



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
##
@@ -474,6 +492,7 @@ public void 
shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedIn
 " indicating the task may be migrated out; it means all tasks 
belonging to this thread should be migrated.")
 );
 
+collector.send(topic, "3", "0", null, null, stringSerializer, 
stringSerializer, streamPartitioner);

Review comment:
   For this particular test, considering the name, it seems the test is void 
now?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444525997



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
 void shutdown(final boolean clean) {
 final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-final Set<Task> tasksToClose = new HashSet<>();
+final Set<Task> tasksToCloseDirty = new HashSet<>();
+tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, 
firstException));
+tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, 
firstException));
+
+for (final Task task : tasksToCloseDirty) {
+closeTaskDirty(task);
+}
+
+for (final Task task : activeTaskIterable()) {
+executeAndMaybeSwallow(
+clean,

Review comment:
   I think this is more in line with the general code flow elsewhere. Note 
that if we started out clean but became dirty and had to close some tasks as 
such, we would have already caught an exception somewhere. So 
`AtomicReference#compareAndSet` would be a no-op, and it actually doesn't 
matter what we pass in here
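
   ie the first-exception bookkeeping boils down to this (illustrative 
snippet):

   ```java
   // firstException keeps only the first recorded error; later calls are no-ops
   AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
   firstException.compareAndSet(null, new RuntimeException("first"));
   firstException.compareAndSet(null, new RuntimeException("second")); // ignored
   System.out.println(firstException.get().getMessage()); // prints "first"
   ```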





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444522543



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -152,9 +152,9 @@ public StreamsMetadata getLocalMetadata() {
 }
 
 if (globalStores.contains(storeName)) {
-// global stores are on every node. if we dont' have the host info
+// global stores are on every node. if we don't have the host info
 // for this host then just pick the first metadata
-if (thisHost == UNKNOWN_HOST) {
+if (thisHost.equals(UNKNOWN_HOST)) {

Review comment:
   Will do





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444518329



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
 void shutdown(final boolean clean) {
 final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-final Set<Task> tasksToClose = new HashSet<>();
+final Set<Task> tasksToCloseDirty = new HashSet<>();
+tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, 
firstException));
+tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, 
firstException));
+
+for (final Task task : tasksToCloseDirty) {
+closeTaskDirty(task);
+}
+
+for (final Task task : activeTaskIterable()) {
+executeAndMaybeSwallow(
+clean,
+() -> 
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+e -> firstException.compareAndSet(null, e),
+e -> log.warn("Ignoring an exception while closing task " + 
task.id() + " producer.", e)
+);
+}
+
+executeAndMaybeSwallow(
+clean,
+activeTaskCreator::closeThreadProducerIfNeeded,
+e -> firstException.compareAndSet(null, e),
+e -> log.warn("Ignoring an exception while closing thread 
producer.", e)
+);
+
+tasks.clear();
+
+
+// this should be called after closing all tasks, to make sure we 
unlock the task dir for tasks that may
+// have still been in CREATED at the time of shutdown, since 
Task#close will not do so
+executeAndMaybeSwallow(
+clean,

Review comment:
   As above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444517907



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
 void shutdown(final boolean clean) {
 final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-final Set<Task> tasksToClose = new HashSet<>();
+final Set<Task> tasksToCloseDirty = new HashSet<>();
+tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, 
firstException));
+tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, 
firstException));
+
+for (final Task task : tasksToCloseDirty) {
+closeTaskDirty(task);
+}
+
+for (final Task task : activeTaskIterable()) {
+executeAndMaybeSwallow(
+clean,
+() -> 
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+e -> firstException.compareAndSet(null, e),
+e -> log.warn("Ignoring an exception while closing task " + 
task.id() + " producer.", e)
+);
+}
+
+executeAndMaybeSwallow(
+clean,

Review comment:
   As above?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444517746



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -679,92 +675,166 @@ private void cleanupTask(final Task task) {
 void shutdown(final boolean clean) {
 final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-final Set<Task> tasksToClose = new HashSet<>();
+final Set<Task> tasksToCloseDirty = new HashSet<>();
+tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, 
firstException));
+tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, 
firstException));
+
+for (final Task task : tasksToCloseDirty) {
+closeTaskDirty(task);
+}
+
+for (final Task task : activeTaskIterable()) {
+executeAndMaybeSwallow(
+clean,

Review comment:
   If `tasksToCloseDirty` is not empty, should we close dirty, too, ie pass 
in `clean && tasksToCloseDirty.isEmpty()` ?
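
   ie something like this (a sketch of the suggested call, reusing the names 
from the diff above; not the actual patch):

   ```java
   // inside the loop over active tasks:
   final boolean closeCleanly = clean && tasksToCloseDirty.isEmpty();
   executeAndMaybeSwallow(
       closeCleanly,
       () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
       e -> firstException.compareAndSet(null, e),
       e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
   );
   ```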
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


mjsax commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r444516858



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -152,9 +152,9 @@ public StreamsMetadata getLocalMetadata() {
 }
 
 if (globalStores.contains(storeName)) {
-// global stores are on every node. if we dont' have the host info
+// global stores are on every node. if we don't have the host info
 // for this host then just pick the first metadata
-if (thisHost == UNKNOWN_HOST) {
+if (thisHost.equals(UNKNOWN_HOST)) {

Review comment:
   Should we fix this in older branches (2.5/2.4), too? (ie follow up PR)
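
   For anyone backporting: the reason `==` is wrong here is that `HostInfo` 
instances are compared by reference, not by value (illustrative snippet):

   ```java
   HostInfo a = new HostInfo("localhost", 9092);
   HostInfo b = new HostInfo("localhost", 9092);
   System.out.println(a == b);      // false: two distinct instances
   System.out.println(a.equals(b)); // true: same host and port
   ```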





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #8174: KAFKA-9439: add KafkaProducer API unit tests

2020-06-23 Thread GitBox


abbccdda merged pull request #8174:
URL: https://github.com/apache/kafka/pull/8174


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

2020-06-23 Thread GitBox


mimaison commented on a change in pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#discussion_r444504939



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -560,16 +561,15 @@ class LoggingMessageFormatter extends MessageFormatter 
with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream): Unit = {}
 }
 
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-topicStr = props.getProperty("topic")
+  override def configure(configs: Map[String, _]): Unit = {
+topicStr = configs.get("topic").toString

Review comment:
   Good catch! I've pushed a change to address it
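
   For illustration, a null-safe shape of that logic in Java (a sketch only; 
the actual Scala change pushed to the PR may differ):

   ```java
   @Override
   public void configure(Map<String, ?> configs) {
       // avoid an NPE when no "topic" entry is present in the configs
       final Object topic = configs.get("topic");
       topicStr = topic != null ? topic.toString() : "";
   }
   ```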





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


ableegoldman commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648404327


   One unrelated failure: `PlaintextProducerSendTest.testNonBlockingProducer`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9846) Race condition can lead to severe lag underestimate for active tasks

2020-06-23 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143275#comment-17143275
 ] 

Vinoth Chandar commented on KAFKA-9846:
---

Sounds good. Close this as "Won't Fix" then?

> Race condition can lead to severe lag underestimate for active tasks
> 
>
> Key: KAFKA-9846
> URL: https://issues.apache.org/jira/browse/KAFKA-9846
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Assignee: Vinoth Chandar
>Priority: Critical
> Fix For: 2.6.0
>
>
> In KIP-535 we added the ability to query still-restoring and standby tasks. 
> To give users control over how out of date the data they fetch can be, we 
> added an API to KafkaStreams that fetches the end offsets for all changelog 
> partitions and computes the lag for each local state store.
> During this lag computation, we check whether an active task is in RESTORING 
> and calculate the actual lag if so. If not, we assume it's in RUNNING and 
> return a lag of zero. However, tasks may be in other states besides running 
> and restoring; notably they first pass through the CREATED state before 
> getting to RESTORING. A CREATED task may happen to be caught-up to the end 
> offset, but in many cases it is likely to be lagging or even completely 
> uninitialized.
> This introduces a race condition where users may be led to believe that a 
> task has zero lag and is "safe" to query even with the strictest correctness 
> guarantees, while the task is actually lagging by some unknown amount.  
> During transfer of ownership of the task between different threads on the 
> same machine, tasks can actually spend a while in CREATED while the new owner 
> waits to acquire the task directory lock. So, this race condition may not be 
> particularly rare in multi-threaded Streams applications



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143238#comment-17143238
 ] 

Sophie Blee-Goldman commented on KAFKA-10179:
-

[~desai.p.rohan] I'm not sure I understand why it's a problem for the 
deserializer to modify the value slightly, by dropping fields to take your 
example. We would end up restoring the full bytes into the store, sure, but the 
plain bytes are never actually used, right? We would always go through the 
deserializer when reading the value from the store and using it in an 
operation. So the "extra" fields would still get dropped.

Maybe if your values are bloated with a lot of useful information that you 
didn't want to store, this could blow up the disk usage. But I think there's a 
difference between a simple operation on data to extract only the relevant bits 
– eg dropping a field you don't care about – and fundamentally transforming the 
data to get it into a different form. The former seems reasonable to do during 
a deserialization, but the latter should be its own operation in the topology.

Of course, this just applies to modifying the values. If your deserializer 
modifies the key in any way, this would be a problem since lookups by key would 
fail after a restoration copies over the plain bytes. But I would argue that 
it's illegal to modify the key during de/serialization at all, not because of 
the restoration issue but because it can cause incorrect partitioning.

Anyways, I'm probably overlooking something obvious, but I'm struggling to see 
exactly where and how this breaks. That said I do agree we should clarify that 
`serialize(deserialize())` must be the identity for keys
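
To spell out that invariant as a quick check (serde, topic, and key are 
placeholders):

```java
Serde<String> keySerde = Serdes.String();
byte[] keyBytes = keySerde.serializer().serialize("some-topic", "user-42");
String key = keySerde.deserializer().deserialize("some-topic", keyBytes);
byte[] roundTrip = keySerde.serializer().serialize("some-topic", key);
// must hold, or records restored from the changelog can no longer be found by key:
assert java.util.Arrays.equals(keyBytes, roundTrip);
```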

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9846) Race condition can lead to severe lag underestimate for active tasks

2020-06-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143228#comment-17143228
 ] 

Sophie Blee-Goldman commented on KAFKA-9846:


Ok no worries. I don't think it's critical, just wanted to bring it up in case 
you wanted to get it fixed. Since 2.6 is almost out, we can always point people 
who want to use this feature to use the latest version

> Race condition can lead to severe lag underestimate for active tasks
> 
>
> Key: KAFKA-9846
> URL: https://issues.apache.org/jira/browse/KAFKA-9846
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Assignee: Vinoth Chandar
>Priority: Critical
> Fix For: 2.6.0
>
>
> In KIP-535 we added the ability to query still-restoring and standby tasks. 
> To give users control over how out of date the data they fetch can be, we 
> added an API to KafkaStreams that fetches the end offsets for all changelog 
> partitions and computes the lag for each local state store.
> During this lag computation, we check whether an active task is in RESTORING 
> and calculate the actual lag if so. If not, we assume it's in RUNNING and 
> return a lag of zero. However, tasks may be in other states besides running 
> and restoring; notably they first pass through the CREATED state before 
> getting to RESTORING. A CREATED task may happen to be caught-up to the end 
> offset, but in many cases it is likely to be lagging or even completely 
> uninitialized.
> This introduces a race condition where users may be led to believe that a 
> task has zero lag and is "safe" to query even with the strictest correctness 
> guarantees, while the task is actually lagging by some unknown amount.  
> During transfer of ownership of the task between different threads on the 
> same machine, tasks can actually spend a while in CREATED while the new owner 
> waits to acquire the task directory lock. So, this race condition may not be 
> particularly rare in multi-threaded Streams applications



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cyrusv opened a new pull request #8918: Use debug level logging for noisy log messages in Connect

2020-06-23 Thread GitBox


cyrusv opened a new pull request #8918:
URL: https://github.com/apache/kafka/pull/8918


   This simply reduces the log level of these very frequently logged, very 
infrequently useful messages to debug.
   My experience operating these connectors is that these messages make 
reading info-level logs quite cluttered. The MDC context certainly helps with 
filtering, but I think debug is more appropriate here -- if the log level were 
debug and someone wanted to promote these messages to info, that certainly 
would be an odd desire!
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving

2020-06-23 Thread Gokul Ramanan Subramanian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143168#comment-17143168
 ] 

Gokul Ramanan Subramanian commented on KAFKA-10105:
---

[~ableegoldman], we are unable to reproduce the bug systematically. It shows up 
once in a while. We had a few streams applications using the official Java 
2.4.1 client on a 2.4.1 cluster. Each streams application uses a few threads. 
All the threads across all these applications try to form a single consumer 
group. Not sure what exactly causes the issue, but more than a day after 
shutting down the applications, the group coordinator still has some zombie members in 
its metadata. It takes a restart to fix this. If the delayed heartbeat 
operation were functioning as expected, I simply cannot understand how these 
zombie members are still stuck in the coordinator. Wouldn't the coordinator 
remove them from the group? Further, even when starting up the streams 
applications again (after the 1 day hiatus), the zombie members continue to be 
in the group metadata. The group continues to be in PendingRebalance state. 
Wouldn't the GroupCoordinator.onJoinComplete (which should eventually be 
triggered after the DelayedJoin expiration) ensure that zombie members which 
probably did not send any JoinGroup requests in the meantime are kicked out of 
the member list?

Theoretically speaking, what would it take for Kafka to reach this state, 
which is effectively a consumer livelock?

> Regression in group coordinator dealing with flaky clients joining while 
> leaving
> 
>
> Key: KAFKA-10105
> URL: https://issues.apache.org/jira/browse/KAFKA-10105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: Kafka 2.4.1 on jre 11 on debian 9 in docker
>Reporter: William Reynolds
>Priority: Major
>
> Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer deals 
> correctly with a consumer sending a join after a leave.
> What happens now is that if a consumer sends a leave and then follows up by 
> trying to send a join again as it is shutting down, the group coordinator 
> adds the leaving member to the group but never seems to heartbeat that 
> member.
> Since the consumer is then gone when it joins again after starting, it is 
> added as a new member, but the zombie member is still there and is included 
> in the partition assignment, which means that those partitions never get 
> consumed from. What can also happen is that one of the zombies becomes group 
> leader, so the rebalance gets stuck forever and the group is entirely 
> blocked.
> I have not been able to track down where this got introduced between 1.1.0 
> and 2.4.1 but I will look further into this. Unfortunately the logs are 
> essentially silent about the zombie members and I only had INFO level logging 
> on during the issue, and by stopping all the consumers in the group and 
> restarting the broker coordinating that group we could get back to a working 
> state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9846) Race condition can lead to severe lag underestimate for active tasks

2020-06-23 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143158#comment-17143158
 ] 

Vinoth Chandar commented on KAFKA-9846:
---

Probably don't have time this week.. But if y'all can take a quick pass at the 
patch above, I can work on this next week.. does that work

> Race condition can lead to severe lag underestimate for active tasks
> 
>
> Key: KAFKA-9846
> URL: https://issues.apache.org/jira/browse/KAFKA-9846
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Assignee: Vinoth Chandar
>Priority: Critical
> Fix For: 2.6.0
>
>
> In KIP-535 we added the ability to query still-restoring and standby tasks. 
> To give users control over how out of date the data they fetch can be, we 
> added an API to KafkaStreams that fetches the end offsets for all changelog 
> partitions and computes the lag for each local state store.
> During this lag computation, we check whether an active task is in RESTORING 
> and calculate the actual lag if so. If not, we assume it's in RUNNING and 
> return a lag of zero. However, tasks may be in other states besides running 
> and restoring; notably they first pass through the CREATED state before 
> getting to RESTORING. A CREATED task may happen to be caught-up to the end 
> offset, but in many cases it is likely to be lagging or even completely 
> uninitialized.
> This introduces a race condition where users may be led to believe that a 
> task has zero lag and is "safe" to query even with the strictest correctness 
> guarantees, while the task is actually lagging by some unknown amount.  
> During transfer of ownership of the task between different threads on the 
> same machine, tasks can actually spend a while in CREATED while the new owner 
> waits to acquire the task directory lock. So, this race condition may not be 
> particularly rare in multi-threaded Streams applications



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-06-23 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143153#comment-17143153
 ] 

Ismael Juma commented on KAFKA-10140:
-

I understand now. Yeah, I ran into this issue before when working with Maps 
instead of Configs.

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop

2020-06-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143135#comment-17143135
 ] 

Chris Egerton commented on KAFKA-10192:
---

I think what might be happening is that the worker is taking too long to 
initialize and the first request [to create a 
connector|https://github.com/apache/kafka/blob/14137def712a43b5bbbf02b067f2b1d4a12926b6/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java#L140]
 is timing out. This isn't a problem in other integration tests because the 
REST request timeout for Connect is 90 seconds, but that timeout is 
[artificially 
reduced|https://github.com/apache/kafka/blob/14137def712a43b5bbbf02b067f2b1d4a12926b6/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java#L66-L67]
 for these tests since some requests are supposed to time out and 
minute-and-a-half-long integration tests would be painful to deal with.

This is impossible to verify without either a reproduction or complete logs 
from the test run, though, and I can't reproduce locally.

> Flaky test BlockingConnectorTest#testBlockInConnectorStop
> -
>
> Key: KAFKA-10192
> URL: https://issues.apache.org/jira/browse/KAFKA-10192
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Chris Egerton
>Priority: Major
>
> h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
> h3. Error Message
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"}
> h3. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#discussion_r444383211



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
##
@@ -130,6 +130,8 @@ public static void main(final String[] args) throws Exception {
 }
 });
 
+if (streamsProperties.containsKey("streams.cleanup")
+&& Boolean.parseBoolean(streamsProperties.getProperty("streams.cleanup"))) streams.cleanUp();

Review comment:
   One liners are the best  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-23 Thread GitBox


guozhangwang commented on pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#issuecomment-648298106


   test this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…

2020-06-23 Thread GitBox


chia7712 commented on a change in pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#discussion_r444374011



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
##
@@ -130,6 +130,8 @@ public static void main(final String[] args) throws Exception {
 }
 });
 
+if (streamsProperties.containsKey("streams.cleanup")
+&& Boolean.parseBoolean(streamsProperties.getProperty("streams.cleanup"))) streams.cleanUp();

Review comment:
   Copy that. One-line change for this PR :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…

2020-06-23 Thread GitBox


chia7712 commented on a change in pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#discussion_r444373556



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -303,7 +303,8 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndOffsetSums)
 public long lagFor(final TaskId task) {
 final Long totalLag = taskLagTotals.get(task);
 if (totalLag == null) {
-throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
+throw new IllegalStateException("Tried to lookup lag for unknown task: " + task
++ " (This exception may be caused by that you don't call KafkaStreams#cleanUp when topology optimization is enabled)");

Review comment:
   you are right. I will revert it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-06-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143112#comment-17143112
 ] 

Chia-Ping Tsai commented on KAFKA-10140:


alterConfigs does not support APPEND and SUBTRACT so it does not need to check 
the "LIST" type.

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nizhikov opened a new pull request #8917: KAFKA-10180: Fix security_config caching.

2020-06-23 Thread GitBox


nizhikov opened a new pull request #8917:
URL: https://github.com/apache/kafka/pull/8917


   After 8b22b8159673bfe22d8ac5dcd4e4312d4f2c863c, `security_config` properties 
are modified during `setup_node`: the tls_version is patched according to the 
node's JDK version. But Python's `@property` decorator doesn't store property 
values, so a fresh instance of `SecurityConfig` is created on each 
`kafka.security_config` call.
   
   This patch caches the `SecurityConfig` instance so the correct tls_version 
will be used during the test.
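   
   For illustration only, a hedged Java sketch of the broken and fixed patterns 
(the actual fix is in the Python ducktape framework; `SecurityConfig` below is a 
stand-in class, not the real one):
   
```java
public class SecurityConfigCachingSketch {
    static class SecurityConfig {
        String tlsVersion = "TLSv1.2"; // stand-in field, not the real config
    }

    private SecurityConfig cached;

    // Broken pattern (like an uncached Python @property): a fresh instance is
    // built on every call, so a tls_version patched during setup_node is
    // invisible to later callers.
    SecurityConfig securityConfigUncached() {
        return new SecurityConfig();
    }

    // Fixed pattern: build once, then return the same instance afterwards.
    SecurityConfig securityConfig() {
        if (cached == null) {
            cached = new SecurityConfig();
        }
        return cached;
    }
}
```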
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…

2020-06-23 Thread GitBox


ableegoldman commented on a change in pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#discussion_r444368127



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -303,7 +303,8 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndOffsetSums)
 public long lagFor(final TaskId task) {
 final Long totalLag = taskLagTotals.get(task);
 if (totalLag == null) {
-throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
+throw new IllegalStateException("Tried to lookup lag for unknown task: " + task
++ " (This exception may be caused by that you don't call KafkaStreams#cleanUp when topology optimization is enabled)");

Review comment:
   Can we drop this comment? While it's definitely true, it's possible this 
might have other causes, and I wouldn't want someone to jump the gun 
and reset their application unnecessarily.
   If someone hits this, they'll probably file an issue or send a question to the 
mailing list, and then we can follow up with them to see if this might be the 
case.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
##
@@ -130,6 +130,8 @@ public static void main(final String[] args) throws Exception {
 }
 });
 
+if (streamsProperties.containsKey("streams.cleanup")
+&& Boolean.parseBoolean(streamsProperties.getProperty("streams.cleanup"))) streams.cleanUp();

Review comment:
   I think we can just call `cleanUp` here without passing in a flag. We 
only _need_ to do it the second time around, but it doesn't hurt to just call 
it every time and it keeps things simple (the system tests are already 
complicated enough )
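   
   For illustration, a minimal sketch of the suggested simplification: always 
call `cleanUp()` before `start()`, with no flag. The topology and configs below 
are placeholders, not the actual StreamsOptimizedTest code:
   
```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class CleanUpSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-optimized-test"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Always wipe the local state directory before starting: a no-op cost on
        // the first run, and it avoids stale task state on later runs.
        streams.cleanUp();
        streams.start();
    }
}
```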





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-23 Thread GitBox


d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-648288498


   @dajac @rajinisivaram Thanks for the second round of review. I've addressed 
your comments and adopted your suggestions. Please let me know if you have more 
thoughts on this PR. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-06-23 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143108#comment-17143108
 ] 

Ismael Juma commented on KAFKA-10140:
-

We should check why alterConfigs handles this correctly and follow a similar 
approach if it makes sense.

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting the application up the second time

2020-06-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143105#comment-17143105
 ] 

Sophie Blee-Goldman commented on KAFKA-10191:
-

Cool. Since this is failing pretty regularly, I agree that we should get the 
bare minimum fix in quickly and it's fine to follow up with the full "correct" 
fix next

> fix flaky StreamsOptimizedTest - call KafkaStreams#cleanUp before starting 
> the application up the second time
> -
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {quote}Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 
> 2_0Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0 at 
> org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
>  at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) 
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) 
> at java.util.TreeMap.put(TreeMap.java:552) at 
> java.util.TreeSet.add(TreeSet.java:255) at 
> java.util.AbstractCollection.addAll(AbstractCollection.java:344) at 
> java.util.TreeSet.addAll(TreeSet.java:312) at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
>  at 
> 

[jira] [Comment Edited] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-06-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143094#comment-17143094
 ] 

Chia-Ping Tsai edited comment on KAFKA-10140 at 6/23/20, 4:49 PM:
--

The plugin components, like JmxReporter, do not offer a ConfigKey.

In order to get complete checks when altering configs, the plugin components 
should offer a ConfigKey as well. However, that brings a lot of interface 
changes. As a workaround, if a component can't offer a ConfigKey we can cast 
the value to String and split it into a list, and the exception from the cast 
can be wrapped in an InvalidRequestException.


was (Author: chia7712):
The plugin components, like JmxReporter, do not offer a ConfigKey.

In order to get complete checks when altering configs, the plugin components 
should offer a ConfigKey as well. However, that brings a lot of interface 
changes. As a workaround, if a component can't offer a ConfigKey we can cast 
the value to a List<String>, and the exception from the cast can be wrapped in 
an InvalidRequestException.

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10194) run the reset tool between stopping StreamsOptimizedTest and starting the new one

2020-06-23 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10194:

Labels: newbie  (was: )

> run the reset tool between stopping StreamsOptimizedTest and starting the new 
> one
> -
>
> Key: KAFKA-10194
> URL: https://issues.apache.org/jira/browse/KAFKA-10194
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: newbie
>
> inspired by [~ableegoldman]
> {quote}
> So, that's exactly what we should do in this test. We need to
> 1. call KafkaStreams#cleanup before starting the application up the second 
> time (ie when we turn optimizations on)
> 2. run the reset tool between stopping the original application and starting 
> the new one
> I think that 1. alone would technically be enough to stop this test from 
> failing, but we should really be doing both. 
> {quote}
> Comment 1 is addressed by KAFKA-10191; this issue aims to address the other 
> one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10194) run the reset tool between stopping StreamsOptimizedTest and starting the new one

2020-06-23 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10194:

Component/s: system tests
 streams

> run the reset tool between stopping StreamsOptimizedTest and starting the new 
> one
> -
>
> Key: KAFKA-10194
> URL: https://issues.apache.org/jira/browse/KAFKA-10194
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>
> inspired by [~ableegoldman]
> {quote}
> So, that's exactly what we should do in this test. We need to
> 1. call KafkaStreams#cleanup before starting the application up the second 
> time (ie when we turn optimizations on)
> 2. run the reset tool between stopping the original application and starting 
> the new one
> I think that 1. alone would technically be enough to stop this test from 
> failing, but we should really be doing both. 
> {quote}
> Comment 1 is addressed by KAFKA-10191; this issue aims to address the other 
> one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] skaundinya15 commented on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

2020-06-23 Thread GitBox


skaundinya15 commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-648280522


   @showuon no more comments 
   
   > Hi @ryannedolan @skaundinya15, do you have any other comments? I think we 
should merge this PR soon since the tests failed again today. Thanks.
   
   @showuon No more comments from my side, looks good.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 edited a comment on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

2020-06-23 Thread GitBox


skaundinya15 edited a comment on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-648280522


   > Hi @ryannedolan @skaundinya15, do you have any other comments? I think we 
should merge this PR soon since the tests failed again today. Thanks.
   
   @showuon No more comments from my side, looks good.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes

2020-06-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143094#comment-17143094
 ] 

Chia-Ping Tsai commented on KAFKA-10140:


The plugin components, like JmxReporter, do not offer a ConfigKey.

In order to get complete checks when altering configs, the plugin components 
should offer a ConfigKey as well. However, that brings a lot of interface 
changes. As a workaround, if a component can't offer a ConfigKey we can cast 
the value to a List<String>, and the exception from the cast can be wrapped in 
an InvalidRequestException.
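
For illustration only, a Java sketch of that workaround (the real code path is 
the Scala AdminManager; the names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PluginConfigListSketch {
    // Treat an untyped plugin config value (no ConfigKey, so no declared LIST
    // type) as a comma-separated String when a list value is needed.
    static List<String> toList(final Object value) {
        if (value == null) {
            return new ArrayList<>();
        }
        try {
            final String s = (String) value;
            return s.isEmpty()
                ? new ArrayList<>()
                : new ArrayList<>(Arrays.asList(s.split(",")));
        } catch (final ClassCastException e) {
            // In the broker this would be wrapped in an InvalidRequestException.
            throw new IllegalArgumentException(
                "Config value is not a comma-separated String", e);
        }
    }
}
```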

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
> Fix For: 2.6.0
>
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> ```
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
>   at scala.collection.MapLike.default(MapLike.scala:235)
>   at scala.collection.MapLike.default$(MapLike.scala:234)
>   at scala.collection.AbstractMap.default(Map.scala:65)
>   at scala.collection.MapLike.apply(MapLike.scala:144)
>   at scala.collection.MapLike.apply$(MapLike.scala:143)
>   at scala.collection.AbstractMap.apply(Map.scala:65)
>   at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
>   at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
>   at 
> kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
>   at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
>   at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> ```
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`. 
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop

2020-06-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143092#comment-17143092
 ] 

Chris Egerton commented on KAFKA-10192:
---

[~bchen225242] I have a hypothesis for why these tests are failing, but I cannot 
reproduce locally and the Jenkins logs I've been able to find are incomplete 
(based on lines like {{...[truncated 2495514 chars]...}}). Is there a way to 
get complete logs for this test from Jenkins?

> Flaky test BlockingConnectorTest#testBlockInConnectorStop
> -
>
> Key: KAFKA-10192
> URL: https://issues.apache.org/jira/browse/KAFKA-10192
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
> h3. Error Message
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"}
> h3. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> 

[jira] [Assigned] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop

2020-06-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-10192:
-

Assignee: Chris Egerton

> Flaky test BlockingConnectorTest#testBlockInConnectorStop
> -
>
> Key: KAFKA-10192
> URL: https://issues.apache.org/jira/browse/KAFKA-10192
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Chris Egerton
>Priority: Major
>
> h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
> h3. Error Message
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"}
> h3. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> execute PUT request. Error response: \{"error_code":500,"message":"Request 
> timed out"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
>  at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
>  at 

[GitHub] [kafka] cadonna commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


cadonna commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-648271480


   @ableegoldman could you also have a look at this PR since I think I changed 
some code you are quite familiar with.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade

2020-06-23 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143075#comment-17143075
 ] 

John Roesler commented on KAFKA-10173:
--

Hi [~karsten.schnitter] ,

Thanks for your efforts. I'm also adding some additional system tests 
specifically for suppression in combination with an upgrade from 2.3.1 to 2.5.0.

Thanks for the confirmation about the context. I think logging both the 
serialization and deserialization path should provide a lot of clarity for now. 
The stacktrace above implies the record was written by the 2.5.0 application, 
so it will be interesting to see if it really prints out your serialization log 
messages.

I'll also add some "trace" level log messages to suppression in AK to help in 
future debugging efforts.

If you're able, I guess this should be sufficient for now (sketched below):
 * Log the whole record during restore
 * Log the data in InMemoryTimeOrderedKeyValueBuffer#logValue, both the Key and 
BufferValue before serialization and the byte array that we return from 
BufferValue#serialize
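
For concreteness, a hedged sketch of what such logging could look like (purely 
illustrative; these helpers and names are not the actual 
InMemoryTimeOrderedKeyValueBuffer code):

```java
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferDebugLoggingSketch {
    private static final Logger log =
        LoggerFactory.getLogger(BufferDebugLoggingSketch.class);

    // Called just before a serialized buffer value is written to the changelog,
    // so the exact bytes produced on the write path are visible.
    static void logSerialized(final byte[] key, final byte[] serializedValue) {
        log.trace("writing buffer record: key={} value={}",
                  Arrays.toString(key), Arrays.toString(serializedValue));
    }

    // Called for each changelog record replayed during restore, so a mismatch
    // with what was serialized becomes visible in the logs.
    static void logRestored(final byte[] key, final byte[] serializedValue) {
        log.trace("restoring buffer record: key={} value={}",
                  Arrays.toString(key), Arrays.toString(serializedValue));
    }
}
```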

In response to your last question, certainly! You'd know best what data this 
application is producing. If the value for two different records could be 
exactly the same, then the identical priorValue may be expected.

What doesn't seem expected to me is that there would be a priorValue at all for 
the record that was at offset zero of the input. It makes me wonder if the 
application state is corrupted somehow, but I can't wrap my head around _how_.

I'll let you know how my testing efforts progress today.

-John

> BufferUnderflowException during Kafka Streams Upgrade
> -
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Karsten Schnitter
>Assignee: John Roesler
>Priority: Major
>  Labels: suppress
> Fix For: 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{migrate.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during 
> processing:
> java.nio.BufferUnderflowException: null
>   at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>   at java.base/java.nio.ByteBuffer.get(Unknown Source)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out, that this problem only occurs for stores, where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem will not occur. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-23 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-648259565


   Thanks. Triggering another round of system tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade

2020-06-23 Thread Karsten Schnitter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17143066#comment-17143066
 ] 

Karsten Schnitter commented on KAFKA-10173:
---

Hi [~vvcephei],

the original message is lost due to retention. I will implement the additional 
logging and start from a clean slate once again. Today, the system was 
unavailable to me, but I hope to get to more testing tomorrow or the day after. 
If you have any further suggestions for where I can add logging statements to 
extract information, please let me know. So far I can verify that the context 
looks legitimate. There are actually 60 partitions, so 57 is fine. Can the 
equal priors be explained by very similar data in both partitions? It is not 
unlikely that both partitions may have been aggregated to very similar data.

Best Regards,
Karsten

> BufferUnderflowException during Kafka Streams Upgrade
> -
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Karsten Schnitter
>Assignee: John Roesler
>Priority: Major
>  Labels: suppress
> Fix For: 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{migrate.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during 
> processing:
> java.nio.BufferUnderflowException: null
>   at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>   at java.base/java.nio.ByteBuffer.get(Unknown Source)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out, that this problem only occurs for stores, where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem will not occur. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444334835



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import 
org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({InternalTopologyBuilder.class})
+public class TableSourceNodeTest {
+
+@Test
+public void 
shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {

Review comment:
   Ack





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-23 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-648254987


   > Could you rebase your PR?
   
   done



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-23 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-648254407


   @chia7712 : I think the client compatibility test failures are probably 
because you haven't rebased the PR. #8841 was fixed 6 days ago. Could you 
rebase your PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-23 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-648249808


   There were still lots of client compatibility related failures 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-22--001.1592885123--chia7712--fix_8334_avoid_deadlock--142a6c4c0/report.html
 . Not sure why since those tests were passing in 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-16--001.1592335112--mumrah--KAFKA-10123--63c1b14a4/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444325329



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -120,7 +118,7 @@ public void logChange(final String storeName,
 throwUnsupportedOperationExceptionIfStandby("logChange");
 // Sending null headers to changelog topics (KIP-244)
 collector.send(
-storeToChangelogTopic.get(storeName),
+stateManager.changelogFor(storeName),

Review comment:
   I added an exception to make sure we do not write changelog records for 
unregistered state stores.
   I also used the registered metadata for the stores in the `send()` call. I 
felt that it would be cleaner.
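   
   Roughly, the guard could look like this (a hypothetical sketch, not the 
actual ProcessorContextImpl change):
   
```java
public class LogChangeGuardSketch {
    // Fail fast instead of sending a changelog record to a null topic when the
    // store was never registered with the state manager.
    static String changelogTopicFor(final String storeName,
                                    final String registeredTopic) {
        if (registeredTopic == null) {
            throw new IllegalStateException(
                "Changelog topic not registered for state store " + storeName);
        }
        return registeredTopic;
    }
}
```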





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444321667



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -103,7 +102,6 @@ public ProcessorStateManager stateManager() {
 @Override
 public void register(final StateStore store,
  final StateRestoreCallback stateRestoreCallback) {
-storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name()));
 super.register(store, stateRestoreCallback);

Review comment:
   Ack





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444321079



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+        final String storeName = name();
+        final String changelogTopic = internalProcessorContext.changelogFor(storeName);
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            changelogTopic != null
+                ? changelogTopic : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   Ack





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-23 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444319623



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+        final String storeName = name();
+        final String changelogTopic = internalProcessorContext.changelogFor(storeName);
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            changelogTopic != null

Review comment:
   It would be `null` when logging is disabled. In that case we still need the store serdes to read records from the store, so we just pass in the default changelog name; that way one can still use a changelog name in the serde, e.g., as the subject name for Confluent's Schema Registry.
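   A standalone sketch of that fallback (hypothetical helper, not Kafka's actual code); the default follows the `<applicationId>-<storeName>-changelog` naming convention:
   ```java
   // Sketch: resolve the topic string handed to the store serdes. If a
   // changelog topic was registered, use it; otherwise fall back to the
   // default "<applicationId>-<storeName>-changelog" naming so serdes that
   // key off the topic (e.g. as a schema-registry subject) still work when
   // logging is disabled.
   final class ChangelogTopicResolver {
       static String resolve(final String registeredTopic,
                             final String applicationId,
                             final String storeName) {
           return registeredTopic != null
               ? registeredTopic
               : applicationId + "-" + storeName + "-changelog";
       }
   }
   ```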





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10192) Flaky test BlockingConnectorTest#testBlockInConnectorStop

2020-06-23 Thread Boyang Chen (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143052#comment-17143052 ]

Boyang Chen commented on KAFKA-10192:
-

Failed again: 
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1225/testReport/junit/org.apache.kafka.connect.integration/BlockingConnectorTest/testBlockInConnectorStop/]

> Flaky test BlockingConnectorTest#testBlockInConnectorStop
> -
>
> Key: KAFKA-10192
> URL: https://issues.apache.org/jira/browse/KAFKA-10192
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> h3. [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1218/]
> h3. Error Message
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not execute PUT request. Error response: {"error_code":500,"message":"Request timed out"}
> h3. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not execute PUT request. Error response: {"error_code":500,"message":"Request timed out"}
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.putConnectorConfig(EmbeddedConnectCluster.java:346)
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.configureConnector(EmbeddedConnectCluster.java:300)
> at org.apache.kafka.connect.integration.BlockingConnectorTest.createConnectorWithBlock(BlockingConnectorTest.java:185)
> at org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop(BlockingConnectorTest.java:140)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
> at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
> 

[GitHub] [kafka] abbccdda commented on pull request #8845: KAFKA-10126:Add a warning message for ConsumerPerformance

2020-06-23 Thread GitBox


abbccdda commented on pull request #8845:
URL: https://github.com/apache/kafka/pull/8845#issuecomment-648242282


   Also failed: 
   ```
   kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs
   
   org.scalatest.exceptions.TestFailedException: only 0 messages are produced within timeout after replica movement. Producer future Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 1 ms.))
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
	at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
	at org.scalatest.Assertions.fail(Assertions.scala:1091)
	at org.scalatest.Assertions.fail$(Assertions.scala:1087)
	at org.scalatest.Assertions$.fail(Assertions.scala:1389)
	at kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs(PlaintextAdminIntegrationTest.scala:289)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:830)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



