[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140208#comment-17140208
 ] 

Matthias J. Sax commented on KAFKA-10179:
-

What you say is fair, I guess. Given the current code, if you want to do any of 
those, you need to disable the optimization.

However, for the actual bug this ticket is about, the problem seems to be that, 
if the optimization is turned on, at some point in the code we pass the 
changelog topic name into the serde instead of the source topic name. Thus the 
schema cannot be found and the serde crashes. This ticket should focus on that 
bug.

Not sure if KAFKA-8037 covers all cases you describe. Maybe you want to follow 
up on that ticket (so we can extend its scope) or create a new ticket that 
describes the shortcomings of the current implementation.
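
To see why the wrong topic name crashes the serde, consider a minimal sketch of 
a topic-dependent deserializer in the style of the Schema Registry serdes (the 
class and its map-backed "registry" are made up for illustration; only the 
{{Deserializer}} interface is real):
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical example: resolves a schema per "<topic>-value" subject,
// the way Schema Registry serdes do.
public class TopicAwareDeserializer implements Deserializer<String> {
    private final Map<String, String> subjectToSchema;

    public TopicAwareDeserializer(final Map<String, String> subjectToSchema) {
        this.subjectToSchema = subjectToSchema;
    }

    @Override
    public String deserialize(final String topic, final byte[] data) {
        final String subject = topic + "-value";
        if (!subjectToSchema.containsKey(subject)) {
            // With the bug, `topic` is "<application ID>-<store name>-changelog"
            // instead of the source topic, so no schema is found and we fail here.
            throw new SerializationException("No schema registered for subject " + subject);
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
{code}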

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<state store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140203#comment-17140203
 ] 

Matthias J. Sax commented on KAFKA-9509:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7061/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]
{quote}java.lang.AssertionError: Connector MirrorCheckpointConnector tasks did 
not start in time on cluster: 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster@5376246b at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:120)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationTest.java:191)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.setup(MirrorConnectorsIntegrationTest.java:184){quote}

> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sanjana Kaundinya
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix is to wait, after the 
> connectors are started, until the REST endpoint returns a positive number of 
> tasks, so that we can be confident it is safe to start testing.
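
A minimal sketch of that wait, using the assertion helper named in the stack 
traces elsewhere in this thread; the exact signature is an assumption, not 
verified against the test code:
{code:java}
// Sketch only: EmbeddedConnectCluster#assertions() and the assertion below
// exist in the Connect test utils, but the parameter order is assumed.
private void waitUntilMirrorMakerIsRunning(final EmbeddedConnectCluster connectCluster,
                                           final String connectorName,
                                           final int minTasks) throws InterruptedException {
    // Polls the Connect REST endpoint until the connector reports at least
    // minTasks running tasks, failing with the given message on timeout.
    connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
        connectorName, minTasks,
        "Connector " + connectorName + " tasks did not start in time on cluster: " + connectCluster);
}
{code}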



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state

2020-06-18 Thread GitBox


mjsax commented on pull request #8886:
URL: https://github.com/apache/kafka/pull/8886#issuecomment-646427756


   Java 8 passed.
   Java 11:
   ```
   
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9846) Race condition can lead to severe lag underestimate for active tasks

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140193#comment-17140193
 ] 

Sophie Blee-Goldman commented on KAFKA-9846:


Just realized I never replied [~vinoth]. To answer your question about how to 
get a task stuck in CREATED during a test, one way would be to start up an 
instance with two threads and have one of them hang indefinitely. It will drop 
out of the group and its task will be assigned to the other thread, but since 
the first thread hasn't released the task state directory lock, this task will 
be stuck in CREATED.

Anyways, just bringing this up since [~vvcephei] is setting up the 2.5.1 
release. It seems like we understand the problem and the fix is quite 
straightforward; can we get this patched for 2.5.1?

> Race condition can lead to severe lag underestimate for active tasks
> 
>
> Key: KAFKA-9846
> URL: https://issues.apache.org/jira/browse/KAFKA-9846
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Assignee: Vinoth Chandar
>Priority: Critical
> Fix For: 2.6.0
>
>
> In KIP-535 we added the ability to query still-restoring and standby tasks. 
> To give users control over how out of date the data they fetch can be, we 
> added an API to KafkaStreams that fetches the end offsets for all changelog 
> partitions and computes the lag for each local state store.
> During this lag computation, we check whether an active task is in RESTORING 
> and calculate the actual lag if so. If not, we assume it's in RUNNING and 
> return a lag of zero. However, tasks may be in other states besides running 
> and restoring; notably they first pass through the CREATED state before 
> getting to RESTORING. A CREATED task may happen to be caught-up to the end 
> offset, but in many cases it is likely to be lagging or even completely 
> uninitialized.
> This introduces a race condition where users may be led to believe that a 
> task has zero lag and is "safe" to query even with the strictest correctness 
> guarantees, while the task is actually lagging by some unknown amount.  
> During transfer of ownership of the task between different threads on the 
> same machine, tasks can actually spend a while in CREATED while the new owner 
> waits to acquire the task directory lock. So, this race condition may not be 
> particularly rare in multi-threaded Streams applications.
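
The flawed branch can be pictured with a self-contained sketch (illustrative 
names, not the actual Streams internals):
{code:java}
enum TaskState { CREATED, RESTORING, RUNNING }

final class LagSketch {
    // Returns the store lag reported for a task, mirroring the flawed logic.
    static long lagFor(final TaskState state, final long endOffset, final long position) {
        if (state == TaskState.RESTORING) {
            return endOffset - position; // actual lag while restoring
        }
        // BUG: a CREATED task falls through here and reports zero lag,
        // even though it may be uninitialized or arbitrarily far behind.
        return 0L;
    }
}
{code}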



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2020-06-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-7271:
---
Fix Version/s: (was: 2.6.0)
   2.7.0

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-18 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442619557



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -69,7 +68,6 @@
 private final ChangelogReader changelogReader;
 private final UUID processId;
 private final String logPrefix;
-private final StreamsMetricsImpl streamsMetrics;

Review comment:
   My IDEA pointed out that this was never used





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-18 Thread GitBox


ableegoldman commented on a change in pull request #8900:
URL: https://github.com/apache/kafka/pull/8900#discussion_r442619459



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -522,7 +522,7 @@ private void close(final boolean clean) {
 if (clean && commitNeeded) {
 log.debug("Tried to close clean but there was pending uncommitted 
data, this means we failed to"
   + " commit and should close as dirty instead");
-throw new StreamsException("Tried to close dirty task as clean");

Review comment:
   This was a sort-of bug: because we don't close things during 
`handleRevocation`, we want to make sure the TM will close this as dirty during 
`handleAssignment`. So we throw this just to force it to call `closeDirty` -- 
but it wasn't necessarily a fatal exception that caused the commit to fail, so 
we should just throw `TaskMigrated` here.
   That said, it doesn't really matter, since the ConsumerCoordinator will save 
and rethrow only the first exception, which is the `handleRevocation` 
exception. Anything we throw in `handleAssignment` is "lost" -- but we should 
do the right thing anyway.
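
   Roughly what that change would look like (a sketch of the hunk above, not 
the exact PR code; `TaskMigratedException` is the existing Streams exception 
type):
   ```java
   if (clean && commitNeeded) {
       log.debug("Tried to close clean but there was pending uncommitted data, "
           + "this means we failed to commit and should close as dirty instead");
       // recoverable: forces TaskManager#handleAssignment to call closeDirty
       throw new TaskMigratedException("Tried to close dirty task as clean",
           new RuntimeException("pending uncommitted data during clean close"));
   }
   ```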





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji merged pull request #8850:
URL: https://github.com/apache/kafka/pull/8850


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140166#comment-17140166
 ] 

Rohan Desai commented on KAFKA-10179:
-

Also, it's not really clear from the documentation that 
`serialize(deserialize())` is assumed to be the identity function for 
`ktable(..)`.
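
For reference, that assumed identity property can be written down as a small 
check ({{Serde}} and {{Serdes}} are the real client classes; the rest is 
illustrative):
{code:java}
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public final class RoundTripCheck {
    // True if serialize(deserialize(bytes)) reproduces the original bytes.
    static <T> boolean serializeDeserializeIsIdentity(final Serde<T> serde,
                                                      final String topic,
                                                      final byte[] original) {
        final T value = serde.deserializer().deserialize(topic, original);
        final byte[] roundTripped = serde.serializer().serialize(topic, value);
        return Arrays.equals(original, roundTripped);
    }

    public static void main(final String[] args) {
        System.out.println(serializeDeserializeIsIdentity(
            Serdes.String(), "source-topic", "hello".getBytes()));
    }
}
{code}
Simple serdes like {{Serdes.String()}} pass this check; a deserializer that 
projects away fields would not, which is exactly the conflict described 
elsewhere in this thread.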

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<state store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9974) Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore

2020-06-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140162#comment-17140162
 ] 

Luke Chen commented on KAFKA-9974:
--

I don't have any ideas on how to fix the errors. Leaving it open in case anyone 
wants to investigate it. Thanks.

> Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore
> --
>
> Key: KAFKA-9974
> URL: https://issues.apache.org/jira/browse/KAFKA-9974
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/321/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/]
> {quote}java.lang.AssertionError: Expected: is <0L> but: was <43L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore(OptimizedKTableIntegrationTest.java:138){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9974) Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore

2020-06-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-9974:


Assignee: (was: Luke Chen)

> Flaky Test OptimizedKTableIntegrationTest#shouldApplyUpdatesToStandbyStore
> --
>
> Key: KAFKA-9974
> URL: https://issues.apache.org/jira/browse/KAFKA-9974
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/321/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/]
> {quote}java.lang.AssertionError: Expected: is <0L> but: was <43L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore(OptimizedKTableIntegrationTest.java:138){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140158#comment-17140158
 ] 

Rohan Desai edited comment on KAFKA-10179 at 6/19/20, 3:19 AM:
---

Deserialization may itself be a transformation. For example, suppose I have 
source data with 10 fields, but only care about 3 of them for my stream 
processing app. It seems that it would be reasonable to provide a deserializer 
that just extracts those 3 fields. I suppose you could express this as a 
projection after creating the table, but that does preclude optimizations that 
use selective deserialization. And it may be much more expensive to do the 
materialization (since you're potentially materializing lots of data 
unnecessarily). I think there should be some way to achieve each of the 
following: 

 
 * optimized and the data in the store is exactly the same as the topic data. 
In this case (what's implemented today) the data can be restored by writing the 
source records into the store
 * optimized and the deserializer transforms the data somehow. In this case the 
data can be restored by deserializing/serializing each row from the source 
topic before writing it into the store. I don't think this is possible today.
 * not optimized (w/ which you could have a transforming deserializer and 
faster recovery, at the cost of extra data in kafka). I don't think this is 
possible today without turning all optimizations off.

 

> This is a known issue and tracked via: 
>https://issues.apache.org/jira/browse/KAFKA-8037
  ack - thanks!


was (Author: desai.p.rohan):
Deserialization may itself be a transformation. For example, suppose I have 
source data with 10 fields, but only care about 3 of them for my stream 
processing app. It seems that it would be reasonable to provide a deserializer 
that just extracts those 3 fields. I suppose you could express this as a 
projection after creating the table, but that does preclude optimizations that 
use selective deserialization. And it may be much more expensive to do the 
materialization (since you're potentially materializing lots of data 
unnecessarily). I think there should be some way to achieve each of the 
following: 

 
 * optimized and the data in the store is exactly the same as the topic data . 
In this case (what's implemented today) the data can be restored by writing the 
source records into the store
 * optimized and the deserializer transforms the data somehow. In this case the 
data can be restored by deserializing/serializing each row from the source 
topic before writing it into the store. I don't think this is possible today.
 * not optimized (which would you have a transforming deserializer and faster 
recovery, at the cost of extra data in kafka). I don't think this is possible 
today without turning all optimizations off.

 

> This is a known issue and tracked via: 
>https://issues.apache.org/jira/browse/KAFKA-8037
 ack - thanks!

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<state store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140158#comment-17140158
 ] 

Rohan Desai commented on KAFKA-10179:
-

Deserialization may itself be a transformation. For example, suppose I have 
source data with 10 fields, but only care about 3 of them for my stream 
processing app. It seems that it would be reasonable to provide a deserializer 
that just extracts those 3 fields. I suppose you could express this as a 
projection after creating the table, but that does preclude optimizations that 
use selective deserialization. And it may be much more expensive to do the 
materialization (since you're potentially materializing lots of data 
unnecessarily). I think there should be some way to achieve each of the 
following: 

 
 * optimized and the data in the store is exactly the same as the topic data. 
In this case (what's implemented today) the data can be restored by writing the 
source records into the store
 * optimized and the deserializer transforms the data somehow. In this case the 
data can be restored by deserializing/serializing each row from the source 
topic before writing it into the store. I don't think this is possible today.
 * not optimized (with which you could have a transforming deserializer and 
faster recovery, at the cost of extra data in kafka). I don't think this is 
possible today without turning all optimizations off.

 

> This is a known issue and tracked via: 
>https://issues.apache.org/jira/browse/KAFKA-8037
 ack - thanks!

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<state store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon edited a comment on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


showuon edited a comment on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-646406717


   @ryannedolan @skaundinya15 , thanks for your good suggestion. Yes, what I 
did to add retries is the same as increasing the timeout value. 
   
   And I found that `waitForCondition` is not a good fit in this case, because 
the `consume` method in `EmbeddedKafkaCluster.java` will not return the partial 
consumeRecords for us to accumulate if it can't consume the expected number of 
records in time. Instead, it'll throw an exception directly. In other words, 
the `consume` method is already doing a `waitForCondition` job. 
   
   So, what we need to do is just to **increase the timeout value** to give the 
MM2 replication more time. It should make this test more stable. Thank you.
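
   For illustration, the change amounts to something like this (a sketch; 
`consume` is the `EmbeddedKafkaCluster` method discussed above, while the 
constant names and topic are made up for the example):
   ```java
   // before: consume(..., RECORD_TRANSFER_DURATION_MS, ...) times out
   ConsumerRecords<byte[], byte[]> records = backup.kafka().consume(
       NUM_RECORDS_PRODUCED,             // expected number of records
       2 * RECORD_TRANSFER_DURATION_MS,  // larger timeout instead of retries
       "primary.test-topic-1");
   ```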



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-646406717


   @ryannedolan @skaundinya15 , thanks for your good suggestion. Yes, what I 
did to add retries is the same as increasing the timeout value. And after some 
investigation, I found that `waitForCondition` is not a good fit in this case 
because the `consume` method in `EmbeddedKafkaCluster.java` will not return the 
consumeRecords if fewer than the expected number of records were consumed; 
it'll throw an exception directly. In other words, the `consume` method is 
already doing a `waitForCondition` job. 
   
   So, what we need to do is just to **increase the timeout value** to give the 
MM2 replication more time. It should make this test more stable. Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-06-18 Thread harsha (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140138#comment-17140138
 ] 

harsha commented on KAFKA-7500:
---

Hi,

Does MirrorMaker 2 support Kafka version 0.10.0.0?

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140089#comment-17140089
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

This is all pretty pathetic lol. Good ol' Jenkins.

Decreasing the message size might help, though. I don't have any real evidence 
to back up the claim that startup might be occupying a large portion of this 
time; it just seems like good practice. Who knows what Jenkins is doing – maybe 
it's taking so long just to create topics?

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> 

[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140088#comment-17140088
 ] 

John Roesler commented on KAFKA-10184:
--

What in the world... How can we not have processed even 500 records in two 
minutes?

I agree waiting for startup first would probably help. Do we have any logs 
that could confirm the hypothesis that the startup phase is eating up a bunch 
of our timeout?

We should probably decrease the size of the records down from a whopping 1kB. 
My intent was to bridge the integration and system test worlds by creating 
“realistic” data here, but maybe that was expecting too much of the CI 
infrastructure.
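
As an illustration of the sizing change (a sketch; the 100-byte value, topic 
name, and record count are made up, not taken from the test):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestDataProducer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 100-byte values instead of 1 kB payloads.
        final String value = new String(new char[100]).replace('\0', 'x');
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 500; i++) {
                producer.send(new ProducerRecord<>("input-topic", Integer.toString(i), value));
            }
        }
    }
}
{code}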

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

[jira] [Resolved] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly

2020-06-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10113.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> LogTruncationException sets fetch offsets incorrectly
> -
>
> Key: KAFKA-10113
> URL: https://issues.apache.org/jira/browse/KAFKA-10113
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.6.0
>
>
> LogTruncationException, which extends OffsetOutOfRangeException, takes the 
> divergent offsets in the constructor. These are the first offsets known to 
> diverge from what the consumer read. These are then passed to the 
> OffsetOutOfRangeException incorrectly as the out of range fetch offsets. 
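
For illustration, the distinction matters to consumer code along these lines 
(a sketch; the accessor names are assumptions from memory of the client API):
{code:java}
try {
    consumer.poll(Duration.ofSeconds(1));
} catch (final LogTruncationException e) {
    // the first offsets known to diverge from what the consumer read
    System.out.println("divergent offsets: " + e.divergentOffsets());
    // the parent OffsetOutOfRangeException's view: the fetch offsets that
    // were out of range -- these must not be conflated with the above
    System.out.println("out-of-range offsets: " + e.offsetOutOfRangePartitions());
}
{code}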



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly

2020-06-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10113:

Affects Version/s: 2.5.0
   2.4.1

> LogTruncationException sets fetch offsets incorrectly
> -
>
> Key: KAFKA-10113
> URL: https://issues.apache.org/jira/browse/KAFKA-10113
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.6.0
>
>
> LogTruncationException, which extends OffsetOutOfRangeException, takes the 
> divergent offsets in the constructor. These are the first offsets known to 
> diverge from what the consumer read. These are then passed to the 
> OffsetOutOfRangeException incorrectly as the out of range fetch offsets. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


hachikuji merged pull request #8822:
URL: https://github.com/apache/kafka/pull/8822


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140082#comment-17140082
 ] 

Matthias J. Sax commented on KAFKA-8266:


The other ticket is resolved, but I just saw another failure for this test: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7034/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/]
{quote}org.scalatest.exceptions.TestFailedException: The remaining consumers in 
the group could not fetch the expected records at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at 
org.scalatest.Assertions.fail(Assertions.scala:1091) at 
org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:329){quote}
\cc [~dajac]

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Arun R (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arun R updated KAFKA-10186:
---
Comment: was deleted

(was: I would love to take a look if no one else is looking at it.)

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.
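
A minimal sketch of what such a type could look like (hypothetical class name; 
as the labels note, an actual change needs a KIP):
{code:java}
import org.apache.kafka.common.KafkaException;

// Hypothetical: a distinct, recoverable exception type so callers can tell
// "records dropped because the transaction was aborted" apart from fatal
// KafkaExceptions.
public class TransactionAbortedException extends KafkaException {
    public TransactionAbortedException(final String message) {
        super(message);
    }
}
{code}
Callers could then catch this type and treat it as recoverable, while other 
{{KafkaException}}s remain fatal.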



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state

2020-06-18 Thread GitBox


mjsax commented on pull request #8886:
URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376402


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state

2020-06-18 Thread GitBox


mjsax commented on pull request #8886:
URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376357


   Java 8 passed.
   Java 11:
   ```
   
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

2020-06-18 Thread GitBox


mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r442574574



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
 Set<TaskId> lockedTaskDirectories() {
 return Collections.unmodifiableSet(lockedTaskDirectories);
 }
+
+public static void executeAndMaybeSwallow(final boolean clean,
+  final Runnable runnable,
+  final java.util.function.Consumer<RuntimeException> actionIfClean,
+  final java.util.function.Consumer<RuntimeException> actionIfNotClean) {

Review comment:
   I am wondering if adding this method to `TaskManager` is the best choice 
(cf https://issues.apache.org/jira/browse/KAFKA-10055). Thoughts?
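
   For context, a plausible body for the helper in the hunk above (a sketch; 
the PR's actual implementation may differ):
   ```java
   public static void executeAndMaybeSwallow(final boolean clean,
                                             final Runnable runnable,
                                             final java.util.function.Consumer<RuntimeException> actionIfClean,
                                             final java.util.function.Consumer<RuntimeException> actionIfNotClean) {
       try {
           runnable.run();
       } catch (final RuntimeException e) {
           if (clean) {
               actionIfClean.accept(e);    // e.g. record as the first exception
           } else {
               actionIfNotClean.accept(e); // e.g. log and swallow
           }
       }
   }
   ```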





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

2020-06-18 Thread GitBox


mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r442574113



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
 
 for (final Task task : tasks.values()) {
 if (task.isActive()) {
-try {
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-} catch (final RuntimeException e) {
-if (clean) {
-firstException.compareAndSet(null, e);
-} else {
-log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-}
-}
+executeAndMaybeSwallow(
+clean,
+() -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+e -> firstException.compareAndSet(null, e),
+e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+);
 }
 }
 
 tasks.clear();
 
-try {
-activeTaskCreator.closeThreadProducerIfNeeded();
-} catch (final RuntimeException e) {
-if (clean) {
-firstException.compareAndSet(null, e);
-} else {
-log.warn("Ignoring an exception while closing thread producer.", e);
-}
-}
+executeAndMaybeSwallow(
+clean,
+() -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment:
   This is a lambda. Do you mean method reference? 
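
   The two forms being contrasted (other arguments elided):
   ```java
   () -> activeTaskCreator.closeThreadProducerIfNeeded()  // lambda
   activeTaskCreator::closeThreadProducerIfNeeded         // equivalent method reference
   ```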





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140075#comment-17140075
 ] 

Matthias J. Sax commented on KAFKA-10179:
-

{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.
{quote}
For this case, the source-topic-changelog optimization does not apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new schema using the changelog topic name. 
Thus, no naming issue in SR should happen.

The source-topic-changelog optimization really only applies if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is an inefficiency we pay, as we also need to send the 
deserialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a known issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140075#comment-17140075
 ] 

Matthias J. Sax edited comment on KAFKA-10179 at 6/19/20, 12:42 AM:


{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.
{quote}
For this case, the source-topic-changelog optimization does not apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new schema using the changelog topic name. 
Thus, no naming issue in SR should happen.

The source-topic-changelog optimization really only applies if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is an inefficiency we pay, as we also need to send the 
deserialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a known issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037


was (Author: mjsax):
{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

For this case, the soure-topic-changelog optimization does no apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new scheme using the changelog topic name. 
Thus, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is inefficiency we pay, as we also need to send the 
de-serialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a know issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<state store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use 

[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash

2020-06-18 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140073#comment-17140073
 ] 

Jason Gustafson commented on KAFKA-10114:
-

[~ibinyami] Hmm, this is still not very clear to me.

> I think records were not timed out because the sequence that times them out 
> happens after you have a producer id. Network thread is stuck on 
> maybeWaitForProducerId(called from Sender.java:306) while the relevant 
> failBatch invocation is only called from sendProducerData() which is executed 
> after maybeWaitForProducerId (called from Sender.java:334)

The logic in `awaitNodeReady` is not blocked on batch completion, but on request 
completion. We do not need `failBatch` in order to complete a pending request 
and free up room for additional requests. If you can reproduce this, it would 
be very helpful to see TRACE level logging from the producer.

> Kafka producer stuck after broker crash
> ---
>
> Key: KAFKA-10114
> URL: https://issues.apache.org/jira/browse/KAFKA-10114
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Itamar Benjamin
>Priority: Critical
>
> Today two of our kafka brokers crashed (cluster of 3 brokers), and producers 
> were not able to send new messages. After brokers started again all producers 
> resumed sending data except for a single one.
> At the beginning, the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation
> {code}
>  
> then after sometime exception changed to
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 6 ms.
> {code}
>  
>  
> jstack shows kafka-producer-network-thread is waiting to get producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 
> cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 
> sleeping [0x7ff55c177000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
> at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
> at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
> at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
> at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)   Locked 
> ownable synchronizers:
> - None
> {code}
>  
> Digging into maybeWaitForProducerId(), it waits until some broker is ready 
> (the awaitNodeReady function), which in turn calls leastLoadedNode() on 
> NetworkClient. This one iterates over all brokers and checks if a request can 
> be sent to it using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) 
> && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools, I saw this expression always evaluates to false 
> since the last part (canSendMore) is false. 
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>         (queue.peekFirst().send.completed() &&
>          queue.size() < this.maxInFlightRequestsPerConnection);
> }
>  
>  
> I verified that
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the live lock: since the request queues are full 
> for all nodes, a new request to check broker availability and reconnect to a 
> broker cannot be submitted.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646369171







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646368944


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Arun R (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140069#comment-17140069
 ] 

Arun R commented on KAFKA-10186:


I would love to take a look if no one else is looking at it.

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-06-18 Thread GitBox


mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-646367241


   @sneakyburro Any update on this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8871: MINOR: code cleanup for inconsistent naming

2020-06-18 Thread GitBox


mjsax commented on pull request #8871:
URL: https://github.com/apache/kafka/pull/8871#issuecomment-646365697


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


mumrah commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646361035


   Sounds good to me



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar commented on pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests

2020-06-18 Thread GitBox


vinothchandar commented on pull request #8898:
URL: https://github.com/apache/kafka/pull/8898#issuecomment-646360446


   @cmccabe Please take a pass when you can.. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-18 Thread GitBox


ableegoldman opened a new pull request #8900:
URL: https://github.com/apache/kafka/pull/8900


   If there's any pending data and we haven't flushed the producer when we 
abort a transaction, a KafkaException is returned for the previous `send`. This 
is a bit misleading, since the situation is not an unrecoverable error and so 
the KafkaException is really non-fatal. For now, we should just catch and 
swallow this in the RecordCollector (see also: 
[KAFKA-10186](https://issues.apache.org/jira/browse/KAFKA-10186))
   
   The reason we ended up aborting an un-flushed transaction was due to the 
combination of
   a. always aborting the ongoing transaction when any task is closed/revoked
   b. only committing (and flushing) if at least one of the revoked tasks needs 
to be committed
   
   Given the above, we can end up with an ongoing transaction that isn't 
committed since none of the revoked tasks have any data in the transaction. We 
then abort the transaction anyway, when those tasks are closed. So in addition 
to the above (swallowing this exception), we should avoid unnecessarily 
aborting data for tasks that haven't been revoked.
   
   We can handle this by splitting the RecordCollector's `close` into a dirty 
and clean flavor: if dirty, we need to abort the transaction since it may be 
dirty due to the commit attempt failing. But if clean, we can skip aborting the 
transaction since we know that either we just committed and thus there is no 
ongoing transaction to abort, or else the transaction in flight contains no 
data from the tasks being closed
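
   A minimal sketch of the split (the method names here are illustrative, not 
the final API):
   
   ```java
   // Illustrative sketch only: the method names are hypothetical.
   public interface RecordCollector {
   
       // Clean close: we either just committed, or the in-flight transaction
       // contains no data from the closing tasks, so nothing must be aborted.
       void closeClean();
   
       // Dirty close: a commit attempt may have failed mid-transaction, so the
       // ongoing transaction has to be aborted before the producer is closed.
       void closeDirty();
   }
   ```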



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10167.
---
Resolution: Fixed

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This is a bug discovered with the new EOS protocol (KIP-447), here's the 
> context:
> In Streams when we are assigned with the new active tasks, we would first try 
> to restore the state from the changelog topic all the way to the log end 
> offset, and then we can transit from the `restoring` to the `running` state 
> to start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator which would guarantee that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker 
> is received, it also means that the marker has been fully replicated, which 
> in turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset with `read-committed` 
> flag we are guaranteed that the returned offset == LSO == high-watermark.
> After KIP-447 however, we do not fence on the txn-coordinator but on the 
> group-coordinator upon offset-fetch, and the group-coordinator would return 
> the fetched offset right after it has received and replicated the txn-marker 
> sent to it. However, since the txn-markers are sent to different brokers in 
> parallel, and even within the same broker markers of different partitions are 
> appended / replicated independently as well, when the fetch-offset request 
> returns it is NOT guaranteed that the LSO on other data partitions has 
> advanced as well. And hence in that case the `endOffset` call may 
> return a smaller offset, causing data loss.
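
A sketch of the safer alternative (reading end offsets via the admin client, 
which reports the latest offset based on the high watermark rather than the 
LSO; the admin config and changelog partition below are illustrative):

{code:java}
// Sketch: fetch the changelog end offset with Admin#listOffsets instead of a
// read_committed consumer, so the result is not capped at a lagging LSO.
Admin admin = Admin.create(adminConfig);
ListOffsetsResult result = admin.listOffsets(
    Collections.singletonMap(changelogPartition, OffsetSpec.latest()));
long endOffset = result.partitionResult(changelogPartition).get().offset();
{code}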



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646348733


   @mumrah Hmm, I think I like the current approach of discarding the response 
if we're no longer in the same state in which the fetch was sent. Mainly 
because it's simple. Arguably we could do something more refined. For example, 
a topic authorization error is still going to be relevant even if the partition 
is being reset. However, since we're talking about rare cases, it doesn't seem 
too worthwhile to try and optimize; worst case, we'll send the request again 
and get the same error.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #8899: MINOR: Gate test coverage plugin behind Gradle property

2020-06-18 Thread GitBox


ijuma opened a new pull request #8899:
URL: https://github.com/apache/kafka/pull/8899


   Most builds don't require test coverage output, so it's wasteful
   to spend cycles tracking coverage information for each method
   invoked.
   
   I ran a quick test on a fast desktop machine; the absolute
   difference will be larger on a slower machine. The tests were
   executed after `./gradlew clean` and with a gradlew daemon
   that was started just before the test (and mildly warmed up
   with `./gradlew clean` again).
   
   `./gradlew unitTest --continue --profile`:
   * With coverage enabled: 6m32s
   * With coverage disabled: 5m47s
   
   I ran the same test twice and the results were within 2s of
   each other, so reasonably consistent.
   
   An ~11% reduction in the time taken to run the unit tests
   (6m32s down to 5m47s) is a reasonable gain with little downside,
   so I think this is a good change.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


mumrah commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646338829


   @hachikuji yea that's the check I was referring to (where we disregard the 
fetch response, errors included). Do you think any of the errors we handle 
besides OOOR (offset out of range) are worth handling in the case that we're 
no longer in the FETCHING state? Like maybe one of the errors that triggers a 
metadata update?
   
   However, that might be adding complexity for little gain. I'm fine with it 
either way.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10186:

Labels: needs-kip newbie newbie++  (was: needs-kip)

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442535630



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -647,8 +647,14 @@ class KafkaController(val config: KafkaConfig,
     info(s"Skipping reassignment of $tp since the topic is currently being deleted")
     new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
   } else {
-    val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
-    if (assignedReplicas.nonEmpty) {
+    val assignment = controllerContext.partitionFullReplicaAssignment(tp)
+    if (assignment == ReplicaAssignment.empty) {
+      new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")
+    } else if (assignment == reassignment) {

Review comment:
   It seems like the thing to agree on is the final state, right?  This 
comparison is taking into account the current replica set which may change over 
the course of the reassignment...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted

2020-06-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10169:
---

Assignee: Sophie Blee-Goldman

> KafkaException: Failing batch since transaction was aborted
> ---
>
> Key: KAFKA-10169
> URL: https://issues.apache.org/jira/browse/KAFKA-10169
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> We've seen the following exception in our eos-beta test application recently:
> {code:java}
> [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task 
> 1_2 due to: [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> Exception handler choose to FAIL the processing, no more records would be 
> sent. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> Caused by: org.apache.kafka.common.KafkaException: Failing batch since 
> transaction was aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  ... 3 more
> {code}
> Somewhat unclear if this is an issue with eos-beta specifically, or just eos 
> in general. But several threads have died over the course of a few days in 
> the eos-beta application, while none so far have died on the eos-alpha 
> application.
> It's also unclear (at least to me) whether this is definitely an issue in 
> Streams or possibly a bug in the producer (or even the broker, although that 
> seems unlikely)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10186:
---

 Summary: Aborting transaction with pending data should throw 
non-fatal exception
 Key: KAFKA-10186
 URL: https://issues.apache.org/jira/browse/KAFKA-10186
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Sophie Blee-Goldman


Currently if you try to abort a transaction with any pending (non-flushed) 
data, the send exception is set to
{code:java}
 KafkaException("Failing batch since transaction was aborted"){code}
This exception type is generally considered fatal, but this is a valid state to 
be in -- the point of throwing the exception is to alert that the records will 
not be sent, not that you are in an unrecoverable error state.

We should throw a different (possibly new) type of exception here to 
distinguish from fatal and recoverable errors.
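
One possible shape for this, sketched below (the exception name is 
hypothetical):

{code:java}
// Hypothetical sketch: a dedicated subtype lets callers distinguish "records
// dropped because the transaction was deliberately aborted" from fatal errors.
public class TransactionAbortedException extends KafkaException {

    public TransactionAbortedException(final String message) {
        super(message);
    }
}
{code}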



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442531300



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
##
@@ -223,6 +227,7 @@ public void shouldInitializeChangelogAndCheckForCompletion() {
     @Test
     public void shouldPollWithRightTimeout() {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));

Review comment:
   This is moderately obnoxious... The addition of logging these values 
means that these tests will get a NullPointerException unless we mock this 
call, but the mock is irrelevant to the test outcome.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442533488



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -1685,10 +1702,9 @@ object ReassignPartitionsCommand extends Logging {
   opts.cancelOpt -> collection.immutable.Seq(
 opts.reassignmentJsonFileOpt
   ),
-  opts.listOpt -> collection.immutable.Seq(
-  )
+  opts.listOpt -> collection.immutable.Seq.empty

Review comment:
   seems to break the symmetry a bit, doesn't it?  Although, I don't feel 
strongly about this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#issuecomment-646334265


   The refactors to the tool generally look good.  I thought we always allowed 
throttles to be set to 0 for some reason, though?  Looks like this change 
removes that ability which we probably don't want... although I doubt many 
people are using it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442529975



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -496,8 +539,9 @@ private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, f
         } else {
             changelogMetadata.bufferedRecords.add(record);
             final long offset = record.offset();
-            if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset)
+            if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) {
                 changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size();
+            }

Review comment:
   I've rolled back a bunch of accidental formatting changes, but left the 
ones that are actually code style compliance issues (like using brackets around 
conditional bodies).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar opened a new pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests

2020-06-18 Thread GitBox


vinothchandar opened a new pull request #8898:
URL: https://github.com/apache/kafka/pull/8898


   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-17--001.1592453352--vinothchandar--KC342-ducktape--e64cc463b/report.html
   
   Both ThrottlingTest and ReassignPartitionsTest, which invoke these methods, 
pass locally and twice in CI. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-06-18 Thread GitBox


dajac opened a new pull request #8897:
URL: https://github.com/apache/kafka/pull/8897


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8881: KIP-557: Add emit on change support to Kafka Streams

2020-06-18 Thread GitBox


ConcurrencyPractitioner commented on pull request #8881:
URL: https://github.com/apache/kafka/pull/8881#issuecomment-646304787


   @vvcephei Think you have time to look at this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r442497609



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -148,27 +150,35 @@
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
     private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
-    private final List<PartitionInfo> infos = asList(
-        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS);
+    private final List<PartitionInfo> partitionInfos = getPartitionInfos(3, 3);
+    {
+        partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
+    }
 
     private final Cluster metadata = new Cluster(
         "cluster",
         Collections.singletonList(Node.noNode()),
-        infos,
+        partitionInfos,
         emptySet(),
-        emptySet());
+        emptySet()
+    );
+
+    /* Used by the scale test for large apps/clusters */
+    private static final int NUM_TOPICS_XL = 10;
+    private static final int NUM_PARTITIONS_PER_TOPIC_XL = 1_000;
+    private static final int NUM_CONSUMERS_XL = 100;
+    private static final List<String> TOPICS_LIST_XL = new ArrayList<>();
+    private static final Map<TopicPartition, Long> CHANGELOG_END_OFFSETS_XL = new HashMap<>();
+    private static final List<PartitionInfo> PARTITION_INFOS_XL = getPartitionInfos(NUM_TOPICS_XL, NUM_PARTITIONS_PER_TOPIC_XL);
+    private static final Cluster CLUSTER_METADATA_XL = new Cluster(
+        "cluster",
+        Collections.singletonList(Node.noNode()),
+        PARTITION_INFOS_XL,
+        emptySet(),
+        emptySet()
+    );

Review comment:
   If there's a whole set of constants only used by one test, one might 
wonder whether that test shouldn't just be in its own class...

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -148,27 +150,35 @@
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
     private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
-    private final List<PartitionInfo> infos = asList(
-        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS);
+    private final List<PartitionInfo> partitionInfos = getPartitionInfos(3, 3);
+    {
+        partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
+    }

Review comment:
   Can you just pass this as an argument to `getPartitionInfos` so that we 
can do all the initialization in the assignment instead of needing an 
initialization block? The fact that this field is used in another field 
initialization statement makes the initialization block kind of questionable, 
since you have to read the JVM spec to know if this block executes before or 
after the usage.
   
   Alternatively, maybe the prior code was actually better, because you can see 
exactly what data you're testing with, instead of having to go read another 
method to understand what `getPartitionInfos(3, 3)` might mean.
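
   A toy example of the initialization-order point (hypothetical code, just to 
illustrate why the block is hard to read):
   
   ```java
   // Instance field initializers and initializer blocks run in textual order,
   // so `size` sees the element added by the block only because the block is
   // declared first. Reordering the declarations silently changes the result.
   class Example {
       private final List<String> items = new ArrayList<>(List.of("a", "b"));
       {
           items.add("c");
       }
       private final int size = items.size(); // 3, because the block ran first
   }
   ```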





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-06-18 Thread GitBox


vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-646303241


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10160) Kafka MM2 consumer configuration

2020-06-18 Thread sats (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sats reassigned KAFKA-10160:


Assignee: sats

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,]
>  according to this, producer/consumer-level properties should be configured as 
> e.g. somesource->sometarget.consumer.client.id. I try to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch earliest. Not sure if this is a 
> bug or my misconfiguration, but at least some update to the docs would be useful.
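
For reference, a sketch of how such an override is laid out when the config is 
built programmatically (cluster aliases are the examples from above; this 
mirrors an mm2.properties file):

{code:java}
// Sketch: the per-flow consumer override the reporter is trying to apply.
Map<String, String> props = new HashMap<>();
props.put("clusters", "somesource, sometarget");
props.put("somesource->sometarget.enabled", "true");
props.put("somesource->sometarget.consumer.auto.offset.reset", "latest");
MirrorMakerConfig config = new MirrorMakerConfig(props);
{code}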



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-06-18 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139963#comment-17139963
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


LGTM, thanks for looking into it and making sure that 2.5.1/2.6 should be safe 
to use.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.5.0, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp

[jira] [Commented] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-18 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139955#comment-17139955
 ] 

Boyang Chen commented on KAFKA-10185:
-

Could we add the context to this ticket?

> Streams should log summarized restoration information at info level
> ---
>
> Key: KAFKA-10185
> URL: https://issues.apache.org/jira/browse/KAFKA-10185
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-18 Thread GitBox


dajac commented on pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#issuecomment-646271944


   @hachikuji I just rebased and fixed the build issue. Could you re-trigger 
jenkins please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


skaundinya15 commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442445728



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,
+                                                final int numRecordsProduces,
+                                                final int timeout,
+                                                final ClusterType clusterType,
+                                                final String... topics) throws InterruptedException {
+        int retries = 3;
+        while (retries-- > 0) {
+            try {
+                int actualNum = clusterType == ClusterType.PRIMARY ?
+                    primary.kafka().consume(numRecordsProduces, timeout, topics).count() :
+                    backup.kafka().consume(numRecordsProduces, timeout, topics).count();
+                if (numRecordsProduces == actualNum)
+                    return;
+            } catch (Throwable e) {
+                log.error("Could not find enough records with {} retries left", retries, e);
+            }
+        }
+        throw new InterruptedException(errorMsg);
+    }
+
     @Test
     public void testReplication() throws InterruptedException {
         MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
         MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
 
-        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,

Review comment:
   I'd agree with @ryannedolan here. We could instead use the `waitForCondition` 
in `TestUtils.java` to wait for the necessary condition. More 
details on that are here: 
https://github.com/apache/kafka/blob/d8cc6fe8e36329c647736773d9d66de89c447409/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L370-L371
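   
   For example, something along these lines (a sketch; the constants and topic 
name are borrowed from the test and may need adjusting):
   
   ```java
   // Poll until the expected record count arrives instead of retrying a
   // fixed number of times and throwing InterruptedException ourselves.
   TestUtils.waitForCondition(
       () -> primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1")
               .count() == NUM_RECORDS_PRODUCED,
       RECORD_TRANSFER_DURATION_MS,
       "Records were not produced to primary cluster in time.");
   ```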





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139926#comment-17139926
 ] 

Rohan Desai commented on KAFKA-10179:
-

I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms, like projecting just the 
fields of interest), then serializing it and writing it into the store. So 
the "topic" we pass to the serializer should be different, since it represents 
different data from the source topic.

This has consequences in practice when used with a schema registry and the 
Confluent serializers. If we use the same topic, `serialize` might register a 
different schema with the source subject, which we probably don't want.

I think the technically correct thing to do (though this is of course more 
expensive) would be (when the source table is optimized) to deserialize and 
serialize each record when restoring.

Another issue that I think exists (I need to try to reproduce it), and that 
deserializing/serializing would solve, is skipped validation. The source-topic 
deserializer functions as a sort of validator for records from the source 
topic. When the Streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However, if we restore by writing those 
records to the state store verbatim, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
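
To make the schema-registry consequence concrete, a sketch (assuming the 
default TopicNameStrategy; the topic and store names are illustrative):

{code:java}
// With Confluent's default TopicNameStrategy, the serializer registers the
// schema under "<topic>-value". Passing the source topic name during
// materialization would register the store's (logically different) schema
// under the source topic's subject.
byte[] sourceSubject = serde.serializer().serialize("source-topic", storeValue);
// vs. registering under the store's own changelog subject:
byte[] storeSubject = serde.serializer().serialize("app-id-my-store-changelog", storeValue);
{code}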

 

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139925#comment-17139925
 ] 

Sophie Blee-Goldman commented on KAFKA-10005:
-

Yeah, I didn't mean to imply we'd be synchronization-free with only standbys in 
a separate thread. But at least we'd only need to sync at rebalances, whereas 
with restoration in a separate thread we'll need to continually check for 
newly-restored tasks that should be taken over by the main thread. Anyways I 
agree, a POC and maybe some benchmarking should have the last word

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139533#comment-17139533
 ] 

Guozhang Wang edited comment on KAFKA-10005 at 6/18/20, 6:58 PM:
-

So just to have a quick summary, my proposal primarily has three parts:

1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to 
add batch of records as SST files directly, this is to replace the impact of 
bulk loading.
2) move the restoration off the stream thread to a different thread (pool), for 
both restoring active tasks as well as updating standby tasks.
3) if needed, we also disable compaction during the restoration, and do a 
one-phase full compaction when we complete. I'm keeping it as "optional" for 
now since disabling compaction has both pros and cons, and if we have good 
performance from 1/2) alone then maybe we can afford to keep compaction enabled.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore 
extends, we can leverage that interface to "toggle" restoration mode for 1) and 
3) above.

cc [~cadonna]
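
A rough sketch of 1) using the RocksJava external-SST API (the exact 
integration point is TBD; `db` and `batch` are assumed to be the store's 
RocksDB handle and a sorted restore batch):

{code:java}
// Write the restored batch into an external SST file, then ingest it,
// bypassing the memtable / WAL write path that bulk loading tried to avoid.
try (final EnvOptions envOptions = new EnvOptions();
     final Options options = new Options();
     final SstFileWriter writer = new SstFileWriter(envOptions, options)) {
    writer.open("/tmp/restore-batch-000001.sst");
    for (final KeyValue<byte[], byte[]> record : batch) { // keys must be sorted
        writer.put(record.key, record.value);
    }
    writer.finish();
}
db.ingestExternalFile(
    Collections.singletonList("/tmp/restore-batch-000001.sst"),
    new IngestExternalFileOptions());
{code}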


was (Author: guozhang):
So just to have a quick summary, my proposal is primarily in three folds:

1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to 
add batch of records as SST files directly, this is to replace the impact of 
bulk loading.
2) move the restoration off the stream thread to a different thread (pool), for 
both restoring active tasks as well as updating standby tasks.
3) if needed, we also disable compaction during the restoration, and do a 
one-phase full compaction when we complete.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore 
extends, we can leverage that interface to "toggle" restoration mode for 1) and 
3) above.

cc [~cadonna]

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139919#comment-17139919
 ] 

Guozhang Wang commented on KAFKA-10005:
---

I think even if we only move standbys to separate threads, there are still 
synchronization required for IQ and rebalance needs, but I think we need to get 
to a POC to have a clear idea how much thread synchronization overhead would be 
incurred. We can discuss about this further if we see the synchronization 
overhead with just standby and standby + active is very different.

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139916#comment-17139916
 ] 

Sophie Blee-Goldman commented on KAFKA-10005:
-

I've been thinking about this lately and I'm not quite convinced we should move 
restoration to a separate thread as well (only standbys). With KIP-441, the 
majority of restoration will actually be done as a standby task. Only the last 
(hopefully-trivial) tail end of the changelog will be restored as an active 
task. Is that worth the overhead of thread synchronization to hand off tasks 
between the restore thread(s) and the main one? I'm not sure

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
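
For reference, the fully decoupled usage on the public API would look roughly like 
the sketch below: a per-instance listener registered via 
`KafkaStreams#setGlobalStateRestoreListener`, independent of any per-store 
RestoreCallback. The class name and logging bodies are illustrative assumptions, 
not code from this ticket.

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// A per-instance listener, fully decoupled from any per-store RestoreCallback.
public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        // per-store behavior can branch on storeName here; no mixed callback needed
        System.out.println("Restoring " + storeName + " [" + startingOffset + ", " + endingOffset + "]");
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // no-op
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                             final long totalRestored) {
        System.out.println("Restored " + storeName + ": " + totalRestored + " records");
    }
}

// registered once per instance:
// streams.setGlobalStateRestoreListener(new LoggingRestoreListener());
{code}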



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8876: KAFKA-10167: use the admin client to read end-offset

2020-06-18 Thread GitBox


guozhangwang commented on pull request #8876:
URL: https://github.com/apache/kafka/pull/8876#issuecomment-646234217


   Cherry-pick to 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8876: KAFKA-10167: use the admin client to read end-offset

2020-06-18 Thread GitBox


guozhangwang merged pull request #8876:
URL: https://github.com/apache/kafka/pull/8876


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-646217062


   Thanks, @dajac, for the comments. I've modified the PR per your suggestions.
   @rajinisivaram, do you think we can start testing?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442397523



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -415,19 +418,20 @@ public void restore() {
 // for restoring active and updating standby we may prefer different poll time
 // in order to make sure we call the main consumer#poll in time.
 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
   trivial cleanup
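
   For context on why `==` is the idiomatic comparison here: enum constants are 
singletons, so `==` is null-safe where `equals` would throw if the receiver were 
null. A minimal standalone sketch (the enum here mirrors the internal one for 
illustration):

{code}
enum ChangelogReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

public class EnumComparisonSketch {
    public static void main(final String[] args) {
        final ChangelogReaderState state = null;
        // `==` on enums compares identity of singleton constants and is null-safe
        System.out.println(state == ChangelogReaderState.STANDBY_UPDATING); // prints false
        // state.equals(ChangelogReaderState.STANDBY_UPDATING) would throw a NullPointerException
    }
}
{code}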

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -415,19 +418,20 @@ public void restore() {
 // for restoring active and updating standby we may prefer different poll time
 // in order to make sure we call the main consumer#poll in time.
 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
 } catch (final InvalidOffsetException e) {
-log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+log.warn("Encountered " + e.getClass().getName() + " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
 "the consumer's position has fallen out of the topic partition offset range because the topic was " +
 "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-" it later.", e.getClass().getName(), e.partitions());
+" it later.", e);

Review comment:
   Added the exception itself as the "cause" of the warning. The actual 
message of the IOE is actually pretty good at explaining the root cause.
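
   A note on the mechanics: SLF4J treats a Throwable passed as the last argument 
as the cause and prints its stack trace, even when the message is built by 
concatenation rather than `{}` placeholders. A minimal sketch, with a made-up 
exception for illustration:

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WarnWithCauseSketch {
    private static final Logger log = LoggerFactory.getLogger(WarnWithCauseSketch.class);

    public static void main(final String[] args) {
        final Exception e = new IllegalStateException("position out of range");
        // the trailing Throwable becomes the cause; its stack trace is printed
        log.warn("Encountered " + e.getClass().getName() + " fetching records, marking tasks as corrupted.", e);
    }
}
{code}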

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -446,6 +450,38 @@ public void restore() {
 }
 
 maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+maybeLogRestorationProgress();

Review comment:
   This is the main change. Once every ten seconds, we will log the 
progress for each active restoring changelog.
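
   A minimal sketch of the once-per-interval pattern, assuming a ten-second 
interval; the class and field names here are illustrative assumptions, not the 
PR's actual code:

{code}
public class RestorationProgressLoggerSketch {
    private static final long LOG_INTERVAL_MS = 10_000L;
    private long lastLogMs = 0L;

    public void maybeLogProgress(final long nowMs, final long remainingRecords) {
        if (nowMs - lastLogMs >= LOG_INTERVAL_MS) {
            lastLogMs = nowMs;
            // one INFO-level line per interval instead of one per poll
            System.out.println("Restoration in progress, " + remainingRecords + " records remaining");
        }
    }
}
{code}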

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -415,19 +418,20 @@ public void restore() {
 // for restoring active and updating standby we may prefer different poll time
 // in order to make sure we call the main consumer#poll in time.
 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
 } catch (final InvalidOffsetException e) {
-log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+log.warn("Encountered " + e.getClass().getName() + " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
 "the consumer's position has fallen out of the topic partition offset range because the topic was " +
 "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-" it later.", e.getClass().getName(), e.partitions());
+" it later.", e);
 
 final Map<TaskId, Set<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
 for (final TopicPartition partition : e.partitions()) {
 final TaskId taskId = changelogs.get(partition).stateManager.taskId();
 taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
 }
-throw new TaskCorruptedException(taskWithCorruptedChangelogs);
+throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);

Review comment:
  

[GitHub] [kafka] hachikuji commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


hachikuji commented on pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#issuecomment-646209534


   Had some discussion offline with @cmccabe . The intention in KIP-455 is to 
use --additional to resubmit the reassignment and change the quota. I found 
that this did not work as expected when I tried it, so let me try to modify 
this patch so that the new integration tests use this behavior. Possibly we 
don't need --alter-throttle, but the tests and improved documentation would 
still be useful.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442394058



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A util class for exponential backoff, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be provided.
+ * This class is thread-safe.
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+this.scaleFactor = scaleFactor;
+this.ratio = ratio;
+this.jitter = jitter;
+this.expMax = termMax > scaleFactor ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+}
+
+public long term(long n) {

Review comment:
   As we noticed in your earlier comments, the same value of `attempts` may 
correspond to different terms.
   connection_timeout = constant * 2 ^ (attempts)
   reconnect_backoff = constant * 2 ^ (attempts - 1)
   (in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1) 
   So I think using `retries` or `attempts` instead of `n` might also confuse 
people. Shall we think of another naming? 
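
   To make the formula concrete, a worked sketch with the values from the test; 
the cap-at-termMax behavior is inferred from `expMax` in the constructor and is 
an assumption here:

{code}
public class BackoffTermSketch {
    public static void main(final String[] args) {
        final long scaleFactor = 100;
        final int ratio = 2;
        final long termMax = 2000;
        final double jitter = 0.2;
        for (int n = 0; n <= 5; n++) {
            // term(n) = random(1 - jitter, 1 + jitter) * min(scaleFactor * ratio^n, termMax)
            final double base = Math.min(scaleFactor * Math.pow(ratio, n), termMax);
            // prints term(0) in [80, 120], term(1) in [160, 240], ..., term(5) in [1600, 2400]
            System.out.printf("term(%d) in [%.0f, %.0f]%n", n, base * (1 - jitter), base * (1 + jitter));
        }
    }
}
{code}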





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact 

[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442389711



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A util class for exponential backoff, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be provided.
+ * This class is thread-safe.
+ */
+public class GeometricProgression {

Review comment:
   Good idea. Will go for `ExponentialBackoff`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei opened a new pull request #8896:
URL: https://github.com/apache/kafka/pull/8896


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442387660



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+@Test
+public void testGeometricProgression() {
+long scaleFactor = 100;
+int ratio = 2;
+long termMax = 2000;
+double jitter = 0.2;
+GeometricProgression geometricProgression = new GeometricProgression(
+scaleFactor, ratio, termMax, jitter
+);
+
+for (int i = 0; i <= 100; i++) {
+for (int n = 0; n <= 4; n++) {
+assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n), scaleFactor * Math.pow(ratio, n) * jitter);
+}
+System.out.println(geometricProgression.term(5));

Review comment:
   Oh, right. I missed removing it.






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10185:


 Summary: Streams should log summarized restoration information at 
info level
 Key: KAFKA-10185
 URL: https://issues.apache.org/jira/browse/KAFKA-10185
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442382471



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -149,6 +155,16 @@
 atLeast(0),
 Importance.MEDIUM,
 REQUEST_TIMEOUT_MS_DOC)
+.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+Type.LONG,
+10 * 1000,
+Importance.MEDIUM,
+CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+Type.LONG,
+127 * 1000,

Review comment:
   Sounds good. Will refactor.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442378048



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -930,6 +935,38 @@ object ReassignPartitionsCommand extends Logging {
 (brokerListToReassign, topicsToReassign)
   }
 
+  /**
+   * The entry point for --alter-throttles. At least one throttle value must be provided.
+   *
+   * @param admin The Admin instance to use
+   * @param interBrokerThrottle The new inter-broker throttle or -1 to leave it unchanged
+   * @param logDirThrottle The new alter-log-dir throttle or -1 to leave it unchanged
+   */
+  def alterThrottles(admin: Admin,
+ interBrokerThrottle: Long,
+ logDirThrottle: Long): Unit = {
+if (interBrokerThrottle < 0 && logDirThrottle < 0) {
+  throw new TerseReassignmentFailureException("No valid throttle values provided to --alter-throttle")

Review comment:
   It would be good to include the flags needed to pass in a throttle in 
this message





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442375424



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -786,6 +808,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
 abortedSends.clear();
 }
 
+/**
+ * Handle socket channel connection timeout. The timeout will hit iff a connection
+ * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+ * as indicated by ClusterConnectionStates.NodeConnectionState.
+ *
+ * @param responses The list of responses to update
+ * @param now The current time
+ */
+private void handleTimeoutConnections(List<ClientResponse> responses, long now) {
+Set<String> connectingNodes = connectionStates.connectingNodes();
+for (String nodeId: connectingNodes) {

Review comment:
   Refactored





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442372873



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";
+
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds.";

Review comment:
   Refactored
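
   For illustration, how a client would set these configs once available; the 
values shown are the proposed defaults from this PR, written out explicitly:

{code}
import java.util.Properties;

public class SocketSetupTimeoutConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // per-attempt connection setup timeout (proposed default: 10 seconds)
        props.put("socket.connection.setup.timeout.ms", "10000");
        // exponential upper bound across consecutive failures (proposed default: 127 seconds)
        props.put("socket.connection.setup.timeout.max.ms", "127000");
        // the Properties can then be passed to a producer, consumer, or admin client constructor
    }
}
{code}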





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-18 Thread GitBox


hachikuji commented on pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#issuecomment-646173735







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646168538


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139548#comment-17139548
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

Or instead (or in addition to the above), maybe we should wait for the Streams app 
to be in RUNNING before we start the timeout for writing records.

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> 

[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139546#comment-17139546
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

[~vvcephei] Could we maybe do something like "write as many records as you can 
in the 12ms timeout"? Since, as you said, the whole point is just to make 
sure we have enough records to represent a "reasonably large" load: if it takes 
2 minutes to write only 100 records, then those 100 records already represent a 
heavy load (apparently...)

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  

[GitHub] [kafka] edoardocomar commented on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted

2020-06-18 Thread GitBox


edoardocomar commented on pull request #4204:
URL: https://github.com/apache/kafka/pull/4204#issuecomment-646162106


   After @hachikuji's fixes in https://github.com/apache/kafka/pull/8586, 
   the metrics are no longer ticked at the end of a DelayedFetch, so the time 
window for topic deletion is almost non-existent and the only guard code needed 
is left in `KafkaApis`, as shown by the unit test added by this PR.
   This PR is now tiny, and it would be nice to have it merged :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139543#comment-17139543
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

Yeah, it's failing on the setup and hasn't even gotten to the real test at all. 
cc [~vvcephei]: seems like "500" was still too high :/

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> 

[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442363632



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
 } else if (connectionStates.isPreparingConnection(node.idString())) {
 foundConnecting = node;
 } else if (canConnect(node, now)) {
-foundCanConnect = node;
+if (foundCanConnect == null ||
+this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+this.connectionStates.lastConnectAttemptMs(node.idString())) {
+foundCanConnect = node;
+}

Review comment:
   Yes
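
   The rule in the hunk above, restated as a minimal standalone sketch: among 
connectable candidates, prefer the node whose last connection attempt is oldest. 
The types here are simplified assumptions standing in for ClusterConnectionStates:

{code}
import java.util.List;
import java.util.Map;

public class LeastRecentlyAttemptedSketch {
    // returns the candidate with the smallest last-connect-attempt timestamp
    public static String pick(final List<String> candidates, final Map<String, Long> lastAttemptMs) {
        String found = null;
        for (final String node : candidates) {
            if (found == null || lastAttemptMs.getOrDefault(found, 0L) > lastAttemptMs.getOrDefault(node, 0L)) {
                found = node;
            }
        }
        return found;
    }

    public static void main(final String[] args) {
        // node "2" has the oldest attempt, so it is picked
        System.out.println(pick(List.of("1", "2", "3"), Map.of("1", 300L, "2", 100L, "3", 200L)));
    }
}
{code}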





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442362673



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
   Make sense. I'll change the description and remove the defaults in the 
doc.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
 return state;
 }
 
+/**
+ * Get the id set of nodes which are in CONNECTING state
+ */
+public Set<String> connectingNodes() {
+return this.connectingNodes;
+}
+
+/**
+ * Get the timestamp of the latest connection attempt of a given node
+ * @param id the connection to fetch the state for
+ */
+public long lastConnectAttemptMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+}
+
+public long connectionSetupTimeoutMs(String id) {
+NodeConnectionState nodeState = this.nodeState.get(id);
+return nodeState.connectionSetupTimeoutMs;
+}
+
+/**
+ * Test if the connection to the given node has reached its timeout
+ * @param id the connection to fetch the state for
+ * @param now the current time in ms
+ */
+public boolean isConnectionSetupTimeout(String id, long now) {
+return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
(Line 145 & Line 157) and `disconnected`. 
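
   The check itself, as a standalone sketch: a CONNECTING node has timed out iff 
it has been connecting longer than its per-node timeout. The parameter names 
mirror the methods above; the values in main are made up for illustration:

{code}
public class ConnectionSetupTimeoutSketch {
    static boolean isConnectionSetupTimeout(final long lastConnectAttemptMs,
                                            final long connectionSetupTimeoutMs,
                                            final long nowMs) {
        return nowMs - lastConnectAttemptMs > connectionSetupTimeoutMs;
    }

    public static void main(final String[] args) {
        // attempt started at t=0 with a 10s timeout; at t=10,001ms it has timed out
        System.out.println(isConnectionSetupTimeout(0L, 10_000L, 10_001L)); // prints true
    }
}
{code}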





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10184:
--
Issue Type: Test  (was: Bug)

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at 

[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10184:
--
Priority: Minor  (was: Major)

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at 

[jira] [Created] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10184:
-

 Summary: Flaky 
HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
 Key: KAFKA-10184
 URL: https://issues.apache.org/jira/browse/KAFKA-10184
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Guozhang Wang


{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 12. Input 
records haven't all been written to the changelog: 442
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
at 
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
at 
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 

[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   Good catch. I'll make the logic record it in both `connecting` and 
`disconnected`.
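
   To make the intent concrete, here is a self-contained toy model of that change (class and method names are hypothetical, not the actual `ClusterConnectionStates` code): recording the attempt timestamp on both transitions keeps the timeout check meaningful for nodes that have already failed.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical toy model of the proposed fix: the last connection attempt
   // timestamp is recorded on BOTH the connecting and the disconnected
   // transitions, so the timeout check always measures from the latest attempt.
   public class ConnectionTimeoutModel {
       private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
       private final Set<String> connectingNodes = new HashSet<>();
       private final long connectionSetupTimeoutMs;

       public ConnectionTimeoutModel(long connectionSetupTimeoutMs) {
           this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
       }

       public void connecting(String id, long now) {
           lastConnectAttemptMs.put(id, now);  // attempt starts now
           connectingNodes.add(id);
       }

       public void disconnected(String id, long now) {
           lastConnectAttemptMs.put(id, now);  // record the failed attempt too
           connectingNodes.remove(id);
       }

       public boolean isConnectionSetupTimeout(String id, long now) {
           Long last = lastConnectAttemptMs.get(id);
           return last != null && now - last > connectionSetupTimeoutMs;
       }
   }
   ```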





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442354307



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
   No. The caller will ensure that the node is in the connecting state. 
I'll add an IllegalStateException here.
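
   A minimal sketch of that guard, extending the accessor from the diff above (the exception message is illustrative):

   ```java
   // Sketch: callers are expected to ask only for nodes that have a connection
   // state, so a missing entry indicates a programming error rather than a
   // condition to silently default.
   public long connectionSetupTimeoutMs(String id) {
       NodeConnectionState nodeState = this.nodeState.get(id);
       if (nodeState == null)
           throw new IllegalStateException("No connection state for node " + id);
       return nodeState.connectionSetupTimeoutMs;
   }
   ```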





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17139533#comment-17139533
 ] 

Guozhang Wang commented on KAFKA-10005:
---

So just to give a quick summary, my proposal primarily has three parts:

1) Use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to 
add batches of records directly as SST files; this replaces the benefit we 
currently get from bulk loading (see the sketch below).
2) Move the restoration off the stream thread to a different thread (pool), 
both for restoring active tasks and for updating standby tasks.
3) If needed, also disable compaction during the restoration and run a 
one-phase full compaction when it completes.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore 
extends; we can leverage that interface to "toggle" restoration mode for 1) and 
3) above.
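
For illustration, a rough sketch of 1) using the RocksJava SstFileWriter / 
ingestExternalFile API (the class name, temp-file handling, and the assumption 
that each batch arrives sorted by key are mine for illustration; the exact 
helper named in the ticket may map to a different RocksDB version):

{code}
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;

import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileWriter;

public class SstRestoreSketch {

    // Writes one restored changelog batch as an external SST file and ingests
    // it into the db, bypassing the memtable/WAL write path. SstFileWriter
    // requires keys in ascending order, so the batch is assumed to be a
    // SortedMap built with a lexicographic byte[] comparator.
    static void ingestBatch(RocksDB db, Options options,
                            SortedMap<byte[], byte[]> batch) throws Exception {
        String sstPath = Files.createTempFile("restore-batch-", ".sst").toString();
        try (EnvOptions envOptions = new EnvOptions();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstPath);
            for (Map.Entry<byte[], byte[]> entry : batch.entrySet()) {
                writer.put(entry.getKey(), entry.getValue());
            }
            writer.finish();
        }
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            ingestOptions.setMoveFiles(true); // move, don't copy, into the db dir
            db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
        }
    }
}
{code}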

cc [~cadonna]

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
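
For reference, a per-instance listener along these lines is straightforward 
against the public Kafka Streams API (a minimal sketch; the logging is 
illustrative). Because the store name is passed into every callback, per-store 
listening logic can be branched here without mixing the listener into the 
per-store RestoreCallback:

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Minimal per-instance restore listener sketch.
public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("Restore of %s starting on %s: offsets %d..%d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("Restored %d records of %s up to offset %d%n",
                numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        System.out.printf("Restore of %s done: %d records%n",
                storeName, totalRestored);
    }
}
// Registered once per instance, e.g.:
// streams.setGlobalStateRestoreListener(new LoggingRestoreListener());
{code}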



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-06-18 Thread GitBox


ijuma commented on pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#issuecomment-646140719


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646126053


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125880


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125726


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



