[jira] [Commented] (KAFKA-3887) StreamBounceTest.test_bounce and StreamSmokeTest.test_streams failing
[ https://issues.apache.org/jira/browse/KAFKA-3887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369420#comment-15369420 ] ASF GitHub Bot commented on KAFKA-3887: --- GitHub user guozhangwang opened a pull request: https://github.com/apache/kafka/pull/1604 KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates Also made a pass over the streams unit tests, with the following changes: 1. Removed three integration tests as they are already covered by other integration tests. 2. Merged `KGroupedTableImplTest` into `KTableAggregateTest`. 3. Use mocks whenever possible to reduce code duplicates. You can merge this pull request into a Git repository by running: $ git pull https://github.com/guozhangwang/kafka Kminor-unit-tests-consolidation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1604.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1604 commit c08d252e591cae875b9c43e81f38e078a025f46a Author: Guozhang Wang Date: 2016-07-10T05:03:50Z make a pass over unit tests > StreamBounceTest.test_bounce and StreamSmokeTest.test_streams failing > - > > Key: KAFKA-3887 > URL: https://issues.apache.org/jira/browse/KAFKA-3887 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Ismael Juma >Assignee: Guozhang Wang > Labels: transient-system-test-failure > Fix For: 0.10.0.1 > > > StreamBounceTest.test_bounce and StreamSmokeTest.test_streams has been > failing semi-regularly. Output from the latest failure: > {code} > > test_id: > 2016-06-20--001.kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce > status: FAIL > run time: 4 minutes 13.916 seconds > Streams Smoke Test process on ubuntu@worker5 took too long to exit > Traceback (most recent call last): > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py", > line 106, in run_all_tests > data = self.run_single_test() > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py", > line 162, in run_single_test > return self.current_test_context.function(self.current_test) > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/streams/streams_bounce_test.py", > line 67, in test_bounce > self.driver.wait() > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py", > line 94, in wait > wait_until(lambda: not node.account.alive(pid), timeout_sec=120, > err_msg="Streams Smoke Test process on " + str(node.account) + " took too > long to exit") > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/utils/util.py", > line 36, in wait_until > raise TimeoutError(err_msg) > TimeoutError: Streams Smoke Test process on ubuntu@worker5 took too long to > exit > > test_id: > 2016-06-20--001.kafkatest.tests.streams.streams_smoke_test.StreamsSmokeTest.test_streams > status: FAIL > run time: 4 minutes 36.426 seconds > Streams Smoke Test process on ubuntu@worker9 took too long to exit > Traceback (most recent call last): > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py", > line 106, in run_all_tests > data = self.run_single_test() > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py", > line 162, in run_single_test > return self.current_test_context.function(self.current_test) > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/streams/streams_smoke_test.py", > line 67, in test_streams > self.driver.wait() > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py", > line 94, in wait > wait_until(lambda: not node.account.alive(pid), timeout_sec=120, > err_msg="Streams Smoke Test process on " + str(node.account) + " took too > long to exit") > File > "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/utils/util
[GitHub] kafka pull request #1604: KAFKA-3887 Follow-up: add unit test for null check...
GitHub user guozhangwang opened a pull request: https://github.com/apache/kafka/pull/1604 KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates Also made a pass over the streams unit tests, with the following changes: 1. Removed three integration tests as they are already covered by other integration tests. 2. Merged `KGroupedTableImplTest` into `KTableAggregateTest`. 3. Use mocks whenever possible to reduce code duplicates. You can merge this pull request into a Git repository by running: $ git pull https://github.com/guozhangwang/kafka Kminor-unit-tests-consolidation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1604.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1604 commit c08d252e591cae875b9c43e81f38e078a025f46a Author: Guozhang Wang Date: 2016-07-10T05:03:50Z make a pass over unit tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-1429) Yet another deadlock in controller shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369380#comment-15369380 ] ASF GitHub Bot commented on KAFKA-1429: --- GitHub user pengwei-li opened a pull request: https://github.com/apache/kafka/pull/1603 KAFKA-1429: Yet another deadlock in controller shutdown Author: pengwei Reviewers: NA You can merge this pull request into a Git repository by running: $ git pull https://github.com/pengwei-li/kafka trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1603.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1603 commit a920d4e9807add634cc44e4b7cf9e156edd515cf Author: pengwei-li Date: 2016-07-10T00:31:56Z KAFKA-1429: Yet another deadlock in controller shutdown Author: pengwei Reviewers: NA > Yet another deadlock in controller shutdown > --- > > Key: KAFKA-1429 > URL: https://issues.apache.org/jira/browse/KAFKA-1429 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.1 >Reporter: Dmitry Bugaychenko >Assignee: Neha Narkhede > Attachments: kafka_0.9.0.0_controller_dead_lock.patch > > > Found one more case of deadlock in controller during shutdown: > {code} > ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181 > id=57 state=TIMED_WAITING > - waiting on <0x288a66ec> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > - locked <0x288a66ec> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) > at > java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468) > at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:339) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) > at > kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) > at kafka.utils.Utils$.inLock(Utils.scala:538) > at > kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067) > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > Locked synchronizers: count = 1 > - java.util.concurrent.locks.ReentrantLock$NonfairSync@22b9b31a > kafka-scheduler-0 id=172 state=WAITING > - waiting on <0x22b9b31a> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > - locked <0x22b9b31a> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > owned by > ZkClient-EventThread-57-192.168.41.148:2181,192.168.36.250:2181,192.168.41.207:2181 > id=57 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) > at kafka.utils.Utils$.inLock(Utils.scala:536) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1110) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerP
[GitHub] kafka pull request #1603: KAFKA-1429: Yet another deadlock in controller shu...
GitHub user pengwei-li opened a pull request: https://github.com/apache/kafka/pull/1603 KAFKA-1429: Yet another deadlock in controller shutdown Author: pengwei Reviewers: NA You can merge this pull request into a Git repository by running: $ git pull https://github.com/pengwei-li/kafka trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1603.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1603 commit a920d4e9807add634cc44e4b7cf9e156edd515cf Author: pengwei-li Date: 2016-07-10T00:31:56Z KAFKA-1429: Yet another deadlock in controller shutdown Author: pengwei Reviewers: NA --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-3943) ConfigDef should support a builder pattern.
[ https://issues.apache.org/jira/browse/KAFKA-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369347#comment-15369347 ] ASF GitHub Bot commented on KAFKA-3943: --- GitHub user jcustenborder opened a pull request: https://github.com/apache/kafka/pull/1602 KAFKA-3943 - ConfigDef with Builder pattern Added Builder class and define() method with no arguments. Added testcase validating the ConfigDef using the current implementation against the new builder implementation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jcustenborder/kafka KAFKA-3943 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1602.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1602 commit de7873c57a382b9b8976e50fea1c2842ad3aa961 Author: Jeremy Custenborder Date: 2016-07-09T23:33:40Z Added Builder class and define() method with no arguments. Added test case validating the ConfigDef using the current implementation against the new builder implementation. > ConfigDef should support a builder pattern. > --- > > Key: KAFKA-3943 > URL: https://issues.apache.org/jira/browse/KAFKA-3943 > Project: Kafka > Issue Type: Improvement >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Minor > > I catch myself always having to lookup the overloads for define. What about > adding a builder pattern? > {code} > ConfigDef def = new ConfigDef() > > .define().name("a").type(Type.INT).defaultValue(5).validator(Range.between(0, > 14)).importance(Importance.HIGH).documentation("docs").build() > > .define().name("b").type(Type.LONG).importance(Importance.HIGH).documentation("docs").build() > > .define().name("c").type(Type.STRING).defaultValue("hello").importance(Importance.HIGH).documentation("docs").build() > > .define().name("d").type(Type.LIST).importance(Importance.HIGH).documentation("docs").build() > > .define().name("e").type(Type.DOUBLE).importance(Importance.HIGH).documentation("docs").build() > > .define().name("f").type(Type.CLASS).importance(Importance.HIGH).documentation("docs").build() > > .define().name("g").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() > > .define().name("h").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() > > .define().name("i").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() > > .define().name("j").type(Type.PASSWORD).importance(Importance.HIGH).documentation("docs").build(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request #1602: KAFKA-3943 - ConfigDef with Builder pattern
GitHub user jcustenborder opened a pull request: https://github.com/apache/kafka/pull/1602 KAFKA-3943 - ConfigDef with Builder pattern Added Builder class and define() method with no arguments. Added testcase validating the ConfigDef using the current implementation against the new builder implementation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jcustenborder/kafka KAFKA-3943 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1602.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1602 commit de7873c57a382b9b8976e50fea1c2842ad3aa961 Author: Jeremy Custenborder Date: 2016-07-09T23:33:40Z Added Builder class and define() method with no arguments. Added test case validating the ConfigDef using the current implementation against the new builder implementation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: Avoid recreating reassign partition path in zk if it is already deleted
For the benefit of the future readers, a simple workaround for this issue is to: # change the controller to a non-existing broker, # delete the current assignment from zk, and # then change the controller to an existent broker Maysam On Wed, May 11, 2016 at 5:24 PM, Maysam Yabandeh wrote: > Hi > > I wondering if makes sense to remove > {code} > case nne: ZkNoNodeException => > createPersistentPath(zkPath, jsonData) > debug("Created path %s with %s for partition > reassignment".format(zkPath, jsonData)) > {code} > from ZKUtils::updatePartitionReassignmentData, which has caused an > incident for us. > > The code does not seem to be doing anything in the normal case: if > reassign path does not exist when removePartitionFromReassignedPartitions > starts, it then has nothing to write back to zk anyway. The only time that > the code kick in is when the admin manually deletes the zk path in the > middle of update, which essentially cancels the admin's attempt to stop a > bad partition assignment. > > The incident in our case was a very large json file that was mistakenly > used by admin for partition assignment. The controller zk thread was in a > busy loop removing partitions from this json file stored at zk, one by one. > We attempted to stop the assignment by i) removing the zk path, ii) > changing the controller. However, due to the many zk update operations by > the active controller, the path would be recreated over and over. Changing > the controller would also did not help since the new controller resumes the > badly started reassignment job by picking it up from zk. > > Simply removing createPersistentPath in the catch clause should avoid such > problems and yet does not seem to changing the intended semantics of > removePartitionFromReassignedPartitions. > > Thoughts? > > Thanks > Maysam >
[jira] [Created] (KAFKA-3944) After the broker restart, fetchers stopped due to a delayed controlled shutdown message
Maysam Yabandeh created KAFKA-3944: -- Summary: After the broker restart, fetchers stopped due to a delayed controlled shutdown message Key: KAFKA-3944 URL: https://issues.apache.org/jira/browse/KAFKA-3944 Project: Kafka Issue Type: Bug Components: core Reporter: Maysam Yabandeh Priority: Minor The symptom is that cluster reports under-replicated blocks and some replicas do not seem to catch up ever. It turns out that the corresponding fetchers in those brokers were stopped shortly after the broker's restart. The broker had stopped the fetcher upon receiving stop-replica request from the controller. The controller had issued those request upon processing controlled shutdown request form the same broker. However those requests were all sent before the broker restart but the controller processed them after. Here is the timeline: # broker sends controlled shutdown message to controller # the process fails and the broker proceeds with an unclean shutdown # the broker is restated # the controller processes the perviously sent controlled shutdown messages # the controller sends stop replica messages to the broker # the broker shuts down the fetchers, while it has no intent to shut down again # this leads to under-replicated blocks Example from logs: {code} broker19.com:/var/log/kafka$ grep "Retrying controlled shutdow\|unclean shutdown" server.log.2016-07-07.2 2016-07-07 15:58:10,818 WARN server.KafkaServer: [Kafka Server 19], Retrying controlled shutdown after the previous attempt failed... 2016-07-07 15:58:45,887 WARN server.KafkaServer: [Kafka Server 19], Retrying controlled shutdown after the previous attempt failed... 2016-07-07 15:59:20,927 WARN server.KafkaServer: [Kafka Server 19], Retrying controlled shutdown after the previous attempt failed... 2016-07-07 15:59:20,929 WARN server.KafkaServer: [Kafka Server 19], Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed broker19.com:/var/log/kafka$ head -1 server.log.2016-07-07.3 2016-07-07 16:00:23,191 INFO server.KafkaConfig: KafkaConfig values: {code} {code} broker13.com:/var/log/kafka$ grep "Shutting down broker 19" controller.log.2016-07-07.1 2016-07-07 15:57:35,822 INFO controller.KafkaController: [Controller 13]: Shutting down broker 19 2016-07-07 16:02:45,526 INFO controller.KafkaController: [Controller 13]: Shutting down broker 19 2016-07-07 16:05:42,432 INFO controller.KafkaController: [Controller 13]: Shutting down broker 19 {code} which resulted into many stop replica request to broker 19: {code} broker13.com:/var/log/kafka$ grep "The stop replica request (delete = false) sent to broker 19 is" controller.log.2016-07-07.1 | tail -1 2016-07-07 16:06:02,374 DEBUG controller.ControllerBrokerRequestBatch: The stop replica request (delete = false) sent to broker 19 is [Topic=topic-xyz,Partition=6,Replica=19] {code} broker 19 processes them AFTER its restart: {code} broker19.com:/var/log/kafka$ grep "handling stop replica (delete=false) for partition .topic-xzy,3." state-change.log.2016-07-07.2 2016-07-07 16:06:00,154 TRACE change.logger: Broker 19 handling stop replica (delete=false) for partition [topic-xzy,3] 2016-07-07 16:06:00,154 TRACE change.logger: Broker 19 finished handling stop replica (delete=false) for partition [topic-xyz,3] 2016-07-07 16:06:00,155 TRACE change.logger: Broker 19 handling stop replica (delete=false) for partition [topic-xyz,3] 2016-07-07 16:06:00,155 TRACE change.logger: Broker 19 finished handling stop replica (delete=false) for partition [topic-xyz,3] {code} and removes the fetchers: {code} broker19.com:/var/log/kafka$ grep "Removed fetcher.*topic-xyz.3" server.log.2016-07-07.3 | tail -2 2016-07-07 16:06:00,154 INFO server.ReplicaFetcherManager: [ReplicaFetcherManager on broker 19] Removed fetcher for partitions [topic-xyz,3] 2016-07-07 16:06:00,155 INFO server.ReplicaFetcherManager: [ReplicaFetcherManager on broker 19] Removed fetcher for partitions [topic-xyz,3] {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3943) ConfigDef should support a builder pattern.
Jeremy Custenborder created KAFKA-3943: -- Summary: ConfigDef should support a builder pattern. Key: KAFKA-3943 URL: https://issues.apache.org/jira/browse/KAFKA-3943 Project: Kafka Issue Type: Improvement Reporter: Jeremy Custenborder Assignee: Jeremy Custenborder Priority: Minor I catch myself always having to lookup the overloads for define. What about adding a builder pattern? {code} ConfigDef def = new ConfigDef() .define().name("a").type(Type.INT).defaultValue(5).validator(Range.between(0, 14)).importance(Importance.HIGH).documentation("docs").build() .define().name("b").type(Type.LONG).importance(Importance.HIGH).documentation("docs").build() .define().name("c").type(Type.STRING).defaultValue("hello").importance(Importance.HIGH).documentation("docs").build() .define().name("d").type(Type.LIST).importance(Importance.HIGH).documentation("docs").build() .define().name("e").type(Type.DOUBLE).importance(Importance.HIGH).documentation("docs").build() .define().name("f").type(Type.CLASS).importance(Importance.HIGH).documentation("docs").build() .define().name("g").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() .define().name("h").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() .define().name("i").type(Type.BOOLEAN).importance(Importance.HIGH).documentation("docs").build() .define().name("j").type(Type.PASSWORD).importance(Importance.HIGH).documentation("docs").build(); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-3882) Integration failures on low-end machines
[ https://issues.apache.org/jira/browse/KAFKA-3882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-3882. -- Resolution: Not A Bug I found out this is actually due to an issue on my local machine. > Integration failures on low-end machines > > > Key: KAFKA-3882 > URL: https://issues.apache.org/jira/browse/KAFKA-3882 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Guozhang Wang > > I now consistently saw some integration failures on my laptop due to no > records ever been produced in 30 / 60 seconds (my laptop only has 4GB and > when running the unit test today it is burning all CPUs / memory btw): > {code} > org.apache.kafka.streams.integration.JoinIntegrationTest > > shouldCountClicksPerRegion FAILED > java.lang.AssertionError: Expected 8 but received only 0 records before > timeout 3 ms > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:199) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:173) > at > org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion(JoinIntegrationTest.java:258) > org.apache.kafka.streams.integration.MapFunctionIntegrationTest > > shouldUppercaseTheInput FAILED > java.lang.AssertionError: Expected 2 but received only 0 records before > timeout 3 ms > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:235) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:209) > at > org.apache.kafka.streams.integration.MapFunctionIntegrationTest.shouldUppercaseTheInput(MapFunctionIntegrationTest.java:116) > org.apache.kafka.streams.integration.FanoutIntegrationTest > > shouldFanoutTheInput FAILED > java.lang.AssertionError: Expected 2 but received only 0 records before > timeout 3 ms > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:235) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:209) > at > org.apache.kafka.streams.integration.FanoutIntegrationTest.shouldFanoutTheInput(FanoutIntegrationTest.java:145) > org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > > shouldReduce FAILED > java.lang.AssertionError: Expected 10 but received only 0 records before > timeout 6 ms > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:199) > at > org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest.receiveMessages(KGroupedStreamIntegrationTest.java:467) > at > org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest.shouldReduce(KGroupedStreamIntegrationTest.java:141) > org.apache.kafka.streams.integration.PassThroughIntegrationTest > > shouldWriteTheInputDataAsIsToTheOutputTopic FAILED > java.lang.AssertionError: Expected 3 but received only 0 records before > timeout 3 ms > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:235) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:209) > at > org.apache.kafka.streams.integration.PassThroughIntegrationTest.shouldWriteTheInputDataAsIsToTheOutputTopic(PassThroughIntegrationTest.java:103) > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > > shouldCorrectlyRepartitionOnJoinOperations FAILED > java.lang.AssertionError: Expected 5 but received only 0 records before > timeout 6 ms > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:235) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.receiveMessages(KStreamRepartitionJoinTest.java:414) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyCorrectOutput(KStreamRepartitionJoinTest.java:326) > at > org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations(KStreamRepartitionJoinTest.java:130) > org.apache.kafka.streams.integration.KGroupedStreamIntegrationTest > > shouldAggregate FAILED > java.lang.AssertionError: Expected 10 but received only
[jira] [Comment Edited] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369257#comment-15369257 ] Bill Bejeck edited comment on KAFKA-3101 at 7/9/16 7:26 PM: [~enothereska] none actually, a misunderstanding on my part. Thanks, -Bill was (Author: bbejeck): [~enothereska] none actually, a misunderstanding on my part. > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change oldValue>: > , , , , > where could cost a lot of CPU overhead computing the intermediate results. > Instead if we can let the underlying state store to "remember" the last > emitted old value, we can reduce the number of emits based on some configs. > More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only get updated when we emit to the > downstream processor. For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that, we need a "closing" time control on the > not-yet-emitted keys; when some time has elapsed (or the window is to be > closed), we need to check for any key if their current materialized pairs > have not been emitted (for example in the above example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369257#comment-15369257 ] Bill Bejeck commented on KAFKA-3101: [~enothereska] none actually, a misunderstanding on my part. > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change oldValue>: > , , , , > where could cost a lot of CPU overhead computing the intermediate results. > Instead if we can let the underlying state store to "remember" the last > emitted old value, we can reduce the number of emits based on some configs. > More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only get updated when we emit to the > downstream processor. For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that, we need a "closing" time control on the > not-yet-emitted keys; when some time has elapsed (or the window is to be > closed), we need to check for any key if their current materialized pairs > have not been emitted (for example in the above example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe
[ https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369206#comment-15369206 ] ASF GitHub Bot commented on KAFKA-3905: --- GitHub user rekhajoshm opened a pull request: https://github.com/apache/kafka/pull/1601 KAFKA-3905; Handle invalid collection of topics, patterns on subscription for list of topics, with patterns, and with assignments KAFKA-3905: Handling null/empty topics and collections, patterns when subscription with list of topics or with patterns, and with assignments. - Added validity checks for input parameters on subscribe, assign to avoid NPE, and provide an argument exception instead - Updated behavior for subscription with null collection to be same as when subscription with emptyList.i.e., unsubscribes. - Added tests on subscription, assign You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/kafka KAFKA-3905-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1601.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1601 commit c9a66992b1095616f87c5748f210b973ebc7eb01 Author: Rekha Joshi Date: 2016-05-26T17:48:37Z Merge pull request #2 from apache/trunk Apache Kafka trunk pull commit 8d7fb005cb132440e7768a5b74257d2598642e0f Author: Rekha Joshi Date: 2016-05-30T21:37:43Z Merge pull request #3 from apache/trunk Apache Kafka trunk pull commit fbef9a8fb1411282fbadec46955691c3e7ba2578 Author: Rekha Joshi Date: 2016-06-04T23:58:02Z Merge pull request #4 from apache/trunk Apache Kafka trunk pull commit 172db701bf9affda1304b684921260d1cd36ae9e Author: Rekha Joshi Date: 2016-06-06T22:10:31Z Merge pull request #6 from apache/trunk Apache Kafka trunk pull commit 9d18d93745cf2bc9b0ab4bb9b25d9a31196ef918 Author: Rekha Joshi Date: 2016-06-07T19:36:45Z Merge pull request #7 from apache/trunk Apache trunk pull commit 882faea01f28aef1977f4ced6567833bcf736840 Author: Rekha Joshi Date: 2016-06-13T20:01:43Z Merge pull request #8 from confluentinc/trunk Apache kafka trunk pull commit 851315d39c0c308d79b9575546822aa932c46a09 Author: Rekha Joshi Date: 2016-06-27T17:34:54Z Merge pull request #9 from apache/trunk Merge Apache kafka trunk commit 613f07c2b4193302c82a5d6eaa1e53e4b87bfbc1 Author: Rekha Joshi Date: 2016-07-09T17:03:45Z Merge pull request #11 from apache/trunk Merge Apache kafka trunk commit ff5a583abf22d40d0bf3339a450d28ce336dd4fa Author: Joshi Date: 2016-07-09T17:21:49Z Handle invalid collection of topics, patterns on subscription for list of topics, with patterns, and with assignments > remove null from subscribed topics in KafkaConsumer#subscribe > -- > > Key: KAFKA-3905 > URL: https://issues.apache.org/jira/browse/KAFKA-3905 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 0.10.0.0 >Reporter: Xing Huang >Assignee: Rekha Joshi >Priority: Minor > > Currently, KafkaConsumer's subscribe methods accept Collection as > topics to be subscribed, but a Collection may have null as its element. For > example > {code} > String topic = null; > Collection topics = Arrays.asList(topic); > consumer.subscribe(topics) > {code} > When this happens, consumer will throw a puzzling NullPointerException: > {code} > at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245) > at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248) > at > org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85) > at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89) > at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244) > at > org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35) > at > org.apache.kafka.common.requests.RequestSend.(RequestSend.java:29) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.inter
[GitHub] kafka pull request #1601: KAFKA-3905; Handle invalid collection of topics, p...
GitHub user rekhajoshm opened a pull request: https://github.com/apache/kafka/pull/1601 KAFKA-3905; Handle invalid collection of topics, patterns on subscription for list of topics, with patterns, and with assignments KAFKA-3905: Handling null/empty topics and collections, patterns when subscription with list of topics or with patterns, and with assignments. - Added validity checks for input parameters on subscribe, assign to avoid NPE, and provide an argument exception instead - Updated behavior for subscription with null collection to be same as when subscription with emptyList.i.e., unsubscribes. - Added tests on subscription, assign You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/kafka KAFKA-3905-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1601.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1601 commit c9a66992b1095616f87c5748f210b973ebc7eb01 Author: Rekha Joshi Date: 2016-05-26T17:48:37Z Merge pull request #2 from apache/trunk Apache Kafka trunk pull commit 8d7fb005cb132440e7768a5b74257d2598642e0f Author: Rekha Joshi Date: 2016-05-30T21:37:43Z Merge pull request #3 from apache/trunk Apache Kafka trunk pull commit fbef9a8fb1411282fbadec46955691c3e7ba2578 Author: Rekha Joshi Date: 2016-06-04T23:58:02Z Merge pull request #4 from apache/trunk Apache Kafka trunk pull commit 172db701bf9affda1304b684921260d1cd36ae9e Author: Rekha Joshi Date: 2016-06-06T22:10:31Z Merge pull request #6 from apache/trunk Apache Kafka trunk pull commit 9d18d93745cf2bc9b0ab4bb9b25d9a31196ef918 Author: Rekha Joshi Date: 2016-06-07T19:36:45Z Merge pull request #7 from apache/trunk Apache trunk pull commit 882faea01f28aef1977f4ced6567833bcf736840 Author: Rekha Joshi Date: 2016-06-13T20:01:43Z Merge pull request #8 from confluentinc/trunk Apache kafka trunk pull commit 851315d39c0c308d79b9575546822aa932c46a09 Author: Rekha Joshi Date: 2016-06-27T17:34:54Z Merge pull request #9 from apache/trunk Merge Apache kafka trunk commit 613f07c2b4193302c82a5d6eaa1e53e4b87bfbc1 Author: Rekha Joshi Date: 2016-07-09T17:03:45Z Merge pull request #11 from apache/trunk Merge Apache kafka trunk commit ff5a583abf22d40d0bf3339a450d28ce336dd4fa Author: Joshi Date: 2016-07-09T17:21:49Z Handle invalid collection of topics, patterns on subscription for list of topics, with patterns, and with assignments --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-3905) remove null from subscribed topics in KafkaConsumer#subscribe
[ https://issues.apache.org/jira/browse/KAFKA-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369204#comment-15369204 ] ASF GitHub Bot commented on KAFKA-3905: --- Github user rekhajoshm closed the pull request at: https://github.com/apache/kafka/pull/1561 > remove null from subscribed topics in KafkaConsumer#subscribe > -- > > Key: KAFKA-3905 > URL: https://issues.apache.org/jira/browse/KAFKA-3905 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 0.10.0.0 >Reporter: Xing Huang >Assignee: Rekha Joshi >Priority: Minor > > Currently, KafkaConsumer's subscribe methods accept Collection as > topics to be subscribed, but a Collection may have null as its element. For > example > {code} > String topic = null; > Collection topics = Arrays.asList(topic); > consumer.subscribe(topics) > {code} > When this happens, consumer will throw a puzzling NullPointerException: > {code} > at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245) > at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248) > at > org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85) > at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89) > at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244) > at > org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35) > at > org.apache.kafka.common.requests.RequestSend.(RequestSend.java:29) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934) > {code} > Maybe it's better to remove null when doing subscription. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request #1561: KAFKA-3905; Handle invalid collection of topics, p...
Github user rekhajoshm closed the pull request at: https://github.com/apache/kafka/pull/1561 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-3911) Enforce KTable materialization
[ https://issues.apache.org/jira/browse/KAFKA-3911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369091#comment-15369091 ] Eno Thereska commented on KAFKA-3911: - This JIRA needs to be done strictly after KAFKA-3870 is done, since without the store names exposed it is difficult to do the materialisation. > Enforce KTable materialization > -- > > Key: KAFKA-3911 > URL: https://issues.apache.org/jira/browse/KAFKA-3911 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.1.0 >Reporter: Eno Thereska >Assignee: Eno Thereska > Fix For: 0.10.1.0 > > > Always enforce KTable materialization upon creation (i.e. even for > "builder.table(..).to()" we also materialize it into a state store; this will > indeed incur unnecessary overhead but should be very rare, and it helps for > consistency of materialization). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request #1600: KAFKA-3942: Change IntegrationTestUtils.purgeLocal...
GitHub user dguy opened a pull request: https://github.com/apache/kafka/pull/1600 KAFKA-3942: Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir It was previously only deleting files/folders where the path started with /tmp. Changed it to delete from the value of the System Property `java.io.tmpdir`. Also changed the tests that were creating State dirs under /tmp to just use `TestUtils.tempDirectory(..)` You can merge this pull request into a Git repository by running: $ git pull https://github.com/dguy/kafka kafka-3942 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1600.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1600 commit 8e68f12f9e0c9b1c67fc5ffb4c33cf1b6a65cc4f Author: Damian Guy Date: 2016-07-09T09:00:49Z Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-3942) Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir
[ https://issues.apache.org/jira/browse/KAFKA-3942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369019#comment-15369019 ] ASF GitHub Bot commented on KAFKA-3942: --- GitHub user dguy opened a pull request: https://github.com/apache/kafka/pull/1600 KAFKA-3942: Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir It was previously only deleting files/folders where the path started with /tmp. Changed it to delete from the value of the System Property `java.io.tmpdir`. Also changed the tests that were creating State dirs under /tmp to just use `TestUtils.tempDirectory(..)` You can merge this pull request into a Git repository by running: $ git pull https://github.com/dguy/kafka kafka-3942 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1600.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1600 commit 8e68f12f9e0c9b1c67fc5ffb4c33cf1b6a65cc4f Author: Damian Guy Date: 2016-07-09T09:00:49Z Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir > Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir > > > Key: KAFKA-3942 > URL: https://issues.apache.org/jira/browse/KAFKA-3942 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.1.0 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Trivial > Fix For: 0.10.1.0 > > > {code}IntegrationTestUtils.purgeLocalStreamsState(...){code} currently only > removes the folders if the path starts with /tmp. This should really use > {code}System.getProperty("java.io.tmpdir"){code} > Once this has been changed the tests using it should be able to simplify > their {code}StreamsConfig.STATE_DIR_CONFIG{code} to > {code}TestUtils.tempDirectory().getPath(){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3942) Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir
Damian Guy created KAFKA-3942: - Summary: Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir Key: KAFKA-3942 URL: https://issues.apache.org/jira/browse/KAFKA-3942 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.1.0 Reporter: Damian Guy Assignee: Damian Guy Priority: Trivial Fix For: 0.10.1.0 {code}IntegrationTestUtils.purgeLocalStreamsState(...){code} currently only removes the folders if the path starts with /tmp. This should really use {code}System.getProperty("java.io.tmpdir"){code} Once this has been changed the tests using it should be able to simplify their {code}StreamsConfig.STATE_DIR_CONFIG{code} to {code}TestUtils.tempDirectory().getPath(){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)