[ https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881675#comment-17881675 ]
Yu-Lin Chen commented on KAFKA-17515: ------------------------------------- The same "java.nio.file.DirectoryNotEmptyException" error can be reproduced in my local environment if I adjust the timeout to 100ms. ([Code Link|https://github.com/apache/kafka/blob/d95e384146e67e475b2e0638f70d6f4a708a01dc/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582-L583]) By shortening the timeout, I can force the flush() of kafkaStreams.close() and purgeLocalStreamsState() to be triggered in the same time. Attached the screenshot. > Fix flaky > RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener > ---------------------------------------------------------------------------------- > > Key: KAFKA-17515 > URL: https://issues.apache.org/jira/browse/KAFKA-17515 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests > Reporter: Chia-Ping Tsai > Assignee: Yu-Lin Chen > Priority: Major > Attachments: Reproduced screenshoot in my env (Loop 7).png > > > {code:java} > Stacktrace > java.nio.file.DirectoryNotEmptyException: > /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0 > at > java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289) > at > java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109) > at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191) > at > org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898) > at > org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2803) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2857) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:870) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583) > at java.base/java.lang.reflect.Method.invoke(Method.java:580) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)