[jira] [Updated] (KAFKA-7950) Kafka tools GetOffsetShell -time description

2019-03-01 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7950:
--
Issue Type: Wish  (was: Improvement)

> Kafka tools GetOffsetShell -time description 
> -
>
> Key: KAFKA-7950
> URL: https://issues.apache.org/jira/browse/KAFKA-7950
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 2.1.0
>Reporter: Kartik
>Assignee: Kartik
>Priority: Trivial
>
> In the Kafka GetOffsetShell tool, the --time description should explain what 
> happens when the given timestamp value is greater than the most recently 
> committed timestamp.
>  
> Expected: "If the timestamp value provided is greater than the most recently 
> committed timestamp, then no offset is returned."
>  
>  
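To make the proposed wording concrete, here is a toy model of a timestamp-based offset lookup. It is only a sketch: the class and method names are hypothetical, the partition is modelled as an array of record timestamps indexed by offset, and it assumes the lookup returns the earliest offset whose timestamp is at or after the requested one. It does not call GetOffsetShell or any Kafka API.

```java
import java.util.OptionalLong;

/** Toy model of a timestamp lookup; hypothetical names, not Kafka code. */
final class TimestampLookupSketch {

    /**
     * timestamps[i] is the timestamp of the record at offset i (assumed non-decreasing).
     * Returns the first offset whose timestamp is >= target, or empty if there is none.
     */
    static OptionalLong firstOffsetAtOrAfter(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return OptionalLong.of(offset);
            }
        }
        // Target is greater than every stored timestamp: no offset is returned.
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 300L};
        System.out.println(firstOffsetAtOrAfter(timestamps, 150L)); // an offset is found
        System.out.println(firstOffsetAtOrAfter(timestamps, 301L)); // nothing is found
    }
}
```

The second call is the case the ticket wants documented: the requested timestamp lies beyond the newest record, so the lookup has nothing to return.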



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782247#comment-16782247
 ] 

Matthias J. Sax commented on KAFKA-7937:


And again: 
[https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/41/pipeline]

> Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
> 
>
> Key: KAFKA-7937
> URL: https://issues.apache.org/jira/browse/KAFKA-7937
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for the `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline
> {quote}kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsNotExistingGroup FAILED
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
>     at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.{quote}





[jira] [Commented] (KAFKA-8026) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782246#comment-16782246
 ] 

Matthias J. Sax commented on KAFKA-8026:


Failed again: 
[https://builds.apache.org/blue/organizations/jenkins/kafka-1.1-jdk7/detail/kafka-1.1-jdk7/250/tests]

It seems something is broken on the 1.1 branch.

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted
> 
>
> Key: KAFKA-8026
> URL: https://issues.apache.org/jira/browse/KAFKA-8026
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.0.2, 1.1.1
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 1.0.3, 1.1.2
>
>
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. Stream tasks not updated
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
>     at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:215){quote}
> Happened in 1.0 and 1.1 builds:
> [https://builds.apache.org/blue/organizations/jenkins/kafka-1.0-jdk7/detail/kafka-1.0-jdk7/263/tests/]
> and
> [https://builds.apache.org/blue/organizations/jenkins/kafka-1.1-jdk7/detail/kafka-1.1-jdk7/249/tests/]





[jira] [Commented] (KAFKA-7288) Transient failure in SslSelectorTest.testCloseConnectionInClosingState

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782244#comment-16782244
 ] 

Matthias J. Sax commented on KAFKA-7288:


Failed again: 
https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3429/tests

> Transient failure in SslSelectorTest.testCloseConnectionInClosingState
> --
>
> Key: KAFKA-7288
> URL: https://issues.apache.org/jira/browse/KAFKA-7288
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 2.3.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.1.0, 2.3.0
>
>
> Noticed this failure in SslSelectorTest.testCloseConnectionInClosingState a 
> few times in unit tests in Jenkins:
> {quote}
> java.lang.AssertionError: Channel not expired expected null, but was:
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotNull(Assert.java:755)
>     at org.junit.Assert.assertNull(Assert.java:737)
>     at org.apache.kafka.common.network.SelectorTest.testCloseConnectionInClosingState(SelectorTest.java:341)
> {quote}





[jira] [Updated] (KAFKA-4730) Streams does not have an in-memory windowed store

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4730:
---
Labels: kip  (was: )

> Streams does not have an in-memory windowed store
> -
>
> Key: KAFKA-4730
> URL: https://issues.apache.org/jira/browse/KAFKA-4730
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
> Fix For: 2.3.0
>
>
> Streams has windowed persistent stores (e.g., see the PersistentKeyValueFactory 
> interface with its "windowed" method); however, it does not allow for windowed 
> in-memory stores (e.g., see the InMemoryKeyValueFactory interface). 
> In addition to the interface not allowing it, Streams does not actually have 
> an implementation of an in-memory windowed store.
> The implication is that operations that require windowing cannot use 
> in-memory stores. 





[jira] [Updated] (KAFKA-8029) Add in-memory bytes-only session store implementation

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8029:
---
Labels: needs-kip  (was: )

> Add in-memory bytes-only session store implementation
> -
>
> Key: KAFKA-8029
> URL: https://issues.apache.org/jira/browse/KAFKA-8029
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> As titled. We've added the window store and session store implementations in 
> memory, what's left is the session store now.





[jira] [Created] (KAFKA-8029) Add in-memory bytes-only session store implementation

2019-03-01 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-8029:


 Summary: Add in-memory bytes-only session store implementation
 Key: KAFKA-8029
 URL: https://issues.apache.org/jira/browse/KAFKA-8029
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Sophie Blee-Goldman


As titled. We've added the window store and session store implementations in 
memory, what's left is the session store now.





[jira] [Updated] (KAFKA-8025) Flaky Test RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8025:
---
Affects Version/s: 2.3.0

> Flaky Test 
> RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls
> 
>
> Key: KAFKA-8025
> URL: https://issues.apache.org/jira/browse/KAFKA-8025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: flaky-test
>
> At least one occurrence where the following unit test case failed on a Jenkins 
> job that didn't involve any related changes. 
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2783/consoleFull]
> I have not been able to reproduce it locally on Linux. (For instance, 20 
> consecutive runs of this class pass all test cases.)
> {code:java}
> 14:06:13 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls STARTED
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/build/reports/testOutput/org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls.test.stdout
> 14:06:14
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls FAILED
> 14:06:14     java.lang.AssertionError:
> 14:06:14     Expected: a string matching the pattern 'Unexpected method call DBOptions\.baseBackgroundCompactions((.*
> 14:06:14     *)*):'
> 14:06:14          but: was "Unexpected method call DBOptions.baseBackgroundCompactions():\n    DBOptions.close(): expected: 3, actual: 0"
> 14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
> 14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> 14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.verifyDBOptionsMethodCall(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:121)
> 14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:101)
> 14:06:14
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls STARTED
> 14:06:14
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls PASSED
> {code}
>  





[jira] [Commented] (KAFKA-8018) Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed

2019-03-01 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782210#comment-16782210
 ] 

Jun Rao commented on KAFKA-8018:


Created a PR: https://github.com/apache/kafka/pull/6354

> Flaky Test 
> SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
> 
>
> Key: KAFKA-8018
> URL: https://issues.apache.org/jira/browse/KAFKA-8018
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Jun Rao
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/35/]
> {quote}org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__consumer_offsets/partitions/3/state
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
>     at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:534)
>     at kafka.zk.KafkaZkClient.getTopicPartitionState(KafkaZkClient.scala:891)
>     at kafka.zk.KafkaZkClient.getLeaderForPartition(KafkaZkClient.scala:901)
>     at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:669)
>     at kafka.utils.TestUtils$.$anonfun$createTopic$1(TestUtils.scala:304)
>     at kafka.utils.TestUtils$.$anonfun$createTopic$1$adapted(TestUtils.scala:302)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>     at scala.collection.immutable.Range.foreach(Range.scala:158)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at kafka.utils.TestUtils$.createTopic(TestUtils.scala:302)
>     at kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:350)
>     at kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:95)
>     at kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73)
>     at kafka.api.AdminClientIntegrationTest.setUp(AdminClientIntegrationTest.scala:78)
>     at kafka.api.SaslSslAdminClientIntegrationTest.setUp(SaslSslAdminClientIntegrationTest.scala:64){quote}





[jira] [Updated] (KAFKA-8025) Flaky Test RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8025:
---
Fix Version/s: 2.3.0

> Flaky Test 
> RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls
> 
>
> Key: KAFKA-8025
> URL: https://issues.apache.org/jira/browse/KAFKA-8025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> At least one occurrence where the following unit test case failed on a Jenkins 
> job that didn't involve any related changes. 
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2783/consoleFull]
> I have not been able to reproduce it locally on Linux. (For instance, 20 
> consecutive runs of this class pass all test cases.)
> {code:java}
> 14:06:13 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls STARTED
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/build/reports/testOutput/org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls.test.stdout
> 14:06:14
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls FAILED
> 14:06:14     java.lang.AssertionError:
> 14:06:14     Expected: a string matching the pattern 'Unexpected method call DBOptions\.baseBackgroundCompactions((.*
> 14:06:14     *)*):'
> 14:06:14          but: was "Unexpected method call DBOptions.baseBackgroundCompactions():\n    DBOptions.close(): expected: 3, actual: 0"
> 14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
> 14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> 14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.verifyDBOptionsMethodCall(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:121)
> 14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:101)
> 14:06:14
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls STARTED
> 14:06:14
> 14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls PASSED
> {code}
>  





[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782206#comment-16782206
 ] 

Matthias J. Sax commented on KAFKA-7994:


Thanks [~vvcephei]. Sounds convincing! Glad that suppress() is not subject to 
this issue.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to decide whether to 
> process out-of-order records or to drop them if they are late (i.e., 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior: 
> if we stop processing right before a late record, that record would be dropped 
> if we continued processing, but it is not dropped after a rebalance/restart. 
> Let's look at an example with a grace period of 5ms for a tumbling window of 
> 5ms, and the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note that this is a corner case: if we stopped processing one record 
> earlier, i.e., after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
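
The scenario in the description can be replayed with a small simulation. This is a minimal sketch under stated assumptions, not Kafka Streams code: the class name, the method name, and the `restartBeforeIndex` parameter are hypothetical, a single partition is modelled, and a restart is represented simply by resetting stream-time to UNKNOWN (-1).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the stream-time bookkeeping described in the ticket; hypothetical names. */
final class StreamTimeSketch {
    static final long UNKNOWN = -1L;

    /**
     * Replays record timestamps in order and reports, per record, whether it is dropped as late.
     * restartBeforeIndex is the index of the record before which stream-time is reset to
     * UNKNOWN, simulating a restart or rebalance; pass -1 for an uninterrupted run.
     */
    static List<Boolean> droppedFlags(long[] timestamps, long gracePeriodMs, int restartBeforeIndex) {
        List<Boolean> dropped = new ArrayList<>();
        long streamTime = UNKNOWN;
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartBeforeIndex) {
                streamTime = UNKNOWN; // stream-time is forgotten on restart/rebalance
            }
            long ts = timestamps[i];
            // Late means: timestamp < stream-time - grace-period (only once stream-time is known).
            boolean late = streamTime != UNKNOWN && ts < streamTime - gracePeriodMs;
            dropped.add(late);
            streamTime = Math.max(streamTime, ts); // stream-time never moves backwards
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L}; // r1(0) r2(5) r3(11) r4(2)
        System.out.println(droppedFlags(records, 5L, -1)); // uninterrupted run
        System.out.println(droppedFlags(records, 5L, 3));  // restart between r3 and r4
        System.out.println(droppedFlags(records, 5L, 2));  // restart between r2 and r3
    }
}
```

In the uninterrupted run only `r4` is flagged as dropped; with the restart placed between `r3` and `r4` nothing is dropped, which is the non-determinism the ticket describes. Placing the restart one record earlier gives the same result as the uninterrupted run.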





[jira] [Updated] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7651:
---
Component/s: unit tests
 core

> Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
> ---
>
> Key: KAFKA-7651
> URL: https://issues.apache.org/jira/browse/KAFKA-7651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Dong Lin
>Priority: Critical
> Fix For: 2.3.0
>
>
> Here is the stack trace from 
> https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/
> {code}
> Error Message
> java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException
> Stacktrace
> java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
>   at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Updated] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7651:
---
Affects Version/s: 2.3.0

> Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
> ---
>
> Key: KAFKA-7651
> URL: https://issues.apache.org/jira/browse/KAFKA-7651
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 2.3.0
>Reporter: Dong Lin
>Priority: Critical
>
> Here is the stack trace from 
> https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/
> {code}
> Error Message
> java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException
> Stacktrace
> java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
>   at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7652:
-
Fix Version/s: (was: 2.2.0)

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> are experiencing severe performance degradation. Most of the CPU time 
> seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation suggests that we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  
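
For anyone trying to reproduce the report, the two settings quoted in the description can be written down as plain properties. This is a sketch only: the class and method names are hypothetical, and it uses `java.util.Properties` with the literal keys from the report rather than any Kafka class, so nothing here is validated against a real Streams application.

```java
import java.util.Properties;

/** Hypothetical holder for the two settings quoted in the report. */
final class SessionStoreConfigSketch {

    static Properties reportedSettings() {
        Properties props = new Properties();
        // commit.interval.ms = 30 * 1000, i.e. commit every 30 seconds
        props.setProperty("commit.interval.ms", String.valueOf(30 * 1000));
        // cache.max.bytes.buffering = 10485760, i.e. 10 * 1024 * 1024 bytes
        props.setProperty("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        reportedSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Any other settings of the reporter's application are not given in the ticket, so they are left out rather than guessed.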





[jira] [Reopened] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-7652:
--

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> are experiencing severe performance degradation. Most of the CPU time 
> seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation suggests that we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  





[jira] [Updated] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7651:
---
Priority: Critical  (was: Major)

> Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
> ---
>
> Key: KAFKA-7651
> URL: https://issues.apache.org/jira/browse/KAFKA-7651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Critical
>
> Here is the stack trace from 
> https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/
> {code}
> Error Message
> java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException
> Stacktrace
> java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
>   at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Commented] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2019-03-01 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782184#comment-16782184
 ] 

Bill Bejeck commented on KAFKA-7651:


kafka.api.SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts failed 
again during PR build for Java 8 and Java 11

> Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
> ---
>
> Key: KAFKA-7651
> URL: https://issues.apache.org/jira/browse/KAFKA-7651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dong Lin
>Priority: Major
>
> Here is stacktrace from 
> https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/
> {code}
> Error Message
> java.lang.AssertionError: Expected an exception of type 
> org.apache.kafka.common.errors.TimeoutException; got type 
> org.apache.kafka.common.errors.SslAuthenticationException
> Stacktrace
> java.lang.AssertionError: Expected an exception of type 
> org.apache.kafka.common.errors.TimeoutException; got type 
> org.apache.kafka.common.errors.SslAuthenticationException
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Updated] (KAFKA-4474) Poor kafka-streams throughput

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4474:
---
Fix Version/s: 0.11.0.0

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan J Chorro
>Assignee: Eno Thereska
>Priority: Major
> Fix For: 0.11.0.0
>
> Attachments: Performance test results.png, hctop sreenshot.png, 
> kafka-streams-bug-1.png, kafka-streams-bug-2.png
>
>
> Hi! 
> I'm writing because I'm concerned about kafka-streams throughput.
> I have a single kafka-streams application instance that consumes from the 
> 'input' topic, prints to the screen and produces to the 'output' topic. All 
> topics have 4 partitions. As can be seen, the topology is very simple.
> I produce 120K messages/second to the 'input' topic; when I measure the 
> 'output' topic, I find that I'm receiving only ~4K messages/second. I used the 
> following configuration (remaining parameters at their defaults):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I ran various tests without success, but when I created a new 
> 'input' topic with 1 partition (keeping the 'output' topic at 4 partitions) I 
> got 120K messages/second in the 'output' topic.
> I have run some performance tests with the following cases (all 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (total throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (more or less the same throughput as with num.stream.threads set 
> to 1)
> Case C - 4 Instances:
> - With num.stream.threads set to 1 I had 3946 messages/second for each 
> instance (total throughput ~17K messages/second)
> As can be seen, I get the best results when num.stream.threads is set to the 
> number of partitions. This raises the following questions:
> - Why do I always get ~4K messages/second when the topic has more than 1 
> partition and num.stream.threads is set to 1?
> - In case C, 4 instances with num.stream.threads set to 1 should perform 
> better than 1 instance with num.stream.threads set to 4. Is this assumption 
> correct?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18
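
The configuration listed in the report can be sketched as plain java.util.Properties. The keys and values are the ones quoted above; the class name StreamsConfigSketch and the helper forPartitions are hypothetical. Setting num.stream.threads to the partition count only mirrors the Case A observation and is not a documented recommendation.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
final class StreamsConfigSketch {
    /** Builds the reported settings, with one stream thread per input partition (assumption). */
    static Properties forPartitions(int inputPartitions) {
        Properties props = new Properties();
        props.setProperty("application.id", "myApp");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        // Case A reported ~120K messages/second only when threads == partitions.
        props.setProperty("num.stream.threads", Integer.toString(inputPartitions));
        return props;
    }
}
```

With 4 input partitions, forPartitions(4) yields num.stream.threads=4, the only single-instance setting in the report that reached ~120K messages/second.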





[jira] [Resolved] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5122.
--
Resolution: Not A Problem

> Kafka Streams unexpected off-heap memory growth
> ---
>
> Key: KAFKA-5122
> URL: https://issues.apache.org/jira/browse/KAFKA-5122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>Reporter: Jon Buffington
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> ,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> • Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).





[jira] [Commented] (KAFKA-7419) Rolling sum for high frequency sream

2019-03-01 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782181#comment-16782181
 ] 

John Roesler commented on KAFKA-7419:
-

Oh, good!

> Rolling sum for high frequency sream
> 
>
> Key: KAFKA-7419
> URL: https://issues.apache.org/jira/browse/KAFKA-7419
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Stanislav Bausov
>Priority: Minor
>
> Have a task to count 24h market volume for high frequency trades stream. And 
> there is no solution out of the box. Windowing is not an option.
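
One conceivable way to keep a rolling 24h sum without windowing is a queue of (timestamp, volume) entries plus a running total. The sketch below is plain Java and not a Kafka Streams API; the class name RollingSum is hypothetical, and it assumes timestamps arrive in non-decreasing order. In a Streams application this logic would presumably live in a custom processor backed by a state store, which is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration only; not part of Kafka.
final class RollingSum {
    private static final class Entry {
        final long timestampMs;
        final long volume;

        Entry(long timestampMs, long volume) {
            this.timestampMs = timestampMs;
            this.volume = volume;
        }
    }

    private final long spanMs;
    private final Deque<Entry> entries = new ArrayDeque<>();
    private long sum = 0L;

    RollingSum(long spanMs) {
        this.spanMs = spanMs;
    }

    /**
     * Adds one trade and returns the sum of all volumes whose timestamp lies in
     * (timestampMs - spanMs, timestampMs]. Assumes non-decreasing timestamps.
     */
    long add(long timestampMs, long volume) {
        entries.addLast(new Entry(timestampMs, volume));
        sum += volume;
        // Evict entries that have fallen out of the span.
        while (!entries.isEmpty() && entries.peekFirst().timestampMs <= timestampMs - spanMs) {
            sum -= entries.removeFirst().volume;
        }
        return sum;
    }
}
```

For a 24h volume, spanMs would be 24 * 60 * 60 * 1000. Memory grows with the number of trades inside the span, so a high frequency stream may need pre-aggregation into small buckets before entries are queued.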





[jira] [Reopened] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4186:


> Transient failure in KStreamAggregationIntegrationTest
> --
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
> 
> Re-opening this issue with a different test, but it seems the test suite 
> itself has a timing-dependent flakiness.
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. metadata 
> for topic=output-9 partition=0 not propagated to all brokers
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:209)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:168)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.createTopics(KStreamAggregationIntegrationTest.java:680)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.before(KStreamAggregationIntegrationTest.java:105)
> ...
> {code}
> Example: 
> https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/10564/testReport/junit/org.apache.kafka.streams.integration/KStreamAggregationIntegrationTest/shouldAggregateWindowed/





[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-03-01 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782178#comment-16782178
 ] 

John Roesler commented on KAFKA-7994:
-

I did some mental math when I made that last change to stream time. It's funny, 
I keep proving the math to myself, but I can't shake the feeling that there 
might be something wrong with it... Thanks for bringing this up, the more we go 
over it, the better.

 

It's always possible I've made an arithmetic error, but I don't think we'll 
ever emit earlier than necessary, and we shouldn't emit later than necessary 
either.

The stream-time variable is initialized to -1, which is smaller than any record 
timestamp (because we're not considering negative timestamps yet), so it's also 
smaller than any "real" stream time.

Records are only emitted from the suppression when a certain time passes 
*after* the record time. I.e., (ST is stream time, BR is a buffered record, D 
is suppression duration) when ST >= BR.time + D . Neither BR.time nor D can be 
negative, so that expression is never true for ST = -1 (the initial, "unknown 
time", value). Therefore, resetting stream time to -1 at any point is safe, in 
that it can never cause premature evictions.

 

It wouldn't be terrible if we sometimes held on to records a little longer than 
necessary, but actually, I think this won't happen either.

Stream time advances when we process new records through the Suppress 
processor, and the processing of a new record isn't complete until we emit all 
of the buffered records that should be emitted as a result of the stream time 
advancement that record causes. Therefore, we have an invariant that, if the 
processing of record R is complete, ST >= R.time and the buffer contains only 
buffered records BR such that BR.time > ST - D .

Now, if we restart after processing R, we'll forget all about that prior stream 
time, and just take the next record's (NR) timestamp as the new stream time. 
NR.time can either be larger or smaller than the previous ST. If it's larger, 
it's equivalent to a normal increment of stream time, and all the normal logic 
applies. The unusual situation is when NR.time is smaller than the previous 
ST. This means the new stream time ST' is smaller than the prior stream time. 
If we hadn't had the "reset", what would have happened? Since ST' = max(ST, 
NR.time) and NR.time < ST, we get ST' = ST. The stream time doesn't advance at 
all, and we should not emit anything.

So whether we reset the stream time to -1 or preserve the prior ST, the exact 
same records will be emitted or not.

 

I wrote the above out long form to try and eliminate any hand-waving from my 
thinking about this. Is there any problem with the reasoning? If not, then we 
have a proof that resetting the stream time to -1 has no impact on suppress. 
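
The argument above can be checked on a small input with a simulation. This is only a sketch: SuppressSketch and run are hypothetical names, the buffer is assumed to survive the restart while stream time is forgotten, and the emission rule is the ST >= BR.time + D condition stated above. It exercises one hand-picked sequence in which the record after the reset is still within D of the prior stream time, so it illustrates the reasoning and does not prove it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation for illustration only; not Kafka code.
final class SuppressSketch {
    static final long UNKNOWN = -1L;

    /**
     * Processes record times in order and returns, per input record, the buffered
     * record times emitted while processing it. Stream time is reset to UNKNOWN
     * just before index resetBeforeIndex (pass -1 for "never reset").
     */
    static List<List<Long>> run(long[] recordTimes, long d, int resetBeforeIndex) {
        long streamTime = UNKNOWN;
        List<Long> buffer = new ArrayList<>();
        List<List<Long>> emittedPerRecord = new ArrayList<>();
        for (int i = 0; i < recordTimes.length; i++) {
            if (i == resetBeforeIndex) {
                streamTime = UNKNOWN; // simulated restart: stream time forgotten, buffer kept
            }
            streamTime = Math.max(streamTime, recordTimes[i]);
            buffer.add(recordTimes[i]);
            List<Long> emitted = new ArrayList<>();
            for (Iterator<Long> it = buffer.iterator(); it.hasNext(); ) {
                long brTime = it.next();
                if (streamTime >= brTime + d) { // ST >= BR.time + D
                    emitted.add(brTime);
                    it.remove();
                }
            }
            emittedPerRecord.add(emitted);
        }
        return emittedPerRecord;
    }
}
```

With record times 0, 5, 11, 8, 20 and D = 5, run(times, 5, -1) and run(times, 5, 3) return the same per-record emissions: resetting before the record at time 8 changes nothing, because neither ST = 11 nor ST' = 8 reaches 11 + 5 or 8 + 5.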

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> the current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after a 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time 
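
The r1..r4 example from the description can be replayed with a few lines of Java. This is a sketch under stated assumptions: LateRecordSketch and dropped are hypothetical names, stream-time is modeled as a running maximum, and a record counts as late when timestamp < stream-time - grace-period, as described above. It is not the Kafka Streams implementation.

```java
// Hypothetical simulation for illustration only; not Kafka code.
final class LateRecordSketch {
    static final long UNKNOWN = -1L;

    /**
     * Returns, per record, whether it would be dropped as late. Stream-time is
     * reset to UNKNOWN just before index resetBeforeIndex (pass -1 for "never").
     */
    static boolean[] dropped(long[] timestamps, long gracePeriod, int resetBeforeIndex) {
        long streamTime = UNKNOWN;
        boolean[] result = new boolean[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            if (i == resetBeforeIndex) {
                streamTime = UNKNOWN; // simulated rebalance/restart
            }
            streamTime = Math.max(streamTime, timestamps[i]);
            result[i] = timestamps[i] < streamTime - gracePeriod;
        }
        return result;
    }
}
```

For timestamps 0, 5, 11, 2 and a grace period of 5, the run without a reset drops only r4 (2 < 11 - 5), while a reset before r4 drops nothing, which is the non-determinism the ticket describes.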

[jira] [Reopened] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-5122:
--

> Kafka Streams unexpected off-heap memory growth
> ---
>
> Key: KAFKA-5122
> URL: https://issues.apache.org/jira/browse/KAFKA-5122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>Reporter: Jon Buffington
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> ,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> • Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).





[jira] [Commented] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

2019-03-01 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782176#comment-16782176
 ] 

Guozhang Wang commented on KAFKA-5122:
--

It was not a leak, will mark as not a problem.

> Kafka Streams unexpected off-heap memory growth
> ---
>
> Key: KAFKA-5122
> URL: https://issues.apache.org/jira/browse/KAFKA-5122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>Reporter: Jon Buffington
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> ,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> • Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).





[jira] [Resolved] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3826.

Resolution: Duplicate

> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics on EACH processing 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling messages flowing through to measure latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.
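
A minimal sketch of what sampled recording could look like, assuming a simple counter-based scheme (every Nth record is timed) rather than random sampling. SampledLatency and its methods are hypothetical names and do not correspond to the Kafka metrics API. The point is that System.nanoTime() is only called for sampled records, while the cheap throughput counter still sees every record.

```java
// Hypothetical helper for illustration only; not part of Kafka.
final class SampledLatency {
    private final int sampleEvery;
    private long seen = 0L;
    private long sampledCount = 0L;
    private long sampledTotalNs = 0L;

    SampledLatency(int sampleEvery) {
        if (sampleEvery < 1) {
            throw new IllegalArgumentException("sampleEvery must be >= 1");
        }
        this.sampleEvery = sampleEvery;
    }

    /** Runs the work for one record and times it only for every sampleEvery-th call. */
    void process(Runnable work) {
        if (seen++ % sampleEvery == 0) {
            long start = System.nanoTime();
            work.run();
            sampledTotalNs += System.nanoTime() - start;
            sampledCount++;
        } else {
            work.run();
        }
    }

    long seen() {
        return seen;
    }

    long sampledCount() {
        return sampledCount;
    }

    /** Average latency over the sampled records only; 0 if nothing was sampled yet. */
    double avgLatencyNs() {
        return sampledCount == 0 ? 0.0 : (double) sampledTotalNs / sampledCount;
    }
}
```

With sampleEvery = 4, ten processed records produce three timed samples (the 1st, 5th and 9th). The average is then an estimate, so a periodic workload that lines up with the sampling interval could bias it.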





[jira] [Reopened] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3826:


> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics on EACH processing 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling messages flowing through to measure latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.





[jira] [Reopened] (KAFKA-4721) KafkaStreams (and possibly others) should inherit Closeable

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4721:


> KafkaStreams (and possibly others) should inherit Closeable
> ---
>
> Key: KAFKA-4721
> URL: https://issues.apache.org/jira/browse/KAFKA-4721
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Major
>  Labels: needs-kip
>
> KafkaStreams should inherit AutoCloseable or Closeable so that you can use 
> try-with-resources:
> {code}
> try (KafkaStreams reader = storage.createStreams(builder)) {
>     reader.start();
>     stopCondition.join();
> }
> {code}
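
Until a class implements AutoCloseable itself, a method reference can adapt its close() for try-with-resources. The sketch below shows that pattern with a stand-in class so it compiles without Kafka on the classpath: FakeStreams, QuietCloseable and CloseableSketch are all hypothetical names, and FakeStreams only mimics the start()/close() pair used in the snippet above.

```java
// Hypothetical adapter for illustration only; not part of Kafka.
interface QuietCloseable extends AutoCloseable {
    @Override
    void close(); // narrows the throws clause so callers need no catch block
}

// Stand-in for a class that has close() but does not implement AutoCloseable.
final class FakeStreams {
    boolean started;
    boolean closed;

    void start() {
        started = true;
    }

    void close() {
        closed = true;
    }
}

final class CloseableSketch {
    /** Starts the stand-in and relies on try-with-resources to close it. */
    static FakeStreams runOnce() {
        FakeStreams streams = new FakeStreams();
        try (QuietCloseable ignored = streams::close) {
            streams.start();
        }
        return streams;
    }
}
```

After runOnce() returns, both started and closed are true. The adapter is a workaround; having the class implement AutoCloseable directly, as the ticket proposes, removes the extra variable.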





[jira] [Reopened] (KAFKA-3884) KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3884:


> KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging
> -
>
> Key: KAFKA-3884
> URL: https://issues.apache.org/jira/browse/KAFKA-3884
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> Build failed after 180 minutes and the last 2 lines were:
> {code}
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldReduce PASSED
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldAggregate STARTED
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk8/712/console





[jira] [Resolved] (KAFKA-3884) KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3884.

Resolution: Duplicate

> KGroupedStreamIntegrationTest.shouldAggregate seems to be hanging
> -
>
> Key: KAFKA-3884
> URL: https://issues.apache.org/jira/browse/KAFKA-3884
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> Build failed after 180 minutes and the last 2 lines were:
> {code}
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldReduce PASSED
> org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > 
> shouldAggregate STARTED
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk8/712/console





[jira] [Reopened] (KAFKA-4281) Should be able to forward aggregation values immediately

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4281:


> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this unfortunately a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4186.

Resolution: Cannot Reproduce

> Transient failure in KStreamAggregationIntegrationTest
> --
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>Priority: Major
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}
> 
> Re-opening this issue with a different test, but it seems the test suite 
> itself has a timing-dependent flakiness.
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. metadata 
> for topic=output-9 partition=0 not propagated to all brokers
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMetadataIsPropagated(IntegrationTestUtils.java:254)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForTopicPartitions(IntegrationTestUtils.java:246)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:209)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:168)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.createTopics(KStreamAggregationIntegrationTest.java:680)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.before(KStreamAggregationIntegrationTest.java:105)
> ...
> {code}
> Example: 
> https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/10564/testReport/junit/org.apache.kafka.streams.integration/KStreamAggregationIntegrationTest/shouldAggregateWindowed/





[jira] [Resolved] (KAFKA-4281) Should be able to forward aggregation values immediately

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4281.

Resolution: Duplicate

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. Unfortunately, adding this 
> required touching a large number of files, and it effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table, but this didn't work as 
> expected: for the latency to go away, the KTableImpl itself must also mark 
> its source as forward-immediate (otherwise we will still see latency because 
> the materialization of the KTableSource still relies upon state store 
> flushes to propagate).
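
The latency trade-off described in the issue can be illustrated with a minimal, self-contained sketch. This is a toy model in plain Java, not Kafka Streams code: `ForwardingSketch`, `aggregate`, and the `flushEvery` parameter are hypothetical names, and the sketch assumes a simple count aggregation whose results are either forwarded on every update or buffered per key and forwarded only on flush.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT Kafka Streams code. All names are hypothetical.
final class ForwardingSketch {

    /**
     * Counts occurrences per key and returns the updates a downstream
     * processor would observe, rendered as "key=count".
     *
     * @param keys       input record keys, in arrival order
     * @param flushEvery number of input records between flushes;
     *                   a value of 1 models immediate forwarding
     */
    static List<String> aggregate(List<String> keys, int flushEvery) {
        Map<String, Long> counts = new LinkedHashMap<>(); // stands in for the state store
        Map<String, Long> dirty = new LinkedHashMap<>();  // buffered, not yet forwarded
        List<String> downstream = new ArrayList<>();
        int sinceFlush = 0;
        for (String key : keys) {
            long updated = counts.merge(key, 1L, Long::sum);
            dirty.put(key, updated); // a later update overwrites the buffered one
            if (++sinceFlush == flushEvery) {
                flush(dirty, downstream);
                sinceFlush = 0;
            }
        }
        flush(dirty, downstream); // final flush, e.g. on shutdown
        return downstream;
    }

    // Forwards every buffered entry downstream and empties the buffer.
    private static void flush(Map<String, Long> dirty, List<String> downstream) {
        dirty.forEach((k, v) -> downstream.add(k + "=" + v));
        dirty.clear();
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "a", "b", "a");
        System.out.println("immediate: " + aggregate(input, 1));
        System.out.println("buffered:  " + aggregate(input, 4));
    }
}
```

With `flushEvery = 1` the downstream processor sees every intermediate count as soon as it is computed; with a larger value it sees fewer records, but only after the flush, which is the latency the reporter describes.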





[jira] [Commented] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-01 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782175#comment-16782175
 ] 

Guozhang Wang commented on KAFKA-7652:
--

Hi [~jonathanpdx], we are now voting on the 2.2.0 RC1. If it is accepted, then 
2.2.0 is final and this PR will not be included; if it is cancelled, we will 
see if we can push it into 2.2.0.

On the other hand, the PR I gave you is a bit hacky, as it is only meant to 
validate the root cause. I'd like to do thorough profiling and see whether we 
should treat this as a general regression fix, not only for session stores but 
also for window stores. We will start the investigation right away, but in the 
worst case, if we cannot get a clean fix into 2.2.0, we will cut a 2.2.1 
release immediately for this purpose. In the meantime, I think it is safe for 
your application to turn off caching: in session-windowed aggregations, as long 
as your record timestamps are monotonically increasing and there is little 
out-of-order data, you will keep merging / expanding your sessions as you 
accept new data, which means there will not be many overwrites on the store 
that can be de-duplicated. If you see the downstream traffic increase by a lot 
when caching is not used, please let me know, and we can look into that as 
well.

cc [~ableegoldman].
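
The de-duplication argument in the comment can be sketched with a toy model. This is plain Java, not Kafka Streams code: `SessionMergeSketch`, its methods, and the `"start-end"` key format are hypothetical, and the sketch assumes a single record key whose session is stored under a key derived from the session's start and end timestamps.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Toy model only: NOT Kafka Streams code. Names and key format are hypothetical.
final class SessionMergeSketch {

    /**
     * Feeds record timestamps for one record key into a session and returns
     * the store keys ("start-end") written after each record, in order.
     * A record within inactivityGap of the current session extends it;
     * otherwise a new session starts.
     */
    static List<String> storeKeysWritten(long[] timestamps, long inactivityGap) {
        List<String> written = new ArrayList<>();
        boolean open = false;
        long start = 0;
        long end = 0;
        for (long ts : timestamps) {
            boolean outsideGap = ts - end > inactivityGap || start - ts > inactivityGap;
            if (!open || outsideGap) {
                start = ts; // start a new session
                end = ts;
                open = true;
            } else {
                start = Math.min(start, ts); // merge / expand the session
                end = Math.max(end, ts);
            }
            written.add(start + "-" + end);
        }
        return written;
    }

    /** Number of writes that hit a store key already written, i.e. cache-dedupable writes. */
    static int overwrites(List<String> written) {
        return written.size() - new HashSet<>(written).size();
    }

    public static void main(String[] args) {
        List<String> inOrder = storeKeysWritten(new long[] {0, 10, 20, 30}, 15);
        List<String> outOfOrder = storeKeysWritten(new long[] {0, 10, 5, 10}, 15);
        System.out.println(inOrder + " overwrites=" + overwrites(inOrder));
        System.out.println(outOfOrder + " overwrites=" + overwrites(outOfOrder));
    }
}
```

In this model, monotonically increasing timestamps change the session's end on every record, so each write lands on a new store key and a cache has nothing to de-duplicate. Only out-of-order or repeated timestamps produce overwrites of the same key.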

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> are experiencing a severe performance degradation. Most of the CPU time 
> seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation suggests that we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  





[jira] [Updated] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4601:
---
Fix Version/s: 2.1.0

> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: performance
> Fix For: 2.1.0
>
>
> Consider the following DSL:
> {code}
> KStream source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1");
> KStream mapped = source.map(..);
> KTable counts = mapped
> .groupByKey()
> .count("Counts");
> KStream sink = mapped.leftJoin(counts, ..);
> {code}
> The resulting topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>   topics: [X-Counts-repartition]
>   children:   
> [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>   states: [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overhead but also disrupt the 
> processing order (users expect each record to go through the aggregation 
> first and then the join operator). To get the simpler topology that follows, 
> users today need to manually add a {{through}} operator after {{map}} to 
> enforce repartitioning.
> {code}
> KStream source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1");
> KStream repartitioned = source.map(..).through("topic2");
> KTable counts = repartitioned
> .groupByKey()
> .count("Counts");
> KStream sink = repartitioned.leftJoin(counts, ..);
> {code}
> The resulting topology will then look like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-SINK-02]
>   KSTREAM-SINK-02:
>   topic:  topic 2
> ProcessorTopology:
>   KSTREAM-SOURCE-03:
>   topics: [topic 2]
>   children:   
> [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
>   KSTREAM-AGGREGATE-04:
>   states: [Counts]
>   KSTREAM-LEFTJOIN-05:
>   states: [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider when moving beyond one-operator-at-a-time translation.





[jira] [Reopened] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5063:


> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: unit-test
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}





[jira] [Resolved] (KAFKA-5063) Flaky ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5063.

Resolution: Cannot Reproduce

> Flaky 
> ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic
> -
>
> Key: KAFKA-5063
> URL: https://issues.apache.org/jira/browse/KAFKA-5063
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: unit-test
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
> java.lang.AssertionError: 
> Expected: <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4), KeyValue(2983939126895, 4), 
> KeyValue(2983939126915, 3), KeyValue(2983939126935, 3)]>
>  but: was <[KeyValue(2983939126775, 1), KeyValue(2983939126815, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 1), KeyValue(2983939126855, 1), 
> KeyValue(2983939126835, 1), KeyValue(2983939126795, 1), 
> KeyValue(2983939126815, 2), KeyValue(2983939126835, 2), 
> KeyValue(2983939126855, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126835, 2), KeyValue(2983939126855, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126875, 1), 
> KeyValue(2983939126855, 2), KeyValue(2983939126875, 2), 
> KeyValue(2983939126895, 1), KeyValue(2983939126915, 1), 
> KeyValue(2983939126875, 2), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 1), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 2), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 1), 
> KeyValue(2983939126875, 3), KeyValue(2983939126895, 3), 
> KeyValue(2983939126915, 2), KeyValue(2983939126935, 2), 
> KeyValue(2983939126875, 4)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}





[jira] [Resolved] (KAFKA-4721) KafkaStreams (and possibly others) should inherit Closeable

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4721.

Resolution: Duplicate

> KafkaStreams (and possibly others) should inherit Closeable
> ---
>
> Key: KAFKA-4721
> URL: https://issues.apache.org/jira/browse/KAFKA-4721
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Major
>  Labels: needs-kip
>
> KafkaStreams should inherit AutoCloseable or Closeable so that you can use 
> try-with-resources:
> {code}
> try (KafkaStreams reader = storage.createStreams(builder)) {
>     reader.start();
>     stopCondition.join();
> }
> {code}
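
As a rough illustration of what try-with-resources provides, here is a minimal sketch in plain Java. `StreamsLike` is a hypothetical stand-in for a streams client and is not the real KafkaStreams class; the sketch only assumes that the client implements `AutoCloseable` with a `close()` that throws no checked exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a streams client; NOT the real KafkaStreams class.
final class CloseableSketch {

    static final class StreamsLike implements AutoCloseable {
        private final List<String> events;

        StreamsLike(List<String> events) {
            this.events = events;
        }

        void start() {
            events.add("start");
        }

        @Override
        public void close() { // narrowed signature: no checked exception
            events.add("close");
        }
    }

    /** Runs the client and returns the lifecycle events, even if the body throws. */
    static List<String> run(boolean failInBody) {
        List<String> events = new ArrayList<>();
        try (StreamsLike streams = new StreamsLike(events)) {
            streams.start();
            if (failInBody) {
                throw new IllegalStateException("simulated failure");
            }
            events.add("done");
        } catch (IllegalStateException e) {
            // close() has already run by the time this handler executes
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

The point of the design is that `close()` is invoked on both the normal and the exceptional path without an explicit `finally` block.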





[jira] [Updated] (KAFKA-4694) Streams smoke tests fails when there is only 1 broker

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4694:
---
Fix Version/s: 0.10.2.0

> Streams smoke tests fails when there is only 1 broker
> -
>
> Key: KAFKA-4694
> URL: https://issues.apache.org/jira/browse/KAFKA-4694
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ewen Cheslack-Postava
>Assignee: Eno Thereska
>Priority: Major
> Fix For: 0.10.2.0
>
>
> The streams smoke test fails when we have a single broker since by default it 
> requires a replication factor of 2. So in StreamsKafkaClient:createTopics we 
> can get an INVALID_REPLICATION_FACTOR code.
> As part of this commit, it appears we do not check the number of brokers 
> available and thus fail if there aren't enough brokers (check for 
> getBrokers() in here: 
> https://github.com/apache/kafka/commit/4b71c0bdc1acf244e3c96fa809a1a0e48471d586#diff-4320e27e72244cae71428533cf3582ef)
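
A client-side check of the kind the report says is missing could look roughly like the sketch below. It is plain Java and not the StreamsKafkaClient code: `ReplicationCheckSketch` and `validate` are hypothetical names, and the sketch assumes the caller already knows how many brokers are available.

```java
// Hypothetical sketch; NOT the StreamsKafkaClient implementation.
final class ReplicationCheckSketch {

    /**
     * Returns the requested replication factor if the cluster can satisfy it,
     * and fails fast with a descriptive message otherwise.
     */
    static short validate(int requestedReplicationFactor, int availableBrokers) {
        if (requestedReplicationFactor < 1) {
            throw new IllegalArgumentException(
                "Replication factor must be at least 1, got " + requestedReplicationFactor);
        }
        if (requestedReplicationFactor > availableBrokers) {
            throw new IllegalStateException(
                "Replication factor " + requestedReplicationFactor
                    + " exceeds the number of available brokers (" + availableBrokers + ")");
        }
        return (short) requestedReplicationFactor;
    }

    public static void main(String[] args) {
        System.out.println(validate(1, 1)); // a single broker can host one replica
        try {
            validate(2, 1); // the situation described in the report
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing before the create-topics request is sent gives the user a clearer message than an INVALID_REPLICATION_FACTOR error code returned by the broker.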





[jira] [Resolved] (KAFKA-4732) Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4732.

Resolution: Duplicate

> Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]
> -
>
> Key: KAFKA-4732
> URL: https://issues.apache.org/jira/browse/KAFKA-4732
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:259)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:221)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:190)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:295)
> {noformat}





[jira] [Reopened] (KAFKA-4732) Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4732:


> Unstable test: KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion[1]
> -
>
> Key: KAFKA-4732
> URL: https://issues.apache.org/jira/browse/KAFKA-4732
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:259)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:221)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:190)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:295)
> {noformat}





[jira] [Resolved] (KAFKA-4849) Bug in KafkaStreams documentation

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4849.

Resolution: Duplicate  (was: Fixed)

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> On the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, the example 
> contains the line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in Kafka version 0.10.2.0.
>  
> Also, the table on the page 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again, zookeeper.connect is deprecated.
> 2. client.id and zookeeper.connect are marked as high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> neither of them is important for initializing the stream.





[jira] [Resolved] (KAFKA-4953) Global Store: cast exception when initialising with in-memory logged state store

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4953.

Resolution: Duplicate

> Global Store: cast exception when initialising with in-memory logged state 
> store
> 
>
> Key: KAFKA-4953
> URL: https://issues.apache.org/jira/browse/KAFKA-4953
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Yennick Trevels
>Priority: Major
>  Labels: user-experience
>
> Currently it is not possible to initialise a global store with an in-memory 
> *logged* store via the TopologyBuilder. This results in the following 
> exception:
> {code}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl 
> cannot be cast to 
> org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
>   at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
>   ...
> {code}
> I've created a PR which includes a unit test to verify this behavior.
> If that PR gets merged, the fixing PR should leverage the provided test 
> {{ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore}} by removing 
> the {{@Ignore}} annotation.





[jira] [Reopened] (KAFKA-4953) Global Store: cast exception when initialising with in-memory logged state store

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4953:


> Global Store: cast exception when initialising with in-memory logged state 
> store
> 
>
> Key: KAFKA-4953
> URL: https://issues.apache.org/jira/browse/KAFKA-4953
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Yennick Trevels
>Priority: Major
>  Labels: user-experience
>
> Currently it is not possible to initialise a global store with an in-memory 
> *logged* store via the TopologyBuilder. This results in the following 
> exception:
> {code}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl 
> cannot be cast to 
> org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
>   at 
> org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
>   at 
> org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
>   ...
> {code}
> I've created a PR which includes a unit test to verify this behavior.
> If that PR gets merged, the fixing PR should leverage the provided test 
> {{ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore}} by removing 
> the {{@Ignore}} annotation.





[jira] [Resolved] (KAFKA-7628) KafkaStream is not closing

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7628.
--
Resolution: Not A Problem

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I close a KafkaStreams instance when needed, based on a certain condition.
> Closing:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not closed");
>     }
> }
> {code}
> Starting:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>         this.getConfigFilePath(),
>         this,
>         this.getTopic()
>     );
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even though the stream was closed successfully:
>  
> {code:java}
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger = 
>             LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>     private ProcessorContext getContext() {
>         return context;
>     }
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>     protected abstract void processRecord(String topic, String json);
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after calling close() on the streams instance, these 
> connections are still established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  





[jira] [Resolved] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6092.

Resolution: Not A Problem

> Time passed in punctuate call is currentTime, not punctuate schedule time. 
> ---
>
> Key: KAFKA-6092
> URL: https://issues.apache.org/jira/browse/KAFKA-6092
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Priority: Major
>
> The Javadoc specifies that, for a Transformer, calling context.schedule(1000) 
> causes punctuate to be called every 1000 ms. This is not entirely accurate: 
> if no data is received for a while, punctuate won't be called.
> {code}
>  * void init(ProcessorContext context) {
>  *     this.context = context;
>  *     this.state = context.getStateStore("myTransformState");
>  *     context.schedule(1000); // call #punctuate() each 1000ms
>  * }
> {code}
> When new data arrives after, say, 20 seconds, punctuate plays catch-up 
> and is called 20 times upon reception of the new data. 
> The signature of punctuate is:
> {code}
>  * KeyValue punctuate(long timestamp) {
>  *     // can access this.state
>  *     // can emit as many new KeyValue pairs as required via this.context#forward()
>  *     return null; // don't return result -- can also be "new KeyValue()"
>  * }
> {code}
> However, the timestamp passed is the current timestamp at the time of the call 
> to punctuate, not the time for which the punctuate was scheduled. This is very 
> confusing, and I think the timestamp should be the one at which the punctuate 
> should have been scheduled. Passing the current timestamp does not add much 
> information, as it can easily be obtained using System.currentTimeMillis().
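
To make the reporter's expectation concrete, here is a minimal sketch in plain Java. It is a toy model, not Kafka Streams code: the class PunctuationSketch and its method advanceTo are hypothetical names introduced only for illustration. It shows what "catch-up" calls would look like if each one carried its scheduled timestamp rather than the current time.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a fixed-interval punctuation schedule; not Kafka Streams code. */
public class PunctuationSketch {
    private final long intervalMs;
    private long nextScheduledMs;

    PunctuationSketch(long startMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextScheduledMs = startMs + intervalMs;
    }

    /** Returns the scheduled timestamp of every punctuation that is due once time reaches nowMs. */
    List<Long> advanceTo(long nowMs) {
        List<Long> due = new ArrayList<>();
        while (nextScheduledMs <= nowMs) {
            due.add(nextScheduledMs);       // the time the call was scheduled for
            nextScheduledMs += intervalMs;
        }
        return due;
    }

    public static void main(String[] args) {
        PunctuationSketch schedule = new PunctuationSketch(0L, 1000L);
        // No data for 20 seconds, then time jumps forward in one step.
        List<Long> due = schedule.advanceTo(20_000L);
        System.out.println(due.size() + " catch-up calls, first=" + due.get(0)
                + ", last=" + due.get(due.size() - 1));
    }
}
```

With a 1000 ms interval and a 20 second gap, the model produces 20 calls stamped 1000 through 20000, which is the behaviour the report asks for; passing the wall-clock time instead would give all 20 calls nearly the same value.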





[jira] [Reopened] (KAFKA-7628) KafkaStream is not closing

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-7628:
--

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I close a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>             this.getConfigFilePath(),
>             this,
>             this.getTopic());
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even after the stream has been closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger =
>             LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>
>     private ProcessorContext getContext() {
>         return context;
>     }
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>
>     protected abstract void processRecord(String topic, String json);
>
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after calling close() on the streams, these connections to 
> port 9092 are still established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  
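
As an aside, one defensive pattern for the symptom described above (records still arriving after close) is a closed flag that the processing path checks. The sketch below is plain Java with hypothetical names (GuardedProcessorSketch is not part of the reporter's code or of Kafka Streams), and it does not explain why process() was still being called; it only makes late calls visible and harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical processor-like class that ignores records once it has been closed. */
public class GuardedProcessorSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<String> processedKeys = new ArrayList<>();

    /** Returns true if the record was handled, false if it arrived after close(). */
    boolean process(String key, byte[] value) {
        if (closed.get()) {
            return false;               // late record: drop it (a real handler might log it)
        }
        processedKeys.add(key);
        return true;
    }

    void close() {
        closed.set(true);
    }

    List<String> processedKeys() {
        return processedKeys;
    }

    public static void main(String[] args) {
        GuardedProcessorSketch processor = new GuardedProcessorSketch();
        System.out.println(processor.process("topicA-1", new byte[0])); // true
        processor.close();
        System.out.println(processor.process("topicA-2", new byte[0])); // false
    }
}
```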





[jira] [Commented] (KAFKA-5122) Kafka Streams unexpected off-heap memory growth

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782144#comment-16782144
 ] 

Matthias J. Sax commented on KAFKA-5122:


[~guozhang] What is the fixed version for this?

> Kafka Streams unexpected off-heap memory growth
> ---
>
> Key: KAFKA-5122
> URL: https://issues.apache.org/jira/browse/KAFKA-5122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux 64-bit
> Oracle JVM version "1.8.0_121"
>Reporter: Jon Buffington
>Assignee: Guozhang Wang
>Priority: Minor
>
> I have a Kafka Streams application that leaks off-heap memory at a rate of 
> 20MB per commit interval. The application is configured with a 1G heap; the 
> heap memory does not show signs of leaking. The application reaches 16g of 
> system memory usage before terminating and restarting.
> Application facts:
> * The data pipeline is source -> map -> groupByKey -> reduce -> to.
> * The reduce operation uses a tumbling time window 
> TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.HOURS.toMillis(168)).
> * The commit interval is five minutes (300000ms).
> * The application links to v0.10.2.0-cp1 of the Kafka libraries. When I link 
> to the current 0.10.2.1 RC3, the leak rate changes to ~10MB per commit 
> interval.
> * The application uses the schema registry for two pairs of serdes. One serde 
> pair is used to read from a source topic that has 40 partitions. The other 
> serde pair is used by the internal changelog and repartition topics created 
> by the groupByKey/reduce operations.
> * The source input rate varies between 500-1500 records/sec. The source rate 
> variation does not change the size or frequency of the leak.
> * The application heap has been configured using both 1024m and 2048m. The 
> only observed difference between the two JVM heap sizes is more old gen 
> collections at 1024m although there is little difference in throughput. JVM 
> settings are {-server -Djava.awt.headless=true -Xss256k 
> -XX:MaxMetaspaceSize=128m -XX:ReservedCodeCacheSize=64m 
> -XX:CompressedClassSpaceSize=32m -XX:MaxDirectMemorySize=128m 
> -XX:+AlwaysPreTouch -XX:+UseG1GC -XX:MaxGCPauseMillis=50 
> -XX:InitiatingHeapOccupancyPercent=35 -XX:+PerfDisableSharedMem 
> -XX:+UseStringDeduplication -XX:MinMetaspaceFreeRatio=50 
> -XX:MaxMetaspaceFreeRatio=80}
> * We configure a custom RocksDBConfigSetter to set 
> options.setMaxBackgroundCompactions(Runtime.getRuntime.availableProcessors)
> * Per 
> ,
>  the SSTables are being compacted. Total disk usage for the state files 
> (RocksDB) is ~2.5g. Per partition and window, there are 3-4 SSTables.
> * The application is written in Scala and compiled using version 2.12.1.
> * Oracle JVM version "1.8.0_121"
> Various experiments that had no effect on the leak rate:
> * Tried different RocksDB block sizes (4k, 16k, and 32k).
> * Different numbers of instances (1, 2, and 4).
> * Different numbers of threads (1, 4, 10, 40).
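
The figures in the report can be sanity-checked with a little arithmetic. The sketch below is only a back-of-the-envelope estimate under stated assumptions: growth is linear, it starts from zero, "16g" is read as 16 * 1024 MB, and the commit interval is the five minutes named in the report. The class LeakEstimate and its method are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

/** Back-of-the-envelope arithmetic for the figures quoted in the report. */
public class LeakEstimate {
    /** Number of commit intervals needed to grow by totalMb, assuming linear growth. */
    static double intervalsToReach(double totalMb, double leakMbPerInterval) {
        return totalMb / leakMbPerInterval;
    }

    public static void main(String[] args) {
        long commitIntervalMs = TimeUnit.MINUTES.toMillis(5);  // five minutes in milliseconds
        long windowSizeMs = TimeUnit.HOURS.toMillis(1);        // argument to TimeWindows.of(...)
        long retentionMs = TimeUnit.HOURS.toMillis(168);       // argument to until(...), i.e. 7 days

        double intervals = intervalsToReach(16 * 1024, 20);    // 16 GB at 20 MB per interval
        double hours = intervals * commitIntervalMs / TimeUnit.HOURS.toMillis(1);

        System.out.println("commit interval (ms): " + commitIntervalMs);
        System.out.println("window size (ms):     " + windowSizeMs);
        System.out.println("retention (ms):       " + retentionMs);
        System.out.println("intervals to 16 GB:   " + intervals);
        System.out.println("approx. hours:        " + hours);
    }
}
```

Under these assumptions the process would need roughly 820 commit intervals, a little under three days, to reach the point where it restarts.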





[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2019-03-01 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782143#comment-16782143
 ] 

Guozhang Wang commented on KAFKA-7628:
--

It was not a bug, I should mark it as "not a problem."

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I close a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>             this.getConfigFilePath(),
>             this,
>             this.getTopic());
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even after the stream has been closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger =
>             LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>
>     private ProcessorContext getContext() {
>         return context;
>     }
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>
>     protected abstract void processRecord(String topic, String json);
>
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after calling close() on the streams, these connections to 
> port 9092 are still established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  





[jira] [Updated] (KAFKA-5156) Options for handling exceptions in streams

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5156:
---
Fix Version/s: 1.1.0

> Options for handling exceptions in streams
> --
>
> Key: KAFKA-5156
> URL: https://issues.apache.org/jira/browse/KAFKA-5156
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: user-experience
> Fix For: 1.1.0
>
>
> This is a task around options for handling exceptions in streams. It focuses 
> around options for dealing with corrupt data (keep going, stop streams, log, 
> retry, etc).





[jira] [Resolved] (KAFKA-5551) StreamThread should not expose methods for testing

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5551.

Resolution: Duplicate

> StreamThread should not expose methods for testing
> --
>
> Key: KAFKA-5551
> URL: https://issues.apache.org/jira/browse/KAFKA-5551
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{StreamThread}} currently exposes {{createStreamTask()}} and 
> {{createStandbyTask()}} as {{protected}} in order to inject "test tasks" in 
> unit tests. We should rework this and make both methods {{private}}. Maybe we 
> can introduce a {{TaskSupplier}} similar to {{KafkaClientSupplier}} (however, 
> {{TaskSupplier}} should not be public API and be in package {{internal}}).
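
The supplier idea can be sketched as follows. This is an illustration only: the ticket proposes nothing beyond the name {{TaskSupplier}}, so the interface shape, the Task and Worker types, and every method signature below are assumptions, written in plain Java without any Kafka classes.

```java
/** Sketch of injecting task creation through a supplier instead of protected methods. */
public class TaskSupplierSketch {
    /** Hypothetical stand-in for a stream or standby task. */
    interface Task {
        String describe();
    }

    /** Hypothetical internal factory; only its name comes from the ticket. */
    interface TaskSupplier {
        Task createStreamTask(String taskId);
        Task createStandbyTask(String taskId);
    }

    /** Simplified thread-like owner that obtains tasks only through the supplier. */
    static final class Worker {
        private final TaskSupplier supplier;

        Worker(TaskSupplier supplier) {
            this.supplier = supplier;
        }

        String start(String taskId) {
            return supplier.createStreamTask(taskId).describe();
        }
    }

    public static void main(String[] args) {
        // A unit test can pass its own supplier rather than overriding protected methods.
        TaskSupplier testSupplier = new TaskSupplier() {
            @Override
            public Task createStreamTask(String taskId) {
                return () -> "test stream task " + taskId;
            }

            @Override
            public Task createStandbyTask(String taskId) {
                return () -> "test standby task " + taskId;
            }
        };
        System.out.println(new Worker(testSupplier).start("0_1"));
    }
}
```

Because the worker depends only on the supplier interface, the creation methods on the worker itself no longer need to be visible to subclasses.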





[jira] [Reopened] (KAFKA-5551) StreamThread should not expose methods for testing

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5551:


> StreamThread should not expose methods for testing
> --
>
> Key: KAFKA-5551
> URL: https://issues.apache.org/jira/browse/KAFKA-5551
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{StreamThread}} currently exposes {{createStreamTask()}} and 
> {{createStandbyTask()}} as {{protected}} in order to inject "test tasks" in 
> unit tests. We should rework this and make both methods {{private}}. Maybe we 
> can introduce a {{TaskSupplier}} similar to {{KafkaClientSupplier}} (however, 
> {{TaskSupplier}} should not be public API and be in package {{internal}}).





[jira] [Updated] (KAFKA-5571) Possible deadlock during shutdown in setState in kafka streams 10.2

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5571:
---
Fix Version/s: (was: 1.1.0)
   1.0.0

> Possible deadlock during shutdown in setState in kafka streams 10.2
> ---
>
> Key: KAFKA-5571
> URL: https://issues.apache.org/jira/browse/KAFKA-5571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Greg Fodor
>Assignee: Eno Thereska
>Priority: Major
> Fix For: 1.0.0
>
> Attachments: kafka-streams.deadlock.log
>
>
> I'm running a 10.2 job across 5 nodes with 32 stream threads on each node and 
> find that when I gracefully shut down all of them at once via an Ansible 
> script, some of the nodes end up freezing. At a glance, the attached thread 
> dump implies a deadlock between stream threads trying to update their state 
> via setState. We haven't had this problem before, but it may or may not be 
> related to changes in 10.2 (we are upgrading from 10.0 to 10.2).
> When we gracefully shut down all nodes simultaneously, what typically happens 
> is that some subset of the nodes do not shut down completely but instead 
> go through a rebalance first. It seems this deadlock requires the 
> rebalance to occur simultaneously with the graceful shutdown. If we happen 
> to shut them down and no rebalance happens, I don't believe this deadlock is 
> triggered.
> The deadlock appears related to the state change handlers being subscribed 
> across threads and the fact that StreamThread#setState and 
> StreamStateListener#onChange are both synchronized methods.
> Another thing worth mentioning is that one of the transformers used in the 
> job has a close() method that can take 10-15 seconds to finish since it needs 
> to flush some data to a database. Having a long close() method combined with 
> a rebalance during a shutdown across many threads may be necessary for 
> reproduction.
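
A general way to avoid this kind of lock-order cycle is to update state under the lock but invoke listeners after releasing it. The sketch below shows that technique in plain Java; it is not the change that was made in Kafka, and the Worker, StateListener, and State names are hypothetical.

```java
/** Sketch: mutate state under a lock, but notify the listener outside of it. */
public class StateNotifySketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    /** Hypothetical listener; stands in for a state change handler. */
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    static final class Worker {
        private final Object stateLock = new Object();
        private final StateListener listener;
        private State state = State.RUNNING;

        Worker(StateListener listener) {
            this.listener = listener;
        }

        void setState(State newState) {
            State oldState;
            synchronized (stateLock) {          // hold the lock only while mutating
                oldState = state;
                state = newState;
            }
            // The listener runs without this worker's lock held, so it can take
            // its own locks without forming a cycle with this worker.
            listener.onChange(newState, oldState);
        }

        State state() {
            synchronized (stateLock) {
                return state;
            }
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Worker worker = new Worker((newState, oldState) ->
                log.append(oldState).append("->").append(newState));
        worker.setState(State.PENDING_SHUTDOWN);
        System.out.println(log);
    }
}
```

The trade-off is that callbacks from concurrent transitions may be delivered out of order, so a listener that needs strict ordering has to handle that itself.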





[jira] [Commented] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-01 Thread Jonathan Gordon (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782139#comment-16782139
 ] 

Jonathan Gordon commented on KAFKA-7652:


That did it! This is really encouraging. Any chance it'll make it into 2.2.0?

[^2.3.0-7652-NamedCache.txt]

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The highest amount of CPU time 
> seems spent in retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation suggests that we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  
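
For reference, the two settings quoted in the report can be written out as plain properties. This is a minimal sketch using only java.util.Properties and the literal key names from the report; the class and method names are hypothetical, and any other settings the application used are not shown.

```java
import java.util.Properties;

/** The two settings quoted in the report, expressed as plain properties. */
public class StreamsCacheConfigSketch {
    static Properties reportedSettings() {
        Properties props = new Properties();
        // 30 * 1000 ms, i.e. a commit every 30 seconds
        props.setProperty("commit.interval.ms", String.valueOf(30 * 1000));
        // 10485760 bytes = 10 * 1024 * 1024, i.e. a 10 MiB record cache
        props.setProperty("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = reportedSettings();
        System.out.println("commit.interval.ms=" + props.getProperty("commit.interval.ms"));
        System.out.println("cache.max.bytes.buffering=" + props.getProperty("cache.max.bytes.buffering"));
    }
}
```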





[jira] [Updated] (KAFKA-5571) Possible deadlock during shutdown in setState in kafka streams 10.2

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5571:
---
Fix Version/s: 1.1.0

> Possible deadlock during shutdown in setState in kafka streams 10.2
> ---
>
> Key: KAFKA-5571
> URL: https://issues.apache.org/jira/browse/KAFKA-5571
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Greg Fodor
>Assignee: Eno Thereska
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: kafka-streams.deadlock.log
>
>
> I'm running a 10.2 job across 5 nodes with 32 stream threads on each node and 
> find that when I gracefully shut down all of them at once via an Ansible 
> script, some of the nodes end up freezing. At a glance, the attached thread 
> dump implies a deadlock between stream threads trying to update their state 
> via setState. We haven't had this problem before, but it may or may not be 
> related to changes in 10.2 (we are upgrading from 10.0 to 10.2).
> When we gracefully shut down all nodes simultaneously, what typically happens 
> is that some subset of the nodes do not shut down completely but instead 
> go through a rebalance first. It seems this deadlock requires the 
> rebalance to occur simultaneously with the graceful shutdown. If we happen 
> to shut them down and no rebalance happens, I don't believe this deadlock is 
> triggered.
> The deadlock appears related to the state change handlers being subscribed 
> across threads and the fact that StreamThread#setState and 
> StreamStateListener#onChange are both synchronized methods.
> Another thing worth mentioning is that one of the transformers used in the 
> job has a close() method that can take 10-15 seconds to finish since it needs 
> to flush some data to a database. Having a long close() method combined with 
> a rebalance during a shutdown across many threads may be necessary for 
> reproduction.





[jira] [Updated] (KAFKA-5652) Add new api methods to KStream

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5652:
---
Fix Version/s: 1.0.0

> Add new api methods to KStream
> --
>
> Key: KAFKA-5652
> URL: https://issues.apache.org/jira/browse/KAFKA-5652
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Major
> Fix For: 1.0.0
>
>
> Add new methods from KIP-182 to {{KStream}}
>  until finalized





[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-03-01 Thread Jonathan Gordon (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Gordon updated KAFKA-7652:
---
Attachment: 2.3.0-7652-NamedCache.txt

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> experience a severe performance degradation. The highest amount of CPU time 
> seems spent in retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation suggests that we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  





[jira] [Updated] (KAFKA-5702) Refactor StreamThread to separate concerns and enable better testability

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5702:
---
Fix Version/s: 1.0.0

> Refactor StreamThread to separate concerns and enable better testability
> 
>
> Key: KAFKA-5702
> URL: https://issues.apache.org/jira/browse/KAFKA-5702
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Major
> Fix For: 1.0.0
>
>
> {{StreamThread}} does many things, e.g., managing and creating tasks, 
> getting data from consumers, updating standby tasks, punctuating, and 
> rebalancing. With the current design it is extremely hard to reason about 
> and is quite tightly coupled. 
> We need to start teasing out some of the separate concerns from 
> StreamThread, e.g., TaskManager and RebalanceListener. 





[jira] [Reopened] (KAFKA-7419) Rolling sum for high frequency stream

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-7419:


> Rolling sum for high frequency stream
> 
>
> Key: KAFKA-7419
> URL: https://issues.apache.org/jira/browse/KAFKA-7419
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Stanislav Bausov
>Priority: Minor
>
> I have a task to compute the 24-hour market volume for a high-frequency 
> trade stream, and there is no out-of-the-box solution. Windowing is not an option.
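
To illustrate what a rolling 24-hour sum means here, the sketch below keeps a queue of trades and evicts those that fall out of the trailing horizon. It is plain Java, not a Kafka Streams solution: the RollingSum class is hypothetical, volumes are whole units, and timestamps are assumed to arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Rolling sum over a trailing time horizon (t - horizon, t]; assumes ordered timestamps. */
public class RollingSum {
    private static final class Trade {
        final long timestampMs;
        final long volume;

        Trade(long timestampMs, long volume) {
            this.timestampMs = timestampMs;
            this.volume = volume;
        }
    }

    private final long horizonMs;
    private final Deque<Trade> trades = new ArrayDeque<>();
    private long sum;

    RollingSum(long horizonMs) {
        this.horizonMs = horizonMs;
    }

    /** Adds a trade and returns the total volume in the horizon ending at its timestamp. */
    long add(long timestampMs, long volume) {
        trades.addLast(new Trade(timestampMs, volume));
        sum += volume;
        // Evict trades that are a full horizon or more older than the newest one.
        while (trades.peekFirst().timestampMs <= timestampMs - horizonMs) {
            sum -= trades.removeFirst().volume;
        }
        return sum;
    }

    public static void main(String[] args) {
        RollingSum volume24h = new RollingSum(24L * 60 * 60 * 1000);
        System.out.println(volume24h.add(0L, 100));             // 100
        System.out.println(volume24h.add(3_600_000L, 50));      // 150
        System.out.println(volume24h.add(86_400_000L, 25));     // 75: the trade at t=0 has expired
    }
}
```

Keeping every trade in memory may be too costly at high frequency; pre-aggregating into per-second or per-minute buckets before applying the same eviction logic is one way to bound the queue size, at the cost of coarser expiry.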





[jira] [Reopened] (KAFKA-5825) Streams not processing when exactly once is set

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5825:


> Streams not processing when exactly once is set
> ---
>
> Key: KAFKA-5825
> URL: https://issues.apache.org/jira/browse/KAFKA-5825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>Reporter: Ryan Worsley
>Priority: Major
> Attachments: Tests.scala, build.sbt, log-output.txt, log4j.properties
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology 
> and use this message to complete a promise that the test is waiting on.  
> Once the promise completes, the test verifies that the value of the promise 
> is the expected value of the message.
> +Observed behaviour+
> The first test passes fine; the second test times out, and the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82
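
The promise-based test structure described above can be sketched as follows. The reporter's tests are written in Scala and attached to the ticket; this is an independent Java illustration using CompletableFuture, with a plain thread standing in for the streams topology. All names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch of a test that waits on a promise completed by the first processed message. */
public class PromiseTestSketch {
    /** Waits for the first processed message, or throws TimeoutException after timeoutMs. */
    static String awaitFirstMessage(CompletableFuture<String> promise, long timeoutMs)
            throws Exception {
        return promise.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Case 1: the "topology" (a plain thread here) delivers a message.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        new Thread(() -> delivered.complete("hello")).start();
        System.out.println("received: " + awaitFirstMessage(delivered, 5_000));

        // Case 2: nothing is ever delivered, so the wait times out, which is
        // how the second test in the report fails.
        CompletableFuture<String> neverDelivered = new CompletableFuture<>();
        try {
            awaitFirstMessage(neverDelivered, 100);
        } catch (TimeoutException e) {
            System.out.println("timed out waiting for message");
        }
    }
}
```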



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5825) Streams not processing when exactly once is set

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5825.

Resolution: Not A Bug

> Streams not processing when exactly once is set
> ---
>
> Key: KAFKA-5825
> URL: https://issues.apache.org/jira/browse/KAFKA-5825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: EmbeddedKafka running on Windows.  Relevant files 
> attached.
>Reporter: Ryan Worsley
>Priority: Major
> Attachments: Tests.scala, build.sbt, log-output.txt, log4j.properties
>
>
> +Set-up+
> I'm using [EmbeddedKafka|https://github.com/manub/scalatest-embedded-kafka/] 
> for ScalaTest.
> This spins up a single broker internally on a random port.
> I've written two tests - the first without transactions, the second with.  
> They're nearly identical apart from the config and the transactional 
> semantics.  I've written the transactional version based on Neha's 
> [blog|https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/]
>  which is the closest thing I could find to instructions.
> The tests wait until a single message is processed by the streams topology 
> and use this message to complete a promise that the test is waiting on.  
> Once the promise completes, the test verifies that the value of the promise 
> is the expected value of the message.
> +Observed behaviour+
> The first test passes fine; the second test times out, and the stream processor 
> never seems to read the transactional message.
> +Notes+
> I've attached my build.sbt, log4j.properties and my Tests.scala file in order 
> to make it as easy as possible for someone to re-create.  I'm running on 
> Windows and using Scala as this reflects my workplace.  I completely expect 
> there to be some configuration issue that's causing this, but am unable to 
> proceed at this time.
> Related information: 
> https://github.com/manub/scalatest-embedded-kafka/issues/82





[jira] [Updated] (KAFKA-5893) ResetIntegrationTest fails

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5893:
---
Fix Version/s: 1.0.0

> ResetIntegrationTest fails
> --
>
> Key: KAFKA-5893
> URL: https://issues.apache.org/jira/browse/KAFKA-5893
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 1.0.0
>
>
> {noformat}
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:363)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}
> One issue with debugging is that we catch exceptions and print only the 
> exception message, which is {{null}}:
> {noformat}
> Standard Error
> ERROR: null
> ERROR: null
> {noformat}
> After printing the stack trace in case of failure, we got:
> {noformat}
> ERROR: java.lang.NullPointerException
> java.lang.NullPointerException
>   at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:194)
>   at kafka.tools.StreamsResetter.run(StreamsResetter.java:121)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.cleanGlobal(ResetIntegrationTest.java:362)
>   at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:172)
> {noformat}
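The difference between the two outputs above can be reproduced with a minimal sketch. It assumes the failing path printed only `getMessage()`; the class and method names here (`NullMessageDemo`, `messageOnly`, `withStackTrace`) are hypothetical and not taken from the test or from `StreamsResetter`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class NullMessageDemo {
    // Formats a failure using only its message. An exception created without
    // a message returns null from getMessage(), so this yields "ERROR: null".
    static String messageOnly(Throwable t) {
        return "ERROR: " + t.getMessage();
    }

    // Formats a failure with its full stack trace, which starts with the
    // exception's class name and therefore stays informative.
    static String withStackTrace(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return "ERROR: " + buffer;
    }

    public static void main(String[] args) {
        Throwable failure = new NullPointerException(); // no message supplied
        System.out.println(messageOnly(failure));
        System.out.println(withStackTrace(failure));
    }
}
```

Printing the stack trace (or at least `toString()`) rather than the bare message keeps the exception type visible even when no message is set.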





[jira] [Reopened] (KAFKA-6092) Time passed in punctuate call is currentTime, not punctuate schedule time.

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6092:


> Time passed in punctuate call is currentTime, not punctuate schedule time. 
> ---
>
> Key: KAFKA-6092
> URL: https://issues.apache.org/jira/browse/KAFKA-6092
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Stephane Maarek
>Priority: Major
>
> The Javadoc specifies that, for a Transformer, calling context.schedule causes 
> punctuate to be called every 1000ms. This is not entirely accurate: if no data is 
> received for a while, punctuate won't be called.
> {code}
>  * void init(ProcessorContext context) {
>  *     this.context = context;
>  *     this.state = context.getStateStore("myTransformState");
>  *     context.schedule(1000); // call #punctuate() each 1000ms
>  * }
> {code}
> When you receive new data, say after 20 seconds, punctuate will play catch-up 
> and will be called 20 times upon reception of the new data. 
> The signature of punctuate is:
> {code}
>  * KeyValue punctuate(long timestamp) {
>  *     // can access this.state
>  *     // can emit as many new KeyValue pairs as required via this.context#forward()
>  *     return null; // don't return result -- can also be "new KeyValue()"
>  * }
> {code}
> but the timestamp being passed is the current timestamp at the time of the call to 
> punctuate, not the time at which the punctuate was scheduled. This is very confusing, 
> and I think the timestamp should represent the time at which the punctuate 
> should have been scheduled. Passing the current timestamp does not add much 
> information, as it can easily be obtained using System.currentTimeMillis().
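To make the catch-up behaviour described in this ticket concrete, here is a simplified model in plain Java. It is not Kafka Streams code: `PunctuateCatchUpDemo` and `scheduledTimes` are hypothetical names, and the model only assumes a fixed interval with no punctuation while no data arrives.

```java
import java.util.ArrayList;
import java.util.List;

class PunctuateCatchUpDemo {
    // Returns the timestamps at which punctuations were scheduled to fire
    // after the last punctuation, up to and including the current time.
    static List<Long> scheduledTimes(long lastPunctuateMs, long intervalMs, long nowMs) {
        List<Long> times = new ArrayList<>();
        for (long t = lastPunctuateMs + intervalMs; t <= nowMs; t += intervalMs) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        long intervalMs = 1000L;   // as in context.schedule(1000)
        long lastPunctuateMs = 0L; // last punctuation before the data gap
        long nowMs = 20_000L;      // new data arrives 20 seconds later

        // One catch-up call per missed interval. The ticket asks for the
        // scheduled time to be passed instead of the current time.
        for (long scheduled : scheduledTimes(lastPunctuateMs, intervalMs, nowMs)) {
            System.out.println("scheduled=" + scheduled + " current=" + nowMs);
        }
    }
}
```

In this model a 20-second gap with a 1000ms interval yields 20 catch-up calls, each with a distinct scheduled time, whereas the current time is the same for all of them.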





[jira] [Updated] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6461:
---
Fix Version/s: 1.1.0

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Ted Yu
>Priority: Minor
> Fix For: 1.1.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}





[jira] [Updated] (KAFKA-6462) ResetIntegrationTest unstable

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6462:
---
Fix Version/s: 1.1.0

> ResetIntegrationTest unstable
> -
>
> Key: KAFKA-6462
> URL: https://issues.apache.org/jira/browse/KAFKA-6462
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 1.1.0
>
>
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
> at 
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1155)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:847)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(IntegrationTestUtils.java:133)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(IntegrationTestUtils.java:118)
> at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.add10InputElements(AbstractResetIntegrationTest.java:199)
> at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.prepareTest(AbstractResetIntegrationTest.java:175)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.before(ResetIntegrationTest.java:56){noformat}





[jira] [Updated] (KAFKA-6472) WordCount example code error

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6472:
---
Fix Version/s: 2.0.0

> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Trivial
>  Labels: newbie
> Fix For: 2.0.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> A closing ")" is missing in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page, on line 31:
> {{31   .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{31   .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}





[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6592.

Resolution: Duplicate

> NullPointerException thrown when executing ConsoleCosumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a Streams app's output topic with the WindowedDeserializer 
> using kafka-console-consumer.sh, a NullPointerException is thrown because the 
> inner deserializer is never initialized: there is no place 
> in ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}





[jira] [Reopened] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6592:


> NullPointerException thrown when executing ConsoleCosumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a Streams app's output topic with the WindowedDeserializer 
> using kafka-console-consumer.sh, a NullPointerException is thrown because the 
> inner deserializer is never initialized: there is no place 
> in ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}





[jira] [Updated] (KAFKA-6702) Wrong className in LoggerFactory.getLogger method

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6702:
---
Fix Version/s: 2.0.0

> Wrong className in LoggerFactory.getLogger method
> -
>
> Key: KAFKA-6702
> URL: https://issues.apache.org/jira/browse/KAFKA-6702
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 1.0.1, 2.0.0
>Reporter: JieFang.He
>Assignee: JieFang.He
>Priority: Major
> Fix For: 2.0.0
>
>
> Wrong className in LoggerFactory.getLogger method





[jira] [Resolved] (KAFKA-7419) Rolling sum for high frequency sream

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7419.

Resolution: Not A Problem

> Rolling sum for high frequency sream
> 
>
> Key: KAFKA-7419
> URL: https://issues.apache.org/jira/browse/KAFKA-7419
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Stanislav Bausov
>Priority: Minor
>
> I have a task to compute the 24h market volume for a high-frequency trades 
> stream, and there is no out-of-the-box solution. Windowing is not an option.





[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782124#comment-16782124
 ] 

Matthias J. Sax commented on KAFKA-7628:


[~guozhang] What is the fixed version for this? Is this ticket a duplicate?

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>         this.getConfigFilePath(),
>         this,
>         this.getTopic()
>     );
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even though the stream was closed successfully:
>  
> {code:java}
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>     private ProcessorContext getContext() {
>         return context;
>     }
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>     protected abstract void processRecord(String topic, String json);
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm observing that after calling close() on the streams, these connections to 
> port 9092 are still established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  





[jira] [Updated] (KAFKA-8011) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8011:
---
Fix Version/s: 2.1.2
   2.3.0
   2.0.2
   2.2.0
   1.1.2
   1.0.3

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8011
> URL: https://issues.apache.org/jira/browse/KAFKA-8011
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Critical
>  Labels: flaky-test, newbie
> Fix For: 1.0.3, 1.1.2, 2.2.0, 2.0.2, 2.3.0, 2.1.2
>
> Attachments: streams_1_0_test_results.png, streams_1_1_tests.png
>
>
> The RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> and RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted tests use 
> an ArrayList to assert the topics assigned to the Streams application. 
> The ConsumerRebalanceListener used in the test operates on this list, as does 
> the TestUtils.waitForCondition() call that verifies the expected topic assignments.
> Using the same list in both places can cause a ConcurrentModificationException 
> if the rebalance listener modifies the assignment at the same time that 
> TestUtils.waitForCondition() is using the list to verify the expected topics. 
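The failure mode described here can be illustrated with a small, single-threaded stand-in: modifying an `ArrayList` while it is being iterated triggers the same exception that a concurrent listener update could. The sketch below also shows one possible thread-safe alternative, `CopyOnWriteArrayList`; this is an assumption for illustration and not necessarily the fix applied in Kafka. `SharedAssignmentDemo`, `iterationFails`, and the topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SharedAssignmentDemo {
    // Iterates over the list (standing in for the verifying condition) while
    // an element is added mid-iteration (standing in for the rebalance
    // listener). Returns true if the iteration failed with a
    // ConcurrentModificationException.
    static boolean iterationFails(List<String> assignedTopics) {
        assignedTopics.add("TEST-TOPIC-1");
        assignedTopics.add("TEST-TOPIC-2");
        try {
            for (String topic : assignedTopics) {
                if (topic.equals("TEST-TOPIC-1")) {
                    assignedTopics.add("TEST-TOPIC-3"); // simulated listener update
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // ArrayList iterators are fail-fast and detect the modification.
        System.out.println("ArrayList fails: " + iterationFails(new ArrayList<>()));
        // CopyOnWriteArrayList iterates over a snapshot, so no exception occurs.
        System.out.println("CopyOnWriteArrayList fails: "
                + iterationFails(new CopyOnWriteArrayList<>()));
    }
}
```

A snapshot-based list trades extra copying on writes for safe iteration, which is usually acceptable for small collections such as assigned topic names in a test.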





[jira] [Reopened] (KAFKA-7221) Add Build method to StreamsBuilder accepting Properties

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-7221:


> Add Build method to StreamsBuilder accepting Properties 
> 
>
> Key: KAFKA-7221
> URL: https://issues.apache.org/jira/browse/KAFKA-7221
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>






[jira] [Resolved] (KAFKA-7221) Add Build method to StreamsBuilder accepting Properties

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7221.

Resolution: Duplicate

> Add Build method to StreamsBuilder accepting Properties 
> 
>
> Key: KAFKA-7221
> URL: https://issues.apache.org/jira/browse/KAFKA-7221
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>






[jira] [Resolved] (KAFKA-3770) KStream job should be able to specify linger.ms

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3770.

Resolution: Duplicate

> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> The default linger.ms of 100ms, hardcoded into the StreamsConfig class, is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.
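As a rough illustration of the override requested in this ticket, the sketch below builds a plain `java.util.Properties` object with a producer-prefixed `linger.ms`. It assumes a Kafka Streams version that forwards `producer.`-prefixed settings to its internal producer; the application id, bootstrap address, and the `LingerOverrideDemo` class are placeholders, and the code does not depend on the Kafka libraries.

```java
import java.util.Properties;

class LingerOverrideDemo {
    // Builds a configuration that overrides the producer's linger.ms.
    // The "producer." prefix is assumed to be honoured by the Streams
    // version in use; all values here are illustrative only.
    static Properties streamsProps(long lingerMs) {
        Properties props = new Properties();
        props.put("application.id", "linger-demo");       // placeholder id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("producer.linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(5L);
        System.out.println("producer.linger.ms=" + props.getProperty("producer.linger.ms"));
    }
}
```

Such a `Properties` object would then be passed to the Streams application's configuration in the usual way.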





[jira] [Reopened] (KAFKA-3770) KStream job should be able to specify linger.ms

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3770:


> KStream job should be able to specify linger.ms
> ---
>
> Key: KAFKA-3770
> URL: https://issues.apache.org/jira/browse/KAFKA-3770
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Major
>
> The default linger.ms of 100ms, hardcoded into the StreamsConfig class, is 
> problematic for jobs that have lots of tasks, since this latency can accrue. 
> It seems useful to be able to override linger.ms in the StreamsConfig. 
> Attached is a PR which allows this.





[jira] [Updated] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3752:
---
Fix Version/s: 0.10.1.0

> Provide a way for KStreams to recover from unclean shutdown
> ---
>
> Key: KAFKA-3752
> URL: https://issues.apache.org/jira/browse/KAFKA-3752
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Roger Hoover
>Priority: Major
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an option (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>   at 
> 

[jira] [Updated] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3758:
---
Fix Version/s: 0.11.0.1

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Eno Thereska
>Priority: Major
> Fix For: 0.11.0.1
>
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log; the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> lock-related errors over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> 

[jira] [Updated] (KAFKA-3429) Remove Serdes needed for repartitioning in KTable stateful operations

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3429:
---
Fix Version/s: 1.0.0

> Remove Serdes needed for repartitioning in KTable stateful operations
> -
>
> Key: KAFKA-3429
> URL: https://issues.apache.org/jira/browse/KAFKA-3429
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api
> Fix For: 1.0.0
>
>
> Currently in KTable aggregate operations where a repartition is possibly 
> needed since the aggregation key may not be the same as the original primary 
> key, we require the users to provide serdes (default to configured ones) for 
> read / write to the internally created re-partition topic. However, these are 
> not necessary since for all KTable instances either generated from the topics 
> directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> there is already a serde provided for materializing the data, and hence the 
> same serde can be reused when the resulting KTable is involved in future 
> aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, 
> aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in 
> table1.aggregate since it could always reuse the "serde" from 
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde 
> information along with the KTableImpl instance in order to re-use it in a 
> future operation that requires repartitioning.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6707) The default value for config of Type.LONG should be *L

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6707:
---
Fix Version/s: 2.0.0

> The default value for config of Type.LONG should be *L
> --
>
> Key: KAFKA-6707
> URL: https://issues.apache.org/jira/browse/KAFKA-6707
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: JieFang.He
>Assignee: JieFang.He
>Priority: Major
> Fix For: 2.0.0
>
>
> h1. The default value for config of Type.LONG should be *L





[jira] [Updated] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7492:
---
Fix Version/s: 2.3.0

> Explain `null` handling for reduce and aggregate
> 
>
> Key: KAFKA-7492
> URL: https://issues.apache.org/jira/browse/KAFKA-7492
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Asutosh Pandya
>Priority: Minor
>  Labels: beginner, docs, newbie
> Fix For: 2.3.0
>
>
> We currently don't explain how records with `null` value are handled in 
> reduce and aggregate. In particular, what happens when the users' 
> aggregation/reduce `apply()` implementation returns `null`.
> We should update the JavaDocs accordingly and maybe also update the docs on 
> the web page.
> Cf. 
> https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null





[jira] [Updated] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7918:
---
Fix Version/s: 2.3.0

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  
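The cleanup proposed above can be pictured with a small, dependency-free sketch. It is not the Kafka Streams code: `GenericInMemoryStore` and `InMemoryBytesStoreSketch` are hypothetical names, and `java.nio.ByteBuffer` stands in for Kafka's `Bytes` wrapper purely so the example compiles without the Kafka jars.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Before: the store is generically typed even though callers only ever use bytes.
class GenericInMemoryStore<K extends Comparable<K>, V> {
    private final Map<K, V> map = new TreeMap<>();

    void put(K key, V value) {
        map.put(key, value);
    }

    V get(K key) {
        return map.get(key);
    }
}

// After: the key/value types are inlined, so the signatures state what is stored.
// ByteBuffer is an assumption of this sketch, standing in for Kafka's Bytes type.
class InMemoryBytesStoreSketch {
    private final Map<ByteBuffer, byte[]> map = new TreeMap<>();

    void put(ByteBuffer key, byte[] value) {
        map.put(key, value);
    }

    byte[] get(ByteBuffer key) {
        return map.get(key);
    }
}
```

The second class gives up flexibility that the description suggests is no longer used, and in exchange a reader no longer has to work out what `K` and `V` are bound to at each call site.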





[jira] [Updated] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-03-01 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7912:
---
Fix Version/s: 2.3.0

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.
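The hazard described above can be reproduced with plain `java.util` collections. The sketch below is not the Kafka Streams store; `SubMapViewDemo` and its methods are hypothetical names. It shows that `subMap` returns a live view whose iterator fails once the backing map is modified, and that copying the range while holding a lock (one possible mitigation, not necessarily the one adopted in the fix) gives a stable snapshot.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class SubMapViewDemo {
    private final NavigableMap<String, String> map = new TreeMap<>();

    synchronized void put(String key, String value) {
        map.put(key, value);
    }

    // Unsafe: the returned iterator walks a live view of the backing map.
    Iterator<Map.Entry<String, String>> rangeView(String from, String to) {
        return map.subMap(from, true, to, true).entrySet().iterator();
    }

    // Safer: copy the range while holding the lock, then iterate the copy.
    synchronized Iterator<Map.Entry<String, String>> rangeSnapshot(String from, String to) {
        List<Map.Entry<String, String>> copy = new ArrayList<>();
        for (Map.Entry<String, String> e : map.subMap(from, true, to, true).entrySet()) {
            copy.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return copy.iterator();
    }

    // Returns true if iterating the live view fails after a write to the map.
    static boolean viewFailsAfterWrite() {
        SubMapViewDemo store = new SubMapViewDemo();
        store.put("a", "1");
        store.put("b", "2");
        Iterator<Map.Entry<String, String>> it = store.rangeView("a", "z");
        it.next();
        store.put("c", "3"); // structural modification of the backing map
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Returns how many entries a snapshot iterator yields after a later write.
    static int snapshotSizeAfterWrite() {
        SubMapViewDemo store = new SubMapViewDemo();
        store.put("a", "1");
        store.put("b", "2");
        Iterator<Map.Entry<String, String>> it = store.rangeSnapshot("a", "z");
        store.put("c", "3"); // does not affect the copy taken above
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }
}
```

The snapshot costs an extra copy per range query, so it trades some memory and time for iterators that cannot be invalidated by writers.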





[jira] [Created] (KAFKA-8028) Kafka - ACL remove

2019-03-01 Thread Sathish Yanamala (JIRA)
Sathish Yanamala created KAFKA-8028:
---

 Summary: Kafka - ACL remove 
 Key: KAFKA-8028
 URL: https://issues.apache.org/jira/browse/KAFKA-8028
 Project: Kafka
  Issue Type: Task
Reporter: Sathish Yanamala


Hi Team,

User:--consumer has Allow permission for operations: All from hosts: 
*
 
 "--consumer" was added along with the principal. Please let me know which command 
removes all operations; I tried the delete operation, but the same entry is still shown.

 

Thank you,

Sathish Yanamala 

 

 





[jira] [Assigned] (KAFKA-8018) Flaky Test SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed

2019-03-01 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-8018:
--

Assignee: Jun Rao

> Flaky Test 
> SaslSslAdminClientIntegrationTest#testLegacyAclOpsNeverAffectOrReturnPrefixed
> 
>
> Key: KAFKA-8018
> URL: https://issues.apache.org/jira/browse/KAFKA-8018
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Jun Rao
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/35/]
> {quote}org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__consumer_offsets/partitions/3/state
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
> at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:534)
> at kafka.zk.KafkaZkClient.getTopicPartitionState(KafkaZkClient.scala:891)
> at kafka.zk.KafkaZkClient.getLeaderForPartition(KafkaZkClient.scala:901)
> at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:669)
> at kafka.utils.TestUtils$.$anonfun$createTopic$1(TestUtils.scala:304)
> at kafka.utils.TestUtils$.$anonfun$createTopic$1$adapted(TestUtils.scala:302)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
> at scala.collection.immutable.Range.foreach(Range.scala:158)
> at scala.collection.TraversableLike.map(TraversableLike.scala:237)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at kafka.utils.TestUtils$.createTopic(TestUtils.scala:302)
> at kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:350)
> at kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:95)
> at kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73)
> at kafka.api.AdminClientIntegrationTest.setUp(AdminClientIntegrationTest.scala:78)
> at kafka.api.SaslSslAdminClientIntegrationTest.setUp(SaslSslAdminClientIntegrationTest.scala:64){quote}





[jira] [Resolved] (KAFKA-7912) In-memory key-value store does not support concurrent access

2019-03-01 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7912.

Resolution: Fixed

> In-memory key-value store does not support concurrent access 
> -
>
> Key: KAFKA-7912
> URL: https://issues.apache.org/jira/browse/KAFKA-7912
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the in-memory key-value store uses a Map to store key-value pairs 
> and fetches them by calling subMap and returning an iterator to this submap. 
> This is unsafe as the submap is just a view of the original map and there is 
> risk of concurrent access.





[jira] [Updated] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow

2019-03-01 Thread Prashant (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prashant updated KAFKA-8027:

Description: 
We observed this during a performance test of our stream application which 
tracks user's activity and provides REST interface to query the window state 
store.  We used default configuration of Materialized i.e. withCachingEnabled 
for storing user behaviour stats in a window state store (CompositeWindowStore 
with CachingWindowStore as the underlying store, which internally uses RocksDBStore for 
persistence).  

While querying window store with store.fetch(key, long, long), it internally 
tries to fetch the range from ThreadCache which uses a byte iterator to search 
for a key in cache and on a cache miss it goes to RocksDBStore for result. So, 
when number of keys in cache becomes large this ThreadCache search starts 
taking time (range Iterator on all keys) which impacts WindowStore query 
performance.

 

Workaround: If we disable cache with switch on Materialized instance i.e. 
withCachingDisabled, key search is delegated directly to RocksDBStore which is 
way faster and completed search in microseconds against millis in case of 
CachingWindowStore.  

 

Stats: With Unique users > 0.5M, random search for a key i.e. UserId:

 

withCachingEnabled :  40 < t < 80ms (upper bound increases as unique users grow)

withCachingDisabled: t < 1ms (almost constant time)

  was:
We observed this during a performance test of our stream application which 
tracks user's activity and provides REST interface to query the window state 
store.  We used default configuration of Materialized i.e. withCachingEnabled 
for storing user behaviour stats in a window state store (CompositeWindowStore 
with CachingWindowStore as the underlying store, which internally uses RocksDBStore for 
persistence).  

While querying window store with store.fetch(key, long, long), it internally 
tries to fetch the range from ThreadCache which uses a byte iterator to search 
for a key in cache and on a cache miss it goes to RocksDBStore for result. So, 
when number of keys in cache becomes large this ThreadCache search starts 
taking time (range Iterator on all keys) which impacts WindowStore query 
performance.

Workaround: If we disable cache with switch on Materialized instance i.e. 
withCachingDisabled, key search is delegated directly to RocksDBStore which is 
way faster and completed search in microseconds against millis in case of 
CachingWindowStore.  

Stats: With Unique users > 0.5M, random search for a key i.e. UserId:

 

withCachingEnabled :  40 < t < 80ms (upper bound increases as unique users 
grow) withCachingDisabled: t < 1ms (almost constant time)


> Gradual decline in performance of CachingWindowStore provider when number of 
> keys grow
> --
>
> Key: KAFKA-8027
> URL: https://issues.apache.org/jira/browse/KAFKA-8027
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Prashant
>Priority: Major
>  Labels: interactivequ, kafka-streams
>
> We observed this during a performance test of our stream application which 
> tracks user's activity and provides REST interface to query the window state 
> store.  We used default configuration of Materialized i.e. withCachingEnabled 
> for storing user behaviour stats in a window state store 
> (CompositeWindowStore with CachingWindowStore as the underlying store, which 
> internally uses RocksDBStore for persistence).  
> While querying window store with store.fetch(key, long, long), it internally 
> tries to fetch the range from ThreadCache which uses a byte iterator to 
> search for a key in cache and on a cache miss it goes to RocksDBStore for 
> result. So, when number of keys in cache becomes large this ThreadCache 
> search starts taking time (range Iterator on all keys) which impacts 
> WindowStore query performance.
>  
> Workaround: If we disable cache with switch on Materialized instance i.e. 
> withCachingDisabled, key search is delegated directly to RocksDBStore which 
> is way faster and completed search in microseconds against millis in case of 
> CachingWindowStore.  
>  
> Stats: With Unique users > 0.5M, random search for a key i.e. UserId:
>  
> withCachingEnabled :  40 < t < 80ms (upper bound increases as unique users 
> grow)
> withCachingDisabled: t < 1ms (almost constant time)





[jira] [Created] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow

2019-03-01 Thread Prashant (JIRA)
Prashant created KAFKA-8027:
---

 Summary: Gradual decline in performance of CachingWindowStore 
provider when number of keys grow
 Key: KAFKA-8027
 URL: https://issues.apache.org/jira/browse/KAFKA-8027
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Prashant


We observed this during a performance test of our stream application which 
tracks user's activity and provides REST interface to query the window state 
store.  We used default configuration of Materialized i.e. withCachingEnabled 
for storing user behaviour stats in a window state store (CompositeWindowStore 
with CachingWindowStore as the underlying store, which internally uses RocksDBStore for 
persistence).  

While querying window store with store.fetch(key, long, long), it internally 
tries to fetch the range from ThreadCache which uses a byte iterator to search 
for a key in cache and on a cache miss it goes to RocksDBStore for result. So, 
when number of keys in cache becomes large this ThreadCache search starts 
taking time (range Iterator on all keys) which impacts WindowStore query 
performance.

Workaround: If we disable cache with switch on Materialized instance i.e. 
withCachingDisabled, key search is delegated directly to RocksDBStore which is 
way faster and completed search in microseconds against millis in case of 
CachingWindowStore.  

Stats: With Unique users > 0.5M, random search for a key i.e. UserId:

 

withCachingEnabled :  40 < t < 80ms (upper bound increases as unique users grow)

withCachingDisabled: t < 1ms (almost constant time)





[jira] [Commented] (KAFKA-7989) Flaky Test RequestQuotaTest#testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781926#comment-16781926
 ] 

Matthias J. Sax commented on KAFKA-7989:


Failed again: 
[https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/40/testReport/junit/kafka.server/RequestQuotaTest/testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated/]

> Flaky Test 
> RequestQuotaTest#testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated
> -
>
> Key: KAFKA-7989
> URL: https://issues.apache.org/jira/browse/KAFKA-7989
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/27/]
> {quote}java.util.concurrent.ExecutionException: java.lang.AssertionError: Throttle time metrics for consumer quota not updated: small-quota-consumer-client
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:206)
> at kafka.server.RequestQuotaTest.$anonfun$waitAndCheckResults$1(RequestQuotaTest.scala:415)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
> at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
> at kafka.server.RequestQuotaTest.waitAndCheckResults(RequestQuotaTest.scala:413)
> at kafka.server.RequestQuotaTest.testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(RequestQuotaTest.scala:134){quote}





[jira] [Resolved] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2019-03-01 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-7918.

Resolution: Fixed

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  





[jira] [Created] (KAFKA-8026) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted

2019-03-01 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8026:
--

 Summary: Flaky Test 
RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted
 Key: KAFKA-8026
 URL: https://issues.apache.org/jira/browse/KAFKA-8026
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 1.1.1, 1.0.2
Reporter: Matthias J. Sax
 Fix For: 1.0.3, 1.1.2


{quote}java.lang.AssertionError: Condition not met within timeout 15000. Stream 
tasks not updated
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
at 
org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:215){quote}
Happened in 1.0 and 1.1 builds:
[https://builds.apache.org/blue/organizations/jenkins/kafka-1.0-jdk7/detail/kafka-1.0-jdk7/263/tests/]

and
[https://builds.apache.org/blue/organizations/jenkins/kafka-1.1-jdk7/detail/kafka-1.1-jdk7/249/tests/]





[jira] [Commented] (KAFKA-8026) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted

2019-03-01 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781886#comment-16781886
 ] 

Guozhang Wang commented on KAFKA-8026:
--

[~bbejeck] I think you cherry-picked the PR to old branches as well; maybe this 
build ran just before the commit? It is worth checking the Jenkins console (it shows 
the commit hash on which the build was triggered).

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted
> 
>
> Key: KAFKA-8026
> URL: https://issues.apache.org/jira/browse/KAFKA-8026
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.0.2, 1.1.1
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 1.0.3, 1.1.2
>
>
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Stream tasks not updated
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:215){quote}
> Happened in 1.0 and 1.1 builds:
> [https://builds.apache.org/blue/organizations/jenkins/kafka-1.0-jdk7/detail/kafka-1.0-jdk7/263/tests/]
> and
> [https://builds.apache.org/blue/organizations/jenkins/kafka-1.1-jdk7/detail/kafka-1.1-jdk7/249/tests/]





[jira] [Commented] (KAFKA-8011) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781883#comment-16781883
 ] 

Matthias J. Sax commented on KAFKA-8011:


Done: https://issues.apache.org/jira/browse/KAFKA-8026

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8011
> URL: https://issues.apache.org/jira/browse/KAFKA-8011
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Critical
>  Labels: flaky-test, newbie
> Attachments: streams_1_0_test_results.png, streams_1_1_tests.png
>
>
> The RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> and RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted  tests use 
> an ArrayList to assert the topics assigned to the Streams application. 
> The ConsumerRebalanceListener used in the test operates on this list as does 
> the TestUtils.waitForCondition() to verify the expected topic assignments.
> Using the same list in both places can cause a ConcurrentModificationException 
> if the rebalance listener modifies the assignment at the same time 
> TestUtils.waitForCondition() is using the list to verify the expected topics. 
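The failure mode described above, and one common way around it, can be sketched with the JDK alone. This is not the test's code: `AssignedTopicsDemo` is a hypothetical name, the listener update is simulated inline rather than on a second thread so the outcome is deterministic, and `CopyOnWriteArrayList` is only one option (the ticket text does not say which fix was chosen).

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

class AssignedTopicsDemo {
    // Simulates a rebalance listener changing the assignment while the
    // verification loop is still iterating over the same list.
    // Returns true if the iteration completes, false if it fails with a
    // ConcurrentModificationException.
    static boolean iterationSurvivesUpdate(List<String> assigned) {
        assigned.addAll(Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"));
        try {
            for (String topic : assigned) {
                // Stand-in for the listener callback firing mid-check.
                assigned.remove("TEST-TOPIC-B");
                assigned.add("TEST-TOPIC-C");
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }
}
```

Passing a `java.util.ArrayList` makes the loop fail, while passing a `java.util.concurrent.CopyOnWriteArrayList` lets it finish, because its iterator works on the snapshot taken when iteration started.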





[jira] [Commented] (KAFKA-8011) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781880#comment-16781880
 ] 

Matthias J. Sax commented on KAFKA-8011:


Your commit might be unrelated. These are older branches that don't run regularly 
any longer. I'll just create a new ticket for it. Thanks!

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8011
> URL: https://issues.apache.org/jira/browse/KAFKA-8011
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Critical
>  Labels: flaky-test, newbie
> Attachments: streams_1_0_test_results.png, streams_1_1_tests.png
>
>
> The RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> and RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted  tests use 
> an ArrayList to assert the topics assigned to the Streams application. 
> The ConsumerRebalanceListener used in the test operates on this list as does 
> the TestUtils.waitForCondition() to verify the expected topic assignments.
> Using the same list in both places can cause a ConcurrentModificationException 
> if the rebalance listener modifies the assignment at the same time 
> TestUtils.waitForCondition() is using the list to verify the expected topics. 





[jira] [Updated] (KAFKA-8023) Improve global state store restoration by using multiple update threads

2019-03-01 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8023:
-
Labels: performance  (was: )

> Improve global state store restoration by using multiple update threads
> ---
>
> Key: KAFKA-8023
> URL: https://issues.apache.org/jira/browse/KAFKA-8023
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrik Kleindl
>Priority: Major
>  Labels: performance
>
> Currently global state stores are restored sequentially and the partitions of 
> each global state store are restored sequentially too.
> Loop over stores:
> https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L155
> Loop over partitions:
> https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L256
> It would be a great improvement if one or both of those loops could be 
> processed in parallel.
> Possible related task is https://issues.apache.org/jira/browse/KAFKA-6721 
> Mail discussion: 
> https://lists.apache.org/thread.html/6fc4772eb8635c04b0ee6682003a99a5ef37ebccffea6c89752e96b1@%3Cusers.kafka.apache.org%3E
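A rough sketch of what running the partition loop in parallel could look like, using only `java.util.concurrent`. None of this is taken from `GlobalStateManagerImpl`: `ParallelRestoreSketch`, `restoreAll`, and the `restorePartition` callback (which returns the number of records restored for a partition) are hypothetical, and real restoration would also have to deal with consumer thread-safety and store locking, which this ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntToLongFunction;

class ParallelRestoreSketch {
    // Restores every partition on a fixed-size pool and returns the total
    // number of restored records reported by the callback.
    static long restoreAll(int partitionCount, int threads, IntToLongFunction restorePartition) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (int p = 0; p < partitionCount; p++) {
                final int partition = p;
                Callable<Long> task = () -> restorePartition.applyAsLong(partition);
                pending.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : pending) {
                total += f.get(); // blocks until that partition is done
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("restore interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("restore failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, `ParallelRestoreSketch.restoreAll(4, 2, p -> p + 1L)` restores four partitions on two threads and sums the per-partition counts.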





[jira] [Commented] (KAFKA-8021) KafkaProducer.flush() can show unexpected behavior when a batch is split

2019-03-01 Thread Abhishek Mendhekar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781850#comment-16781850
 ] 

Abhishek Mendhekar commented on KAFKA-8021:
---

[~sriharsha] the issue is in 0.11 and higher versions (including the latest trunk). 
Since the race window is very small, I am still trying to reproduce it in an 
integration test, but I was able to verify it via unit tests.

> KafkaProducer.flush() can show unexpected behavior when a batch is split
> 
>
> Key: KAFKA-8021
> URL: https://issues.apache.org/jira/browse/KAFKA-8021
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0
>Reporter: Abhishek Mendhekar
>Assignee: Abhishek Mendhekar
>Priority: Major
>
> KafkaProducer.flush() marks the flush in progress and then waits for all 
> incomplete batches to be completed (waits on the producer batch futures to 
> finish).
> The unexpected behavior is seen when a batch is split due to a MESSAGE_TOO_LARGE 
> error.
> The large batch is split into smaller batches (2 or more), but 
> ProducerBatch.split() marks the large batch future as complete before adding 
> the new batches to the list of incomplete batches. If KafkaProducer.flush() is 
> called at this point, it makes a copy of the existing list of incomplete 
> batches and waits for them to complete, so it does not wait for the smaller 
> batches that the large batch was split into.
>  
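The ordering problem described above can be modelled without any Kafka classes. The sketch below is a deliberately simplified, single-threaded model: `SplitOrderingSketch` and its methods are hypothetical, batches are just strings, and the "flush" is reduced to the snapshot of incomplete batches it would wait on. Reversing the order is shown only to make the window visible; it is not claimed to be the fix that was applied.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SplitOrderingSketch {
    private final Set<String> incomplete = new LinkedHashSet<>();

    void add(String batch) {
        incomplete.add(batch);
    }

    void complete(String batch) {
        incomplete.remove(batch);
    }

    // What a flush would wait on: a copy of the incomplete set at this instant.
    List<String> flushSnapshot() {
        return new ArrayList<>(incomplete);
    }

    // Ordering from the report: the large batch is completed first, so a
    // flush arriving in the window sees nothing left to wait for.
    static List<String> completeThenAdd() {
        SplitOrderingSketch acc = new SplitOrderingSketch();
        acc.add("large");
        acc.complete("large");
        List<String> seen = acc.flushSnapshot(); // flush lands in the window
        acc.add("small-1");
        acc.add("small-2");
        return seen;
    }

    // Alternative ordering: register the split batches before completing
    // the large one, so a flush at the same point sees them.
    static List<String> addThenComplete() {
        SplitOrderingSketch acc = new SplitOrderingSketch();
        acc.add("large");
        acc.add("small-1");
        acc.add("small-2");
        acc.complete("large");
        return acc.flushSnapshot();
    }
}
```

In the first ordering the snapshot is empty, so the modelled flush would return while two batches are still unsent; in the second it contains both split batches.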





[jira] [Created] (KAFKA-8025) Flaky Test RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls

2019-03-01 Thread Konstantine Karantasis (JIRA)
Konstantine Karantasis created KAFKA-8025:
-

 Summary: Flaky Test 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls
 Key: KAFKA-8025
 URL: https://issues.apache.org/jira/browse/KAFKA-8025
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Konstantine Karantasis
Assignee: Guozhang Wang


There has been at least one occurrence of the following unit test case failing on 
a Jenkins job that didn't involve any related changes. 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2783/consoleFull]

I have not been able to reproduce it locally on Linux. (For instance 20 
consecutive runs of this class pass all test cases)
{code:java}
14:06:13 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls STARTED
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/build/reports/testOutput/org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls.test.stdout
14:06:14
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllDbOptionsCalls FAILED
14:06:14     java.lang.AssertionError:
14:06:14     Expected: a string matching the pattern 'Unexpected method call DBOptions\.baseBackgroundCompactions((.*
14:06:14     *)*):'
14:06:14          but: was "Unexpected method call DBOptions.baseBackgroundCompactions():\n    DBOptions.close(): expected: 3, actual: 0"
14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
14:06:14         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.verifyDBOptionsMethodCall(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:121)
14:06:14         at org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:101)
14:06:14
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls STARTED
14:06:14
14:06:14 org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest > shouldForwardAllColumnFamilyCalls PASSED

{code}
 





[jira] [Commented] (KAFKA-7922) Returned authorized operations in describe responses (KIP-430)

2019-03-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781816#comment-16781816
 ] 

ASF GitHub Bot commented on KAFKA-7922:
---

omkreddy commented on pull request #6322: KAFKA-7922: Return authorized 
operations in describe consumer group responses (KIP-430 Part-1)
URL: https://github.com/apache/kafka/pull/6322
 
 
   
 



> Returned authorized operations in describe responses (KIP-430)
> --
>
> Key: KAFKA-7922
> URL: https://issues.apache.org/jira/browse/KAFKA-7922
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Manikumar
>Priority: Major
>
> Add an option to request authorized operations on resources when describing 
> resources (topics, consumer groups and cluster).
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses
>  for details.





[jira] [Commented] (KAFKA-8021) KafkaProducer.flush() can show unexpected behavior when a batch is split

2019-03-01 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781820#comment-16781820
 ] 

Sriharsha Chintalapani commented on KAFKA-8021:
---

[~amendhekar] Have you noticed this issue with the latest release, 2.1.1?

> KafkaProducer.flush() can show unexpected behavior when a batch is split
> 
>
> Key: KAFKA-8021
> URL: https://issues.apache.org/jira/browse/KAFKA-8021
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0
>Reporter: Abhishek Mendhekar
>Assignee: Abhishek Mendhekar
>Priority: Major
>
> KafkaProducer.flush() marks the flush in progress and then waits for all 
> incomplete batches to be completed (waits on the producer batch futures to 
> finish).
> The unexpected behavior occurs when a batch is split because of a 
> MESSAGE_TOO_LARGE error. The large batch is split into two or more smaller 
> batches, but ProducerBatch.split() marks the large batch's future as complete 
> before adding the new batches to the list of incomplete batches. If 
> KafkaProducer.flush() is called in that window, it copies the existing list 
> of incomplete batches and waits only for those, so it does not wait for the 
> smaller batches produced by the split.
>  
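
The ordering problem described above can be sketched without Kafka itself. 
The following is a minimal single-threaded model, not the producer's actual 
code: FlushSplitSketch, addBatch() and flushWouldReturnImmediately() are 
hypothetical names, and CompletableFuture stands in for the producer batch 
futures. Registering the split batches before completing the large one is 
shown only as one possible ordering that closes the window; the ticket does 
not say which fix was chosen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the incomplete-batch list; not Kafka code.
class FlushSplitSketch {
    private final List<CompletableFuture<Void>> incomplete = new ArrayList<>();

    // Registers a new batch and returns the future a flush would wait on.
    CompletableFuture<Void> addBatch() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        incomplete.add(future);
        return future;
    }

    // Models flush(): copy the list, then wait on every future in the copy.
    // Instead of blocking, report whether that wait would return at once.
    boolean flushWouldReturnImmediately() {
        List<CompletableFuture<Void>> snapshot = new ArrayList<>(incomplete);
        for (CompletableFuture<Void> future : snapshot) {
            if (!future.isDone()) {
                return false;
            }
        }
        return true;
    }

    // Order described in the ticket: the large batch is completed first and
    // the split batches are registered afterwards.
    static boolean flushReturnsEarlyWhenCompletedFirst() {
        FlushSplitSketch producer = new FlushSplitSketch();
        CompletableFuture<Void> large = producer.addBatch();
        large.complete(null);
        // A flush arriving here sees only the completed large batch.
        boolean early = producer.flushWouldReturnImmediately();
        producer.addBatch();
        producer.addBatch();
        return early;
    }

    // Assumed alternative order: register the split batches, then complete
    // the large batch. A flush always sees at least one pending future.
    static boolean flushReturnsEarlyWhenRegisteredFirst() {
        FlushSplitSketch producer = new FlushSplitSketch();
        CompletableFuture<Void> large = producer.addBatch();
        producer.addBatch();
        producer.addBatch();
        boolean beforeCompletion = producer.flushWouldReturnImmediately();
        large.complete(null);
        boolean afterCompletion = producer.flushWouldReturnImmediately();
        return beforeCompletion || afterCompletion;
    }

    public static void main(String[] args) {
        System.out.println("complete-first: " + flushReturnsEarlyWhenCompletedFirst());
        System.out.println("register-first: " + flushReturnsEarlyWhenRegisteredFirst());
    }
}
```

In this model the complete-first order lets the simulated flush return while 
the split batches are still unsent, whereas the register-first order does not.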





[jira] [Comment Edited] (KAFKA-8011) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-03-01 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781755#comment-16781755
 ] 

Bill Bejeck edited comment on KAFKA-8011 at 3/1/19 3:10 PM:


I've been able to verify that both 1.0 and 1.1 tests pass locally.  If there is 
a failure in a subsequent build should I revert the cherry-pick?

 

EDIT: Additionally the error message in the 1.1 failure

 
{noformat}
Exception in thread 
"regex-source-integration-test-e94954f0-2bda-4967-89ee-3de8a64522ea-StreamThread-6"
 org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic foo 
is already matched for another regex pattern foo.* and hence cannot be matched 
to this regex pattern f.* any more.{noformat}
includes a topic name that is not even in the test.

 

The commit only changed the type of the List used in two tests; I made no logic 
changes at all.


was (Author: bbejeck):
I've been able to verify that both 1.0 and 1.1 tests pass locally.  If there is 
a failure in a subsequent build should I revert the cherry-pick?

 

EDIT: Additionally the error message

 
{noformat}
Exception in thread 
"regex-source-integration-test-e94954f0-2bda-4967-89ee-3de8a64522ea-StreamThread-6"
 org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic foo 
is already matched for another regex pattern foo.* and hence cannot be matched 
to this regex pattern f.* any more.{noformat}
includes a topic name that is not even in the test.

 

The commit only changed the type of the List used in two tests; I made no logic 
changes at all.

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8011
> URL: https://issues.apache.org/jira/browse/KAFKA-8011
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Critical
>  Labels: flaky-test, newbie
> Attachments: streams_1_0_test_results.png, streams_1_1_tests.png
>
>
> The RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> and RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted tests use 
> an ArrayList to assert the topics assigned to the Streams application. 
> The ConsumerRebalanceListener used in the test operates on this list, as does 
> TestUtils.waitForCondition() to verify the expected topic assignments.
> Using the same list in both places can cause a ConcurrentModificationException 
> if the rebalance listener modifies the assignment at the same time 
> TestUtils.waitForCondition() is using the list to verify the expected topics. 
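
The hazard described above can be reproduced deterministically in a small 
sketch. It does not use the Kafka test code: SharedListSketch and 
failsWhenModifiedDuringIteration() are hypothetical names, and the 
mid-iteration add() simulates the rebalance listener touching the list while 
the verification loop reads it. CopyOnWriteArrayList is shown as one 
thread-safe alternative; it is an assumption here, since the ticket does not 
name the replacement list type.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; not part of the Kafka test suite.
class SharedListSketch {

    // Iterates the list the way a verification loop would, while a simulated
    // rebalance callback adds a topic in the middle of the iteration.
    // Returns true if iteration failed with ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(List<String> assignedTopics) {
        assignedTopics.add("topic-1");
        assignedTopics.add("topic-2");
        try {
            for (String topic : assignedTopics) {
                if (topic.equals("topic-1")) {
                    // Simulated listener update while the reader is iterating.
                    assignedTopics.add("topic-3");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ArrayList fails: "
                + failsWhenModifiedDuringIteration(new ArrayList<String>()));
        System.out.println("CopyOnWriteArrayList fails: "
                + failsWhenModifiedDuringIteration(new CopyOnWriteArrayList<String>()));
    }
}
```

ArrayList iterators are fail-fast, so the modification is detected on the 
next step of the loop. CopyOnWriteArrayList iterates over a snapshot, which 
avoids the exception at the cost of copying the backing array on every write, 
a cost that is usually acceptable for a short list in a test.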





[jira] [Comment Edited] (KAFKA-8011) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-03-01 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781755#comment-16781755
 ] 

Bill Bejeck edited comment on KAFKA-8011 at 3/1/19 3:09 PM:


I've been able to verify that both 1.0 and 1.1 tests pass locally.  If there is 
a failure in a subsequent build should I revert the cherry-pick?

 

EDIT: Additionally the error message

 
{noformat}
Exception in thread 
"regex-source-integration-test-e94954f0-2bda-4967-89ee-3de8a64522ea-StreamThread-6"
 org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic foo 
is already matched for another regex pattern foo.* and hence cannot be matched 
to this regex pattern f.* any more.{noformat}
includes a topic name that is not even in the test.

 

The commit only changed the type of the List used in two tests; I made no logic 
changes at all.


was (Author: bbejeck):
I've been able to verify that both 1.0 and 1.1 tests pass locally.  If there is 
a failure in a subsequent build should I revert the cherry-pick?





