[jira] [Commented] (KAFKA-9491) Fast election during reassignment can lead to replica fetcher failures

2020-02-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029595#comment-17029595
 ] 

ASF GitHub Bot commented on KAFKA-9491:
---

hachikuji commented on pull request #8037: KAFKA-9491; Increment high watermark 
after full log truncation
URL: https://github.com/apache/kafka/pull/8037
 
 
   When a follower's fetch offset is behind the leader's log start offset, the 
follower will do a full log truncation. When it does so, it must update both 
its log start offset and high watermark. Failure to do so can lead to out of 
range errors if the follower becomes leader before getting the latest high 
watermark from the previous leader. The out of range errors occur when we 
attempt to resolve the log position of the high watermark in `DelayedFetch` in 
order to determine if a fetch is satisfied.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fast election during reassignment can lead to replica fetcher failures
> --
>
> Key: KAFKA-9491
> URL: https://issues.apache.org/jira/browse/KAFKA-9491
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We have observed an unusual case in which a new replica became leader before 
> it had received an initial high watermark from the previous leader. This 
> resulted in an OffsetOutOfRangeException being raised while looking up the 
> segment position of the uninitialized high watermark, since it was lower than 
> the log start offset. The error was raised while handling the fetch request 
> from one of the followers and prevented it from making progress.
> {code}
> org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request 
> for offset 0 for partition foo-0, but we only have log segments in the range 
> 20 to 20.
> {code}
> Here is what we have observed from the logs. The initial state of the 
> partition for the relevant sequence of events is the following:
> Initial state: replicas=[4,1,2,3], leader=1, isr=[1,2,3], adding=[4], 
> removing=[1], epoch=5, logStartOffset=20, logEndOffset=20
> We see the following events:
> t0: Replica 4 becomes follower and initializes log with hw=0, logStartOffset=0
> t1: Replica 4 begins fetching from offset 0 and receives an out of range error
> t2: After a ListOffset request to the leader, replica 4 initializes 
> logStartOffset to 20.
> t3: Replica 4 sends fetch request to the leader at start offset 20
> t4: Upon receiving the fetch request, the leader adds 4 to the ISR (i.e. 
> isr=[1,2,3,4])
> t5: The controller notices the ISR addition and makes 4 the leader since 1 is 
> to be removed and 4 is the new preferred leader
> t6: Replica 4 stops fetchers and becomes leader
> t7: We begin seeing the out of range errors as the other replicas begin 
> fetching from 4.
> We know from analysis of a heap dump from broker 4 that the high watermark 
> was still set to 0 some time after it had become leader. We also know that 
> broker 1 was under significant load. The time between events t4 and t6 was 
> less than 10ms. We don't know when the fetch response sent at t3 returned to 
> broker 4, but we speculate that it happened after t6 due to the heavy load on 
> the leader, which is why broker 4 had an uninitialized high watermark.
> A more mundane possibility is that there is a bug in the fetch session logic 
> and the partition was simply not included in the fetch response. However, the 
> code appears to anticipate this case. When a partition has an error, we set 
> the cached high watermark to -1 to ensure that it gets updated as soon as the 
> error clears.
> Regardless of how we got there, the fix should be straightforward. When a broker 
> becomes leader, it should ensure its high watermark is at least as large as 
> the log start offset.
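
For illustration, a minimal sketch of the invariant described above; the method 
and variable names are hypothetical and this is not the actual Kafka broker code:

{code:java}
// Hypothetical sketch: when a replica becomes leader (or fully truncates its log),
// its high watermark must not fall below the log start offset, otherwise resolving
// the high watermark's log position in DelayedFetch can raise an out-of-range error.
static long boundedHighWatermark(long currentHighWatermark, long logStartOffset) {
    return Math.max(currentHighWatermark, logStartOffset);
}
{code}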



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-6607) Consumer Client and Kafka Streams lag not zero when input topic transactional

2020-02-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6607:
--

Assignee: Matthias J. Sax

> Consumer Client and Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> When an input topic for a Consumer or Kafka Streams application is written 
> using transactions, the client does not commit "endOffset" but "endOffset - 1" 
> (or, to be more precise, "lastProcessedMessageOffset + 1") when it reaches the 
> end of the topic. The reason is the commit marker that is the last "message" in 
> the topic; Streams commits "offset of last processed message plus 1" and does 
> not take commit markers into account.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.
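
As a worked example of the off-by-one (the offsets below are assumed for 
illustration, not taken from the ticket):

{code:java}
// Assumed offsets for illustration only.
long lastProcessedMessageOffset = 9;  // last real record handed to the application
long commitMarkerOffset = 10;         // transaction commit marker, never consumed by the app
long logEndOffset = 11;               // next offset to be assigned in the partition

long committedOffset = lastProcessedMessageOffset + 1;  // = 10, what the client commits
long reportedLag = logEndOffset - committedOffset;      // = 1, although the app is fully caught up
{code}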



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9500:
---
Fix Version/s: 2.5.0

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.5.0, 2.4.1
>
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.
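
A sketch of the kind of topology that can hit this, assuming plain String tables, 
made-up topic names, and default serdes in the config (the actual reproduction in 
the fix may differ):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkJoinTopologySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> a = builder.table("A");  // the value doubles as the foreign key
        KTable<String, String> b = builder.table("B");

        // Foreign-key join without an explicit Materialized: the result is not
        // required to be materialized by default.
        KTable<String, String> j = a.join(b,
                leftValue -> leftValue,                                  // foreign-key extractor
                (leftValue, rightValue) -> leftValue + "," + rightValue);

        // Equi-join back onto one of the inputs: this needs to look up J's state
        // store, so the result is needed after all. Per this ticket, building the
        // topology then fails on affected versions because that store was never added.
        KTable<String, String> out = a.join(j, (av, jv) -> av + "|" + jv);

        builder.build();
    }
}
{code}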



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9500:
---
Fix Version/s: (was: 2.4.1, 2.5.0)

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9500:
---
Fix Version/s: 2.4.1

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.4.1
>
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9500:
---
Priority: Blocker  (was: Critical)

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.4.1, 2.5.0
>
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9500:
---
Fix Version/s: 2.4.1, 2.5.0

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.4.1, 2.5.0
>
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9500:
---
Affects Version/s: (was: 2.4.1)
   (was: 2.5.0)

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.4.1, 2.5.0
>
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8211) Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan

2020-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029413#comment-17029413
 ] 

Matthias J. Sax commented on KAFKA-8211:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4468/testReport/junit/kafka.admin/ResetConsumerGroupOffsetTest/testResetOffsetsExportImportPlan/]

> Flaky Test: ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan
> -
>
> Key: KAFKA-8211
> URL: https://issues.apache.org/jira/browse/KAFKA-8211
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.3.0
>Reporter: Bill Bejeck
>Assignee: huxihx
>Priority: Major
> Fix For: 2.5.0
>
>
> Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20778/]
>  
> {noformat}
> Error Message
> java.lang.AssertionError: Expected that consumer group has consumed all 
> messages from topic/partition.
> Stacktrace
> java.lang.AssertionError: Expected that consumer group has consumed all 
> messages from topic/partition.
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:381)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.awaitConsumerProgress(ResetConsumerGroupOffsetTest.scala:364)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.produceConsumeAndShutdown(ResetConsumerGroupOffsetTest.scala:359)
>   at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:323)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> 

[jira] [Commented] (KAFKA-8110) Flaky Test DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions

2020-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029412#comment-17029412
 ] 

Matthias J. Sax commented on KAFKA-8110:


Different test: testDescribeGroupWithShortInitializationTimeout

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4468/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/]
{quote}java.lang.AssertionError: assertion failed at 
scala.Predef$.assert(Predef.scala:267) at 
kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:585){quote}

> Flaky Test 
> DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions
> --
>
> Key: KAFKA-8110
> URL: https://issues.apache.org/jira/browse/KAFKA-8110
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/67/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeMembersWithConsumersWithoutAssignedPartitions/]
> {quote}java.lang.AssertionError: Partition [__consumer_offsets,0] metadata 
> not propagated after 15000 ms at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:381) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791) at 
> kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:880) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:318) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:317) at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at 
> scala.collection.immutable.Range.foreach(Range.scala:158) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:237) at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:230) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:108) at 
> kafka.utils.TestUtils$.createTopic(TestUtils.scala:317) at 
> kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:375) at 
> kafka.admin.DescribeConsumerGroupTest.testDescribeMembersWithConsumersWithoutAssignedPartitions(DescribeConsumerGroupTest.scala:372){quote}
> STDOUT
> {quote}[2019-03-14 20:01:52,347] WARN Ignoring unexpected runtime exception 
> (org.apache.zookeeper.server.NIOServerCnxnFactory:236) 
> java.nio.channels.CancelledKeyException at 
> sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at 
> sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at 
> org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
>  at java.lang.Thread.run(Thread.java:748) TOPIC PARTITION CURRENT-OFFSET 
> LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 0 0 0 - - - TOPIC 
> PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 
> 0 0 0 - - - COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS 
> localhost:44669 (0){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8110) Flaky Test DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions

2020-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029411#comment-17029411
 ] 

Matthias J. Sax commented on KAFKA-8110:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/456/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupMembersWithShortInitializationTimeout/]

> Flaky Test 
> DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions
> --
>
> Key: KAFKA-8110
> URL: https://issues.apache.org/jira/browse/KAFKA-8110
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/67/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeMembersWithConsumersWithoutAssignedPartitions/]
> {quote}java.lang.AssertionError: Partition [__consumer_offsets,0] metadata 
> not propagated after 15000 ms at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:381) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791) at 
> kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:880) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:318) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:317) at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at 
> scala.collection.immutable.Range.foreach(Range.scala:158) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:237) at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:230) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:108) at 
> kafka.utils.TestUtils$.createTopic(TestUtils.scala:317) at 
> kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:375) at 
> kafka.admin.DescribeConsumerGroupTest.testDescribeMembersWithConsumersWithoutAssignedPartitions(DescribeConsumerGroupTest.scala:372){quote}
> STDOUT
> {quote}[2019-03-14 20:01:52,347] WARN Ignoring unexpected runtime exception 
> (org.apache.zookeeper.server.NIOServerCnxnFactory:236) 
> java.nio.channels.CancelledKeyException at 
> sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at 
> sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at 
> org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
>  at java.lang.Thread.run(Thread.java:748) TOPIC PARTITION CURRENT-OFFSET 
> LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 0 0 0 - - - TOPIC 
> PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 
> 0 0 0 - - - COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS 
> localhost:44669 (0){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9490) Some factory methods in Grouped are missing generic parameters

2020-02-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029408#comment-17029408
 ] 

ASF GitHub Bot commented on KAFKA-9490:
---

mjsax commented on pull request #8028: KAFKA-9490: Fix generics for Grouped
URL: https://github.com/apache/kafka/pull/8028
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Some factory methods in Grouped are missing generic parameters
> --
>
> Key: KAFKA-9490
> URL: https://issues.apache.org/jira/browse/KAFKA-9490
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Dariusz Kordonski
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 2.5.0
>
>
> The following methods in the {{Grouped}} class seem to be missing the generic 
> parameters {{<K, V>}} in the declared return type:
> {code:java}
> public static <K, V> Grouped keySerde(final Serde<K> keySerde) {
> return new Grouped<>(null, keySerde, null);
> }
> public static <K, V> Grouped valueSerde(final Serde<V> valueSerde) {
> return new Grouped<>(null, null, valueSerde);
> }
> {code}
> I think in both cases it should be:
> {code:java}
> public static <K, V> Grouped<K, V> ...() {code}
> This causes "unchecked call" compiler warnings when called by clients.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9500:
---
Component/s: streams

> Foreign-Key Join creates an invalid topology
> 
>
> Key: KAFKA-9500
> URL: https://issues.apache.org/jira/browse/KAFKA-9500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Critical
>
> Foreign-Key Join results are not required to be materialized by default, but 
> they might be needed if downstream operators need to perform lookups on the 
> result (such as when the join result participates in an equi-join).
> Currently, if the result is explicitly materialized (via Materialized), this 
> works correctly, but if the result is _not_ materialized explicitly, but _is_ 
> needed, the topology builder throws an exception that the result store isn't 
> added to the topology. This was an oversight in testing and review and needs 
> to be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9503) TopologyTestDriver processes intermediate results in the wrong order

2020-02-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9503:
---
Component/s: streams-test-utils

> TopologyTestDriver processes intermediate results in the wrong order
> 
>
> Key: KAFKA-9503
> URL: https://issues.apache.org/jira/browse/KAFKA-9503
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> TopologyTestDriver has the feature that it processes each input 
> synchronously, resolving one of the most significant challenges with 
> verifying the correctness of streaming applications.
> When processing an input, it feeds that record to the source node, which then 
> synchronously (it's always synchronous within a task) gets passed through the 
> subtopology via Context#forward calls. Ultimately, outputs from that input 
> are forwarded into the RecordCollector, which converts it to Producer.send 
> calls. In TopologyTestDriver, this Producer is a special one that actually 
> just captures the records.
> Some output topics from one subtopology are inputs to another subtopology. 
> For example, repartition topics. Immediately after the synchronous 
> subtopology process() invocation, TopologyTestDriver iterates over the 
> collected outputs from the special Producer. If they are purely output 
> records, it just enqueues them for later retrieval by testing code. If they 
> are records for internal topics, though, TopologyTestDriver immediately 
> processes them as inputs  for the relevant subtopology.
> The problem, and this is very subtle, is that TopologyTestDriver does this 
> recursively, which with some (apparently rare) programs can cause the output 
> to be observed in an invalid order.
> One such program is the one I wrote to test the fix for KAFKA-9487 . It 
> involves a foreign-key join whose result is joined back to one of its inputs.
> {noformat}
> Here's a simplified version:
> // foreign key join
> J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
> // equi-join
> OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))
> Let's say we have the following initial condition:
> A:
> a1 = {v: X, b: b1}
> B:
> b1 = {v: Y}
> J:
> a1 = Pair({v: X}, b: b1}, {v: Y})
> OUT:
> a1 = Pair({v: X}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
> Now, piping an update:
> a1: {v: Z, b: b1}
> results immediately in two buffered results in the Producer:
> (FK join subscription): b1: {a1}
> (OUT): a1 = Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
> Note that the FK join result isn't updated synchronously, since it's an async 
> operation, so the RHS lookup is temporarily incorrect, yielding the nonsense 
> intermediate result where the outer pair has the updated value for a1, but 
> the inner (fk result) one still has the old value for a1.
> However! We don't buffer that output record for consumption by testing code 
> yet, we leave it in the internal Producer while we process the first 
> intermediate record (the FK subscription).
> Processing that internal record means that we have a new internal record to 
> process:
> (FK join subscription response): a1: {b1: {v: Y}}
> so right now, our internal-records-to-process stack looks like:
> (FK join subscription response): a1: {b1: {v: Y}}
> (OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
> Again, we start by processing the first thing, the FK join response, which 
> results in an updated FK join result:
> (J) a1: Pair({v: Z}, b: b1}, {v: Y})
> and output:
> (OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
> and, we still haven't handled the earlier output, so now our 
> internal-records-to-process stack looks like:
> (J) a1: Pair({v: Z}, b: b1}, {v: Y})
> (OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
> (OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
> At this point, there's nothing else to process in internal topics, so we just 
> copy the records one by one to the "output" collection for later handling by 
> testing code, but this yields the wrong final state of:
> (OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
> That was an incorrect intermediate result, but because we're processing 
> internal records recursively (as a stack), it winds up emitted at the end 
> instead of in the middle.
> If we change the processing model from a stack to a queue, the correct order 
> is preserved, and the final state is:
> (OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
> {noformat}
> This is what I did in https://github.com/apache/kafka/pull/8015
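
A minimal sketch of the stack-versus-queue difference described above. It is 
illustrative only: the method names and the "internal:"/"output:" record encoding 
are made up, and this is not the actual TopologyTestDriver code.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class InternalRecordOrderSketch {

    // Recursive ("stack") handling: each internal record and all of its descendants
    // are drained before the sibling records captured alongside it, so a stale
    // intermediate output can end up emitted after the corrected one.
    static void processRecursively(List<String> captured, List<String> output) {
        for (String record : captured) {
            if (record.startsWith("internal:")) {
                processRecursively(subtopologyOutputs(record), output);
            } else {
                output.add(record);
            }
        }
    }

    // Queue handling (the change described above): captured records are appended and
    // drained in FIFO order, preserving the order in which intermediate results were
    // actually produced.
    static void processWithQueue(List<String> captured, List<String> output) {
        Queue<String> pending = new ArrayDeque<>(captured);
        while (!pending.isEmpty()) {
            String record = pending.poll();
            if (record.startsWith("internal:")) {
                pending.addAll(subtopologyOutputs(record));
            } else {
                output.add(record);
            }
        }
    }

    // Hypothetical stand-in for running the relevant subtopology on one record.
    static List<String> subtopologyOutputs(String internalRecord) {
        List<String> results = new ArrayList<>();
        results.add("output:derived-from-" + internalRecord);
        return results;
    }
}
{code}

With captured = ["internal:subscription", "output:stale"], the recursive variant 
emits the stale record last, while the queue variant emits it before the derived 
result, matching the ordering discussed in the description.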



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9503) TopologyTestDriver processes intermediate results in the wrong order

2020-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9503:

Description: 
TopologyTestDriver has the feature that it processes each input synchronously, 
resolving one of the most significant challenges with verifying the correctness 
of streaming applications.

When processing an input, it feeds that record to the source node, which then 
synchronously (it's always synchronous within a task) gets passed through the 
subtopology via Context#forward calls. Ultimately, outputs from that input are 
forwarded into the RecordCollector, which converts it to Producer.send calls. 
In TopologyTestDriver, this Producer is a special one that actually just 
captures the records.

Some output topics from one subtopology are inputs to another subtopology. For 
example, repartition topics. Immediately after the synchronous subtopology 
process() invocation, TopologyTestDriver iterates over the collected outputs 
from the special Producer. If they are purely output records, it just enqueues 
them for later retrieval by testing code. If they are records for internal 
topics, though, TopologyTestDriver immediately processes them as inputs  for 
the relevant subtopology.

The problem, and this is very subtle, is that TopologyTestDriver does this 
recursively, which with some (apparently rare) programs can cause the output to 
be observed in an invalid order.

One such program is the one I wrote to test the fix for KAFKA-9487 . It 
involves a foreign-key join whose result is joined back to one of its inputs.

{noformat}
Here's a simplified version:
// foreign key join
J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
// equi-join
OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))

Let's say we have the following initial condition:
A:
a1 = {v: X, b: b1}
B:
b1 = {v: Y}
J:
a1 = Pair({v: X}, b: b1}, {v: Y})
OUT:
a1 = Pair({v: X}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

Now, piping an update:
a1: {v: Z, b: b1}
results immediately in two buffered results in the Producer:
(FK join subscription): b1: {a1}
(OUT): a1 = Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
Note that the FK join result isn't updated synchronously, since it's an async 
operation, so the RHS lookup is temporarily incorrect, yielding the nonsense 
intermediate result where the outer pair has the updated value for a1, but the 
inner (fk result) one still has the old value for a1.

However! We don't buffer that output record for consumption by testing code 
yet, we leave it in the internal Producer while we process the first 
intermediate record (the FK subscription).
Processing that internal record means that we have a new internal record to 
process:
(FK join subscription response): a1: {b1: {v: Y}}

so right now, our internal-records-to-process stack looks like:
(FK join subscription response): a1: {b1: {v: Y}}
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

Again, we start by processing the first thing, the FK join response, which 
results in an updated FK join result:
(J) a1: Pair({v: Z}, b: b1}, {v: Y})
and output:
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
and, we still haven't handled the earlier output, so now our 
internal-records-to-process stack looks like:

(J) a1: Pair({v: Z}, b: b1}, {v: Y})
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

At this point, there's nothing else to process in internal topics, so we just 
copy the records one by one to the "output" collection for later handling by 
testing code, but this yields the wrong final state of:
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

That was an incorrect intermediate result, but because we're processing 
internal records recursively (as a stack), it winds up emitted at the end 
instead of in the middle.

If we change the processing model from a stack to a queue, the correct order is 
preserved, and the final state is:
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))

{noformat}

This is what I did in https://github.com/apache/kafka/pull/8015

  was:
TopologyTestDriver has the feature that it processes each input synchronously, 
resolving one of the most significant challenges with verifying the correctness 
of streaming applications.

When processing an input, it feeds that record to the source node, which then 
synchronously (it's always synchronous within a task) gets passed through the 
subtopology via Context#forward calls. Ultimately, outputs from that input are 
forwarded into the RecordCollector, which converts it to Producer.send calls. 
In TopologyTestDriver, this Producer is a special one that actually just 
captures the records.

Some output topics from one subtopology are inputs to another subtopology. For 
example, repartition topics. Immediately after the synchronous subtopology 

[jira] [Created] (KAFKA-9503) TopologyTestDriver processes intermediate results in the wrong order

2020-02-03 Thread John Roesler (Jira)
John Roesler created KAFKA-9503:
---

 Summary: TopologyTestDriver processes intermediate results in the 
wrong order
 Key: KAFKA-9503
 URL: https://issues.apache.org/jira/browse/KAFKA-9503
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler
Assignee: John Roesler


TopologyTestDriver has the feature that it processes each input synchronously, 
resolving one of the most significant challenges with verifying the correctness 
of streaming applications.

When processing an input, it feeds that record to the source node, which then 
synchronously (it's always synchronous within a task) gets passed through the 
subtopology via Context#forward calls. Ultimately, outputs from that input are 
forwarded into the RecordCollector, which converts it to Producer.send calls. 
In TopologyTestDriver, this Producer is a special one that actually just 
captures the records.

Some output topics from one subtopology are inputs to another subtopology. For 
example, repartition topics. Immediately after the synchronous subtopology 
process() invocation, TopologyTestDriver iterates over the collected outputs 
from the special Producer. If they are purely output records, it just enqueues 
them for later retrieval by testing code. If they are records for internal 
topics, though, TopologyTestDriver immediately processes them as inputs  for 
the relevant subtopology.

The problem, and this is very subtle, is that TopologyTestDriver does this 
recursively, which with some (apparently rare) programs can cause the output to 
be observed in an invalid order.

One such program is the one I wrote to test the fix for KAFKA-9487 . It 
involves a foreign-key join whose result is joined back to one of its inputs.

Here's a simplified version:
// foreign key join
J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
// equi-join
OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))

Let's say we have the following initial condition:
A:
a1 = {v: X, b: b1}
B:
b1 = {v: Y}
J:
a1 = Pair({v: X}, b: b1}, {v: Y})
OUT:
a1 = Pair({v: X}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

Now, piping an update:
a1: {v: Z, b: b1}
results immediately in two buffered results in the Producer:
(FK join subscription): b1: {a1}
(OUT): a1 = Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))
Note that the FK join result isn't updated synchronously, since it's an async 
operation, so the RHS lookup is temporarily incorrect, yielding the nonsense 
intermediate result where the outer pair has the updated value for a1, but the 
inner (fk result) one still has the old value for a1.

However! We don't buffer that output record for consumption by testing code 
yet, we leave it in the internal Producer while we process the first 
intermediate record (the FK subscription).
Processing that internal record means that we have a new internal record to 
process:
(FK join subscription response): a1: {b1: {v: Y}}

so right now, our internal-records-to-process stack looks like:
(FK join subscription response): a1: {b1: {v: Y}}
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

Again, we start by processing the first thing, the FK join response, which 
results in an updated FK join result:
(J) a1: Pair({v: Z}, b: b1}, {v: Y})
and output:
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
and, we still haven't handled the earlier output, so now our 
internal-records-to-process stack looks like:

(J) a1: Pair({v: Z}, b: b1}, {v: Y})
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

At this point, there's nothing else to process in internal topics, so we just 
copy the records one by one to the "output" collection for later handling by 
testing code, but this yields the wrong final state of:
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: X}, b: b1}, {v: Y}))

That was an incorrect intermediate result, but because we're processing 
internal records recursively (as a stack), it winds up emitted at the end 
instead of in the middle.

If we change the processing model from a stack to a queue, the correct order is 
preserved, and the final state is:
(OUT) a1: Pair({v: Z}, b: b1}, Pair({v: Z}, b: b1}, {v: Y}))

This is what I did in https://github.com/apache/kafka/pull/8015



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9422) Track the set of topics a connector is using

2020-02-03 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029354#comment-17029354
 ] 

Konstantine Karantasis commented on KAFKA-9422:
---

Not sure why this jira ticket was not updated when the PR above was merged. 
KIP-558 has now been implemented, therefore I'm closing this ticket and marking 
it _{{done}}_. 

PR: 
https://github.com/apache/kafka/pull/8017

> Track the set of topics a connector is using
> 
>
> Key: KAFKA-9422
> URL: https://issues.apache.org/jira/browse/KAFKA-9422
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 2.5.0
>
>
> Now that source connectors will soon (once 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  is implemented) be able to create topics at runtime with custom 
> topic-specific properties, in ways beyond what automatic topic creation could 
> allow, a nice new feature would be to also keep track of which topics are 
> actually used per connector, after such a connector is created. 
> This information could be exposed by extending the Connect REST API to add a 
> topics endpoint under the connector endpoint (similar to the status or config 
> endpoints). 
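
As noted in the comment above, this was implemented via KIP-558. A sketch of 
querying such a per-connector topics endpoint follows; the worker URL and connector 
name are placeholders, and the response shape is per my reading of KIP-558 rather 
than taken from this ticket:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorTopicsSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder Connect worker URL and connector name.
        URI uri = URI.create("http://localhost:8083/connectors/my-connector/topics");
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expected to be a JSON map from connector name to its set of active topics.
        System.out.println(response.body());
    }
}
{code}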



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9422) Track the set of topics a connector is using

2020-02-03 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9422.
---
Resolution: Fixed

> Track the set of topics a connector is using
> 
>
> Key: KAFKA-9422
> URL: https://issues.apache.org/jira/browse/KAFKA-9422
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 2.5.0
>
>
> Now that source connectors will soon (once 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  is implemented) be able to create topics at runtime with custom 
> topic-specific properties, in ways beyond what automatic topic creation could 
> allow, a nice new feature would be to also keep track of which topics are 
> actually used per connector, after such a connector is created. 
> This information could be exposed by extending the Connect REST API to add a 
> topics endpoint under the connector endpoint (similar to the status or config 
> endpoints). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9502) Use explicit "purge data" for windowed changelog topics instead of "delete,compact" topic config

2020-02-03 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9502:
--

 Summary: Use explicit "purge data" for windowed changelog topics 
instead of "delete,compact" topic config
 Key: KAFKA-9502
 URL: https://issues.apache.org/jira/browse/KAFKA-9502
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Via https://issues.apache.org/jira/browse/KAFKA-4015, we introduced the topic 
cleanup policy "compact,delete" to allow windowed changelog topics in Kafka 
Streams to purge old windows eventually. (Note that the key space for windowed 
changelog topics is unbounded and the "compact" policy by itself would imply 
unbounded growth of the topic.)

To guard against clock drift we also added config 
`windowstore.changelog.additional.retention.ms` via 
https://issues.apache.org/jira/browse/KAFKA-3595. This config also prevents 
changelog truncation if the application is offline. Note the broker retention 
time is a mix of event- and wallclock-time.

Later we improved retention time handling for local state stores client side 
(https://issues.apache.org/jira/browse/KAFKA-6978 and 
https://issues.apache.org/jira/browse/KAFKA-7072).

In some other work, we switched to using "purge data" calls 
(https://issues.apache.org/jira/browse/KAFKA-6150) and set the retention time 
to infinite (https://issues.apache.org/jira/browse/KAFKA-6535) to avoid data 
loss for repartition topics.

To improve Kafka Streams further, we should consider combining the lessons 
from above and applying them to changelog topics, too:

Because state store retention is purely event-time based in Kafka Streams, 
there is a gap between store retention and changelog retention. This could lead 
to data loss if an application is offline for a long time and loses its client 
side state store, because the broker may roll the changelog forward and 
eventually delete old segments based on its wallclock time progress.

Hence, we should change the changelog topic configs back to "compact" only (or, 
if necessary, keep "compact,delete" but set an infinite retention time) and use 
explicit "purge data" calls to delete data in the changelog topics only when 
the state store also deletes its local data. This will keep the local state 
store and the changelog in sync, protect against data loss, and make the config 
`windowstore.changelog.additional.retention.ms` unnecessary (we should do a KIP 
for this ticket to deprecate and remove the config), which helps to reduce 
broker-side storage as no unnecessary changelog data will be kept.
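
A sketch of what such an explicit purge could look like using the admin 
DeleteRecords API; the topic name, partition, and offset below are placeholders, 
and this is not the actual Streams code:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class ChangelogPurgeSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder changelog partition and offset: purge everything below the
            // offset up to which the local window store has already dropped its data.
            TopicPartition changelogPartition =
                    new TopicPartition("my-app-store-changelog", 0);
            long oldestRetainedOffset = 12345L;
            admin.deleteRecords(Map.of(changelogPartition,
                        RecordsToDelete.beforeOffset(oldestRetainedOffset)))
                 .all()
                 .get();
        }
    }
}
{code}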



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9501) Promote Standby tasks to active tasks without closing them

2020-02-03 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9501:
--

 Summary: Promote Standby tasks to active tasks without closing them
 Key: KAFKA-9501
 URL: https://issues.apache.org/jira/browse/KAFKA-9501
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


The purpose of StandbyTasks in Kafka Streams is fast failover. However, at the 
moment we close StandbyTasks (and create a new active task) if they are promoted 
to active.

While this works ok for persistent state stores, it renders hot standbys for 
in-memory state stores useless, because we drop the in-memory state when we 
close the StandbyTask, and thus the new active task needs to reread the 
changelog topic to recreate the in-memory state.

Hence, we should promote StandbyTasks to active tasks without closing them. 
This will not only fix the issue for in-memory stores, but will also make 
rebalancing faster for persistent state stores, because closing and reopening 
RocksDB has significant overhead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029299#comment-17029299
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] We have implemented KIP-491 internally.   

Using a dynamic config leader.deprioritized.list to set it for the cluster 
global level  ('') so in case of controller failover, the new 
controller will inherit this dynamic config settings. 

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', 
ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, 
mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, 
dataLength=59, numChildren=0, pzxid=25790129667))
{code}

This will put the broker 10001 to the lowest priority when controller is 
considering leadership for that partition,  regardless this broker is in the 
1st position of the assignment (namely : preferred leader),  if this is 
currently serving leadership,  the preferred leader election will move it to 
another broker in the ISR. 

We have also implemented another feature separate from KIP-491 that when an 
empty broker is starting up,  a dynamic config for that broker called 
"{{replica.start.offset.strategy}}" is set to "latest" (default is "earliest" 
like current upstream behavior),   just like a consumer, it will fetch from 
current leaders'  latest offsets instead of  earliest(start) offset.  So this 
makes the failed empty broker coming up very fast.This feature is used 
together with KIP-491  {{leader.deprioritized.list}} to blacklist this broker 
to serve traffic (because it does not have enough data).  After it's in 
replication for sometime (retention of the broker/topic level),  this broker is 
completely caught-up, and the {{leader.deprioritized.list}}  is removed. and 
when preferred leader is run, this broker can serve traffic again.  We haven't 
proposed this in any KIP yet.  But I think this is also a good features. 

maybe I will restart the KIP-491 discussion in the dev mailing list.


> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9500) Foreign-Key Join creates an invalid topology

2020-02-03 Thread John Roesler (Jira)
John Roesler created KAFKA-9500:
---

 Summary: Foreign-Key Join creates an invalid topology
 Key: KAFKA-9500
 URL: https://issues.apache.org/jira/browse/KAFKA-9500
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0, 2.5.0, 2.4.1
Reporter: John Roesler
Assignee: John Roesler


Foreign-Key Join results are not required to be materialized by default, but 
they might be needed if downstream operators need to perform lookups on the 
result (such as when the join result participates in an equi-join).

Currently, if the result is explicitly materialized (via Materialized), this 
works correctly, but if the result is _not_ materialized explicitly, but _is_ 
needed, the topology builder throws an exception that the result store isn't 
added to the topology. This was an oversight in testing and review and needs to 
be fixed ASAP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029299#comment-17029299
 ] 

GEORGE LI edited comment on KAFKA-4084 at 2/3/20 9:26 PM:
--

[~blodsbror] We have implemented KIP-491 internally.   

Using a dynamic config leader.deprioritized.list to set it for the cluster 
global level  ('') so in case of controller failover, the new 
controller will inherit this dynamic config settings. 

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', 
ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, 
mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, 
dataLength=59, numChildren=0, pzxid=25790129667))
{code}

This will put the broker 10001 to the lowest priority when controller is 
considering leadership for that partition,  regardless this broker is in the 
1st position of the assignment (namely : preferred leader),  if this is 
currently serving leadership,  the preferred leader election will move it to 
another broker in the ISR. 

We have also implemented another feature separate from KIP-491 that when an 
empty broker is starting up,  a dynamic config for that broker called 
"{{replica.start.offset.strategy}}" is set to "latest" (default is "earliest" 
like current upstream behavior),   just like a consumer, it will fetch from 
current leaders'  latest offsets instead of  earliest(start) offset.  So this 
makes the failed empty broker coming up very fast.This feature is used 
together with KIP-491  {{leader.deprioritized.list}} to blacklist this broker 
to serve traffic (because it does not have enough data).  After it's in 
replication for sometime (retention of the broker/topic level),  this broker is 
completely caught-up, and the {{leader.deprioritized.list}}  is removed. and 
when preferred leader is run, this broker can serve traffic again.  We haven't 
proposed this in any KIP yet.  But I think this is also a good feature. 

maybe I will restart the KIP-491 discussion in the dev mailing list.



was (Author: sql_consulting):
[~blodsbror] We have implemented KIP-491 internally.   

Using a dynamic config leader.deprioritized.list to set it for the cluster 
global level  ('') so in case of controller failover, the new 
controller will inherit this dynamic config settings. 

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
--entity-default --alter --add-config leader.deprioritized.list=10001

$ zkcli -h zk_host1 get /kafka_cluster_name/config/brokers/''
('{"version":1,"config":{"leader.deprioritized.list":"10001"}}', 
ZnodeStat(czxid=25790129667, mzxid=25791108491, ctime=1572677228451, 
mtime=1580117760492, version=21, cversion=0, aversion=0, ephemeralOwner=0, 
dataLength=59, numChildren=0, pzxid=25790129667))
{code}

This will put the broker 10001 to the lowest priority when controller is 
considering leadership for that partition,  regardless this broker is in the 
1st position of the assignment (namely : preferred leader),  if this is 
currently serving leadership,  the preferred leader election will move it to 
another broker in the ISR. 

We have also implemented another feature separate from KIP-491 that when an 
empty broker is starting up,  a dynamic config for that broker called 
"{{replica.start.offset.strategy}}" is set to "latest" (default is "earliest" 
like current upstream behavior),   just like a consumer, it will fetch from 
current leaders'  latest offsets instead of  earliest(start) offset.  So this 
makes the failed empty broker coming up very fast.This feature is used 
together with KIP-491  {{leader.deprioritized.list}} to blacklist this broker 
to serve traffic (because it does not have enough data).  After it's in 
replication for sometime (retention of the broker/topic level),  this broker is 
completely caught-up, and the {{leader.deprioritized.list}}  is removed. and 
when preferred leader is run, this broker can serve traffic again.  We haven't 
proposed this in any KIP yet.  But I think this is also a good features. 

maybe I will restart the KIP-491 discussion in the dev mailing list.


> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: 

[jira] [Assigned] (KAFKA-5662) We should be able to specify min.insync.replicas for the __consumer_offsets topic

2020-02-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-5662:
-

Assignee: Jose Armando Garcia Sancio  (was: Stanislav Kozlovski)

> We should be able to specify min.insync.replicas for the __consumer_offsets 
> topic
> -
>
> Key: KAFKA-5662
> URL: https://issues.apache.org/jira/browse/KAFKA-5662
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: needs-kip
>
> The transaction log has a {{transaction.state.log.min.isr}} setting to 
> control the min.isr for the transaction log (by default the min.isr is 2 and 
> replication.factor is 3).
> Unfortunately, we don't have a similar setting for the offsets topic. We 
> should add the following {{offsets.topic.min.isr}} setting and default that 
> to 2 so that we have durability on the offsets topic. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread Harsha (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029256#comment-17029256
 ] 

Harsha commented on KAFKA-4084:
---

[~junrao] any interest in KIP-491? We are using it in production and it has been 
a critical feature in helping us meet our producer latency SLAs.

As [~blodsbror] pointed out, it can help with auto leader rebalancing as well. 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams

2020-02-03 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029231#comment-17029231
 ] 

Sophie Blee-Goldman commented on KAFKA-9455:


This ticket just involves an implementation detail, so no KIP necessary

> Consider using TreeMap for in-memory stores of Streams
> --
>
> Key: KAFKA-9455
> URL: https://issues.apache.org/jira/browse/KAFKA-9455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> From [~ableegoldman]: It's worth noting that it might be a good idea to 
> switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap 
> allows us to safely perform range queries without copying over the entire 
> keyset, but the performance on point queries seems to scale noticeably worse 
> with the number of unique keys. Point queries are used by aggregations while 
> range queries are used by windowed joins, but of course both are available 
> within the PAPI and for interactive queries so it's hard to say which we 
> should prefer. Maybe rather than make that tradeoff we should have one 
> version for efficient range queries (a "JoinWindowStore") and one for 
> efficient point queries ("AggWindowStore") - or something. I know we've had 
> similar thoughts for a different RocksDB store layout for Joins (although I 
> can't find that ticket anywhere..), it seems like the in-memory stores could 
> benefit from a special "Join" version as well cc/ Guozhang Wang
> Here are some random thoughts:
> 1. For Kafka Streams processing logic (i.e. without IQ), it's better to make 
> all processing logic rely on point queries rather than range queries. 
> Right now the only processors that use range queries are, as mentioned above, 
> the windowed stream-stream joins. I think we should consider using a different 
> window implementation for this (and as a result also get rid of the 
> retainDuplicate flags) to refactor the windowed stream-stream join operation.
> 2. With 1), range queries would only be exposed as IQ. Depending on its usage 
> frequency I think it makes lots of sense to optimize for single-point queries.
> Of course, even without step 1) we should still consider using tree-map for 
> windowed in-memory stores to have a better scaling effect.
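
As a small illustration of the tradeoff being discussed: TreeMap and ConcurrentSkipListMap both 
implement NavigableMap, so range queries look the same either way; the differences are the 
constant-factor cost of point lookups and thread safety. The key/value types below are stand-ins:

{code:java}
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class WindowStoreMapSketch {
    public static void main(String[] args) {
        // Same NavigableMap API for both, so a switch would be mechanical.
        NavigableMap<Long, String> skipList = new ConcurrentSkipListMap<>();
        NavigableMap<Long, String> treeMap = new TreeMap<>();
        for (long ts = 0; ts < 1_000; ts++) {
            skipList.put(ts, "v" + ts);
            treeMap.put(ts, "v" + ts);
        }

        // Point query, as used by aggregations.
        String v = treeMap.get(42L);

        // Range query, as used by windowed stream-stream joins: keys in [100, 200).
        NavigableMap<Long, String> window = treeMap.subMap(100L, true, 200L, false);

        System.out.println(v + ", range size = " + window.size());
        // Caveat: TreeMap is not thread-safe, so concurrent IQ readers would need a copy or
        // external synchronization, which the ConcurrentSkipListMap currently avoids.
    }
}
{code}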



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9484) Unnecessary LeaderAndIsr update following reassignment completion

2020-02-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-9484:
-

Assignee: Jose Armando Garcia Sancio  (was: Jason Gustafson)

> Unnecessary LeaderAndIsr update following reassignment completion
> -
>
> Key: KAFKA-9484
> URL: https://issues.apache.org/jira/browse/KAFKA-9484
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Following the completion of the reassignment, the controller executes two 
> steps: first, it elects a new leader (if needed) and sends a LeaderAndIsr 
> update (in any case) with the new target replica set; second, it removes 
> unneeded replicas from the replica set and sends another round of 
> LeaderAndIsr updates. I am doubting the need for the first round of updates 
> in the case that the leader doesn't need changing. 
> For example, suppose we have the following reassignment state: 
> replicas=[1,2,3,4], adding=[4], removing=[1], isr=[1,2,3,4], leader=2, 
> epoch=10
> First the controller will bump the epoch with the target replica set, which 
> will result in a round of LeaderAndIsr requests to the target replica set with the following state: 
> replicas=[2,3,4], adding=[], removing=[], isr=[1,2,3,4], leader=2, epoch=11 
> Immediately following this, the controller will bump the epoch again and 
> remove the unneeded replica. This will result in another round of 
> LeaderAndIsr requests with the following state: 
> replicas=[2,3,4], adding=[], removing=[], isr=[2,3,4], leader=2, epoch=12 
> The first round of LeaderAndIsr updates puzzles me a bit. It is justified in 
> the code with this comment: 
> {code} 
> B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader 
> from adding any replica in TRS - ORS back in the isr. 
> {code} 
> (I think the comment is backwards. It should be ORS (original replica set) - 
> TRS (target replica set).) 
> It sounds like we are trying to prevent a member of ORS from being added back 
> to the ISR, but even if it did get added, it would be removed in the next 
> step anyway. In the uncommon case that an ORS replica is out of sync, there 
> does not seem to be any benefit to this first update since it is basically 
> paying the cost of one write in order to save the speculative cost of one 
> write. Additionally, it would be useful if the protocol could enforce the 
> invariant that the ISR is always a subset of the replica set.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9484) Unnecessary LeaderAndIsr update following reassignment completion

2020-02-03 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9484:
---
Description: 
Following the completion of the reassignment, the controller executes two 
steps: first, it elects a new leader (if needed) and sends a LeaderAndIsr 
update (in any case) with the new target replica set; second, it removes 
unneeded replicas from the replica set and sends another round of LeaderAndIsr 
updates. I am doubting the need for the first round of updates in the case that 
the leader doesn't need changing. 

For example, suppose we have the following reassignment state: 

replicas=[1,2,3,4], adding=[4], removing=[1], isr=[1,2,3,4], leader=2, epoch=10

First the controller will bump the epoch with the target replica set, which 
will result in a round of LeaderAndIsr requests to the target replica set with the following state: 

replicas=[2,3,4], adding=[], removing=[], isr=[1,2,3,4], leader=2, epoch=11 

Immediately following this, the controller will bump the epoch again and remove 
the unneeded replica. This will result in another round of LeaderAndIsr 
requests with the following state: 

replicas=[2,3,4], adding=[], removing=[], isr=[2,3,4], leader=2, epoch=12 

The first round of LeaderAndIsr updates puzzles me a bit. It is justified in 
the code with this comment: 

{code} 
B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader 
from adding any replica in TRS - ORS back in the isr. 
{code} 
(I think the comment is backwards. It should be ORS (original replica set) - 
TRS (target replica set).) 

It sounds like we are trying to prevent a member of ORS from being added back 
to the ISR, but even if it did get added, it would be removed in the next step 
anyway. In the uncommon case that an ORS replica is out of sync, there does not 
seem to be any benefit to this first update since it is basically paying the 
cost of one write in order to save the speculative cost of one write. 
Additionally, it would be useful if the protocol could enforce the invariant 
that the ISR is always a subset of the replica set.

  was:
Following the completion of the reassignment, the controller executes two 
steps: first, it elects a new leader (if needed) and sends a LeaderAndIsr 
update (in any case) with the new target replica set; second, it removes 
unneeded replicas from the replica set and sends another round of LeaderAndIsr 
updates. I am doubting the need for the first round of updates in the case that 
the leader doesn't needed changing. 

For example, suppose we have the following reassignment state: 

replicas=[1,2,3,4], adding=[4], removing=[1], isr=[1,2,3,4], leader=2, epoch=10

First the controller will bump the epoch with the target replica set, which 
will result in a round of to the target replica set with the following state: 

replicas=[2,3,4], adding=[], removing=[], isr=[1,2,3,4], leader=2, epoch=11 

Immediately following this, the controller will bump the epoch again and remove 
the unneeded replica. This will result in another round of LeaderAndIsr 
requests with the following state: 

replicas=[2,3,4], adding=[], removing=[], isr=[1,2,3], leader=2, epoch=12 

The first round of LeaderAndIsr updates puzzles me a bit. It is justified in 
the code with this comment: 

{code} 
B3. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader 
from adding any replica in TRS - ORS back in the isr. 
{code} 
(I think the comment is backwards. It should be ORS (original replica set) - 
TRS (target replica set).) 

It sounds like we are trying to prevent a member of ORS from being added back 
to the ISR, but even if it did get added, it would be removed in the next step 
anyway. In the uncommon case that an ORS replica is out of sync, there does not 
seem to be any benefit to this first update since it is basically paying the 
cost of one write in order to save the speculative cost of one write. 
Additionally, it would be useful if the protocol could enforce the invariant 
that the ISR is always a subset of the replica set.


> Unnecessary LeaderAndIsr update following reassignment completion
> -
>
> Key: KAFKA-9484
> URL: https://issues.apache.org/jira/browse/KAFKA-9484
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Following the completion of the reassignment, the controller executes two 
> steps: first, it elects a new leader (if needed) and sends a LeaderAndIsr 
> update (in any case) with the new target replica set; second, it removes 
> unneeded replicas from the replica set and sends another round of 
> LeaderAndIsr updates. I am doubting the need for the first round of updates 
> in the case that the leader doesn't need changing. 
> For example, 

[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029058#comment-17029058
 ] 

Evan Williams edited comment on KAFKA-4084 at 2/3/20 4:40 PM:
--

We are seeing the same (or at least similar) issues when terminating a broker 
in AWS and assigning the same broker ID to its replacement (which has an empty 
data dir). 

We have auto.leader.rebalance.enable=true, and when the broker gets assigned as 
leader we see 100% CPU usage (ReplicaFetcher threads) and many clients 
(streams) fail to connect, as the broker tries to fetch and serve data at the same time. 

I'm just thinking outside the box a bit for a workaround: until KIP-491 
is implemented (if at all), is it possible to implement some logic/automation 
to set auto.leader.rebalance.enable=false on the replacement node, so that as 
soon as the Kafka service starts it is prevented from becoming leader, and then, 
once under-replicated partitions = 0, reset auto.leader.rebalance.enable back to 
true and restart the Kafka service? Or must auto.leader.rebalance.enable be set 
on all brokers (or all other brokers) for it to be effective?

Or perhaps just script kafka-preferred-replica-election.sh to run when CPU is 
lower on the new node, or when under-replicated partitions = 0?


was (Author: blodsbror):
 We are seeing the same (or at least similar) issues. When terminating a broker 
in AWS, and assigning the same broker ID to it's replacement (that has a empty 
data dir). We have auto.leader.rebalance.enable=true and when the broker get's 
assigned as leader - we get 100% CPU usage (ReplicaFetcher threads), and many 
clients (streams) fail to connect, as the broker both tries to fetch and serve 
data. Am just thinking outside of the box a bit for a work around, but until 
KIP-491 is implemented (if at all), is it possible to implement some 
logic/automation to set auto.leader.rebalance.enable=false on the replacement 
node, so that as soon as the Kafka service is started, it will prevent it from 
becoming leader, until under replication partitions = 0, and then reset 
auto.leader.rebalance.enable back to true, and restart the kafka service ? Or 
must auto.leader.rebalance.enable be set on all brokers (or all other brokers), 
for it to be effective ?

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8796) A broker joining the cluster should be able to replicate without impacting the cluster

2020-02-03 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029073#comment-17029073
 ] 

Evan Williams edited comment on KAFKA-8796 at 2/3/20 4:29 PM:
--

[~dajac] Must you enforce throttling at both the topic and broker levels? Or 
can you just enforce it at the broker level, which would be much simpler to 
automate?

Ideally the following could be set on the brokers to limit replication and 
hopefully lower CPU.


{{leader.replication.throttled.rate}}
{{follower.replication.throttled.rate}}
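
For what it's worth, a sketch of setting those two rates as per-broker dynamic configs through 
the Admin API, assuming the broker version accepts them via the API (kafka-configs.sh can do the 
same); the broker id and rate below are placeholder values. Note that the topic-level 
{{*.replication.throttled.replicas}} lists still determine which replicas the throttle applies to:

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerThrottleSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Throttle leader and follower replication on broker 11 to ~50 MB/s (placeholders).
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "11");
            admin.incrementalAlterConfigs(Collections.singletonMap(broker, Arrays.asList(
                new AlterConfigOp(new ConfigEntry("leader.replication.throttled.rate", "52428800"),
                    AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("follower.replication.throttled.rate", "52428800"),
                    AlterConfigOp.OpType.SET)))).all().get();
        }
    }
}
{code}
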


was (Author: blodsbror):
[~dajac] Must you enforce throttling at both the topic and broker levels ? Or 
can you just enforce it at the broker level, which would be much simpler to 
automate ?

> A broker joining the cluster should be able to replicate without impacting 
> the cluster
> --
>
> Key: KAFKA-8796
> URL: https://issues.apache.org/jira/browse/KAFKA-8796
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Marouane RAJI
>Priority: Major
> Attachments: image-2019-08-13-10-26-19-282.png, 
> image-2019-08-13-10-28-42-337.png
>
>
> Hi, 
> We run a cluster of 50 brokers, 1.4M msgs/sec at max, on AWS. We were using 
> m4.2xlarge. We are now moving to m5.2xlarge. Every time we replace a broker 
> from scratch (EBS volumes are linked to the EC2 instance), the bytes sent on the 
> replaced broker increase significantly, and that seems to impact the cluster, 
> increasing the produce time and fetch time.
> This is our configuration per broker :
>  
>  
> {code:java}
> broker.id=11
> # Socket Server Settings 
> #
> # The port the socket server listens on
> port=9092
> advertised.host.name=ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com
> # The number of threads handling network requests
> num.network.threads=32
> # The number of threads doing disk I/O
> num.io.threads=16
> # socket server
> socket.receive.buffer.bytes=1048576 
> socket.request.max.bytes=104857600 
> # The max time a connection can be idle 
> connections.max.idle.ms = 6 
> num.partitions=2 
> default.replication.factor=2 
> auto.leader.rebalance.enable=true 
> delete.topic.enable=true 
> compression.type=producer 
> log.message.format.version=0.9.0.1
> message.max.bytes=800 
> # The minimum age of a log file to be eligible for deletion 
> log.retention.hours=48 
> log.retention.bytes=30 
> log.segment.bytes=268435456 
> log.retention.check.interval.ms=6  
> log.cleaner.enable=true 
> log.cleaner.dedupe.buffer.size=268435456
> replica.fetch.max.bytes=8388608 
> replica.fetch.wait.max.ms=500 
> replica.lag.time.max.ms=1 
> num.replica.fetchers = 3 
> # Auto creation of topics on the server 
> auto.create.topics.enable=true 
> controlled.shutdown.enable=true 
> inter.broker.protocol.version=0.10.2 
> unclean.leader.election.enabled=True
> {code}
>  
> This is what we notice on replication :
> A high increase in bytes received on the replaced broker
>  
> !image-2019-08-13-10-26-19-282.png!
> !image-2019-08-13-10-28-42-337.png!
> You can't see it in the graph above, but the increase in produce time stayed high 
> for 20minutes..
> We didn't see anything out of the ordinary in the logs.
> Please let us know if there is anything wrong in our config or if it is a 
> potential issue that needs fixing with kafka. 
> Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8796) A broker joining the cluster should be able to replicate without impacting the cluster

2020-02-03 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029073#comment-17029073
 ] 

Evan Williams commented on KAFKA-8796:
--

[~dajac] Must you enforce throttling at both the topic and broker levels? Or 
can you just enforce it at the broker level, which would be much simpler to 
automate?

> A broker joining the cluster should be able to replicate without impacting 
> the cluster
> --
>
> Key: KAFKA-8796
> URL: https://issues.apache.org/jira/browse/KAFKA-8796
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Marouane RAJI
>Priority: Major
> Attachments: image-2019-08-13-10-26-19-282.png, 
> image-2019-08-13-10-28-42-337.png
>
>
> Hi, 
> We run a cluster of 50 brokers, 1.4M msgs/sec at max, on AWS. We were using 
> m4.2xlarge. We are now moving to m5.2xlarge. Every time we replace a broker 
> from scratch (EBS volumes are linked to the EC2 instance), the bytes sent on the 
> replaced broker increase significantly, and that seems to impact the cluster, 
> increasing the produce time and fetch time.
> This is our configuration per broker :
>  
>  
> {code:java}
> broker.id=11
> # Socket Server Settings 
> #
> # The port the socket server listens on
> port=9092
> advertised.host.name=ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com
> # The number of threads handling network requests
> num.network.threads=32
> # The number of threads doing disk I/O
> num.io.threads=16
> # socket server
> socket.receive.buffer.bytes=1048576 
> socket.request.max.bytes=104857600 
> # The max time a connection can be idle 
> connections.max.idle.ms = 6 
> num.partitions=2 
> default.replication.factor=2 
> auto.leader.rebalance.enable=true 
> delete.topic.enable=true 
> compression.type=producer 
> log.message.format.version=0.9.0.1
> message.max.bytes=800 
> # The minimum age of a log file to be eligible for deletion 
> log.retention.hours=48 
> log.retention.bytes=30 
> log.segment.bytes=268435456 
> log.retention.check.interval.ms=6  
> log.cleaner.enable=true 
> log.cleaner.dedupe.buffer.size=268435456
> replica.fetch.max.bytes=8388608 
> replica.fetch.wait.max.ms=500 
> replica.lag.time.max.ms=1 
> num.replica.fetchers = 3 
> # Auto creation of topics on the server 
> auto.create.topics.enable=true 
> controlled.shutdown.enable=true 
> inter.broker.protocol.version=0.10.2 
> unclean.leader.election.enabled=True
> {code}
>  
> This is what we notice on replication :
> A high increase in bytes received on the replaced broker
>  
> !image-2019-08-13-10-26-19-282.png!
> !image-2019-08-13-10-28-42-337.png!
> You can't see it in the graph above, but the increase in produce time stayed high 
> for 20minutes..
> We didn't see anything out of the ordinary in the logs.
> Please let us know if there is anything wrong in our config or if it is a 
> potential issue that needs fixing with kafka. 
> Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-03 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029058#comment-17029058
 ] 

Evan Williams commented on KAFKA-4084:
--

We are seeing the same (or at least similar) issues when terminating a broker 
in AWS and assigning the same broker ID to its replacement (which has an empty 
data dir). We have auto.leader.rebalance.enable=true, and when the broker gets 
assigned as leader we see 100% CPU usage (ReplicaFetcher threads) and many 
clients (streams) fail to connect, as the broker tries to fetch and serve data at 
the same time. I'm just thinking outside the box a bit for a workaround: until 
KIP-491 is implemented (if at all), is it possible to implement some 
logic/automation to set auto.leader.rebalance.enable=false on the replacement 
node, so that as soon as the Kafka service starts it is prevented from becoming 
leader, and then, once under-replicated partitions = 0, reset 
auto.leader.rebalance.enable back to true and restart the Kafka service? Or 
must auto.leader.rebalance.enable be set on all brokers (or all other brokers) 
for it to be effective?

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9027) Replace CreateAcls request/response with automated protocol

2020-02-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-9027.

Fix Version/s: 2.5.0
 Assignee: Ismael Juma  (was: Mickael Maison)
   Resolution: Fixed

> Replace CreateAcls request/response with automated protocol
> ---
>
> Key: KAFKA-9027
> URL: https://issues.apache.org/jira/browse/KAFKA-9027
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9028) Replace DeleteAcls request/response with automated protocol

2020-02-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-9028.

Fix Version/s: 2.5.0
 Assignee: Ismael Juma  (was: Mickael Maison)
   Resolution: Fixed

> Replace DeleteAcls request/response with automated protocol
> ---
>
> Key: KAFKA-9028
> URL: https://issues.apache.org/jira/browse/KAFKA-9028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9027) Replace CreateAcls request/response with automated protocol

2020-02-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17029010#comment-17029010
 ] 

ASF GitHub Bot commented on KAFKA-9027:
---

ijuma commented on pull request #7725: KAFKA-9027, KAFKA-9028: Convert 
create/delete acls requests/response to use generated protocol
URL: https://github.com/apache/kafka/pull/7725
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace CreateAcls request/response with automated protocol
> ---
>
> Key: KAFKA-9027
> URL: https://issues.apache.org/jira/browse/KAFKA-9027
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9499) Batch topics deletion

2020-02-03 Thread David Jacot (Jira)
David Jacot created KAFKA-9499:
--

 Summary: Batch topics deletion
 Key: KAFKA-9499
 URL: https://issues.apache.org/jira/browse/KAFKA-9499
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


I have noticed that the topic deletion process could be improved by batching 
topics here: 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L354]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9498) Topic validation during the creation trigger unnecessary TopicChange events

2020-02-03 Thread David Jacot (Jira)
David Jacot created KAFKA-9498:
--

 Summary: Topic validation during the creation trigger unnecessary 
TopicChange events 
 Key: KAFKA-9498
 URL: https://issues.apache.org/jira/browse/KAFKA-9498
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot
Assignee: David Jacot


I have found out that the topic validation logic, which is executed when a 
CreateTopicPolicy is configured or when validateOnly is set, triggers unnecessary 
TopicChange events in the controller. In the worst case, it can trigger up to one 
event per created topic, which leads to overloading the controller.

This happens because the validation logic reads all the topics from ZK using 
the getAllTopicsInCluster method provided by the KafkaZkClient. This method 
registers a watch every time the topics are read from ZooKeeper.

I think we should make the watch registration optional for this call in 
order to avoid this unwanted behaviour.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9497) Brokers start up even if SASL provider is not loaded and throw NPE when clients connect

2020-02-03 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-9497:
-

 Summary: Brokers start up even if SASL provider is not loaded and 
throw NPE when clients connect
 Key: KAFKA-9497
 URL: https://issues.apache.org/jira/browse/KAFKA-9497
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0, 1.1.1, 0.11.0.3, 0.10.2.2
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.5.0


Note: This is not a regression, this has been the behaviour since SASL was 
first implemented in Kafka.

 

Sasl.createSaslServer and Sasl.createSaslClient may return null if a SASL 
provider that works for the specified configs cannot be created. We don't 
currently handle this case; as a result, the broker/client throws a 
NullPointerException if a provider has not been loaded. On the broker side, we 
allow brokers to start up successfully even if the SASL providers for their enabled 
mechanisms are not found. For the SASL mechanisms PLAIN/SCRAM-xx/OAUTHBEARER, the 
login module in Kafka loads the SASL providers. If the login module is 
incorrectly configured, brokers start up and then fail client connections when 
hitting the NPE, so clients see disconnections during authentication. It 
is difficult to tell from the client or broker logs why the failure occurred. 
We should fail during startup if SASL providers are not found and provide 
better diagnostics for this case.
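
A small sketch of the fail-fast behaviour proposed above: Sasl.createSaslServer returns null 
(rather than throwing) when no provider supports the requested mechanism, so the caller must 
check for null and turn it into a descriptive startup error. The protocol name and callback 
handler below are placeholders:

{code:java}
import java.util.Collections;
import java.util.Map;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

public class SaslProviderCheckSketch {

    // Turn the null return value into a descriptive failure instead of letting a
    // NullPointerException surface later when a client connects.
    static SaslServer createOrFail(String mechanism, String host, Map<String, ?> props)
            throws SaslException {
        SaslServer server = Sasl.createSaslServer(mechanism, "kafka", host, props, callbacks -> { });
        if (server == null) {
            throw new SaslException("No SASL provider found for mechanism " + mechanism
                + "; failing startup instead of hitting an NPE during authentication");
        }
        return server;
    }

    public static void main(String[] args) {
        // The JDK ships no SaslServer factory for PLAIN, so without the provider normally
        // registered by Kafka's login module this returns null and the check above fires.
        try {
            createOrFail("PLAIN", "localhost", Collections.emptyMap());
        } catch (SaslException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}
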

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9146) Add option to force delete members in stream reset tool

2020-02-03 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028823#comment-17028823
 ] 

feyman commented on KAFKA-9146:
---

Thanks for your comments, this is very helpful! [~bchen225242]

Let me initiate a KIP discussion and also think about the detailed plan.

> Add option to force delete members in stream reset tool
> ---
>
> Key: KAFKA-9146
> URL: https://issues.apache.org/jira/browse/KAFKA-9146
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>  Labels: newbie
>
> Sometimes people want to reset the stream application sooner, but are blocked by 
> the left-over members inside the group coordinator, which only expire after the 
> session timeout. When a user configures a really long session timeout, this can 
> prevent the group from clearing. We should consider adding support for cleaning 
> up members by forcing them to leave the group. To do that, 
>  # If the stream application is already on static membership, we could call 
> directly from adminClient.removeMembersFromGroup
>  # If the application is on dynamic membership, we should modify 
> adminClient.removeMembersFromGroup interface to allow deletion based on 
> member.id.
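
For the static-membership case (option 1 above), a minimal sketch against the Admin API that 
shipped with KIP-345, where the method is named removeMembersFromConsumerGroup; the group id 
and group.instance.id below are hypothetical. The dynamic-membership case would need the 
interface change described in option 2:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class ForceRemoveStaticMemberSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Removes a static member (identified by its group.instance.id) from the group
            // without waiting for the session timeout to expire.
            admin.removeMembersFromConsumerGroup(
                "my-streams-app",
                new RemoveMembersFromConsumerGroupOptions(
                    Collections.singleton(new MemberToRemove("instance-1")))
            ).all().get();
        }
    }
}
{code}
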



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-03 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028811#comment-17028811
 ] 

VIkram commented on KAFKA-9280:
---

[~bchen225242] Any update on this? I still face this issue.

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: VIkram
>Priority: Major
>
> In ack mode ALL, the leader sends the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before producer sends next message, killed third replica with kill -9 
> (Leader takes time ~5sec to detect that the broker is down).
>  # Producer sent a message to leader.
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer retries sending the same message. (In our application it is 
> the next integer we send).
>  # Now when second/third replica is up, leader accepts the message and sends 
> the same message to consumer. *Thus sending duplicates.*
>  
>  
> *Logs:*
>  # 2-3 seconds before producer sends next message, killed third replica with 
> kill -9 (Leader takes time ~5sec to detect that the broker is down).
> _{{{_
> _> kill -9 49596_
> _}}}_
>  __ 
>  # Producer sent a message to leader.
> _{{{_
> _[20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)_
> _}}}_
>  
>  # Before the leader knows that third replica is down, it accepts the message 
> from producer.
>  # Leader forwards the message to third replica.
>  # Before receiving ACK from third replica, leader sent the message to 
> consumer.
> _{{{_
>  _Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)_
> _}}}_
>  __ 
>  # Leader doesn't get an ACK from third replica.
>  # Now leader detects that third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> _{{{_
> _[2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)_
> _org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0_
> _}}}_
>  
>  # Now leader stops accepting messages from producer.
>  # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION 
> after timeout (2min in our case) .
> _{{{_
>  _java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms_
> _has passed since batch creation_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)_
>     _at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)_
> _Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for t229-0:12 ms has passed since batch creation_
> _}}}_
>  
>  # So far, producer believes that the message was not received by leader 
> whereas the consumer actually received it.
>  # Now producer retries 
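
For reference, a minimal sketch of the synchronous acks=all producer used in the setup described 
above (the bootstrap server is a placeholder; the topic and value mirror the log excerpts):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SyncAllAcksProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the leader waits for the in-sync replicas (subject to min.insync.replicas=2)
        // before acknowledging, which is the mode in which the duplicates were observed.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Synchronous send: a TimeoutException here is what triggers the application-level
            // retry, and hence the duplicate, described in the sequence of events above.
            producer.send(new ProducerRecord<>("t229", "p229-4")).get();
        }
    }
}
{code}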

[jira] [Commented] (KAFKA-9263) Reocurrence: Transient failure in kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDi

2020-02-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028762#comment-17028762
 ] 

Chia-Ping Tsai commented on KAFKA-9263:
---

Updated the title since KAFKA-9183 renamed AdminClientIntegrationTest to 
PlaintextAdminIntegrationTest.

> Reocurrence: Transient failure in 
> kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and 
> kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs
> --
>
> Key: KAFKA-9263
> URL: https://issues.apache.org/jira/browse/KAFKA-9263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Priority: Major
>  Labels: flaky-test
>
> This test has failed for me on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
> Stacktrace
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>   at 
> kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Standard Output
> [2019-12-03 04:54:16,111] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,091] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for 

[jira] [Updated] (KAFKA-9263) Reocurrence: Transient failure in kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs

2020-02-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-9263:
--
Summary: Reocurrence: Transient failure in 
kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and 
kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs  (was: 
Reocurrence: Transient failure in 
kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpointkafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs)

> Reocurrence: Transient failure in 
> kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and 
> kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs
> --
>
> Key: KAFKA-9263
> URL: https://issues.apache.org/jira/browse/KAFKA-9263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Priority: Major
>  Labels: flaky-test
>
> This test has failed for me on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
> Stacktrace
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>   at 
> kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Standard Output
> [2019-12-03 04:54:16,111] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
>