[ 
https://issues.apache.org/jira/browse/KAFKA-10151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138129#comment-17138129
 ] 

Chia-Ping Tsai commented on KAFKA-10151:
----------------------------------------

[~ableegoldman] I feel you have resolved this issue by 
https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03

the root cause is the in-flight records to changelog are not completed when 
closing streamTasks.
{code:java}
    @Override
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (state()) {
            case RUNNING:
            case RESTORING:
            case SUSPENDED:
                maybeScheduleCheckpoint(); //checkpoint is not up-to-date if 
there are in-flight requests
                stateMgr.flush();
                recordCollector.flush();
{code}
see 
[https://github.com/apache/kafka/blob/03ed08d0d17a10ca4f96c8cc0a8694834ae01e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L345]

The commit 
(https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03)
 update the checkpoint after calling recordCollector.flush so the checkpoint is 
up-to-date.

Also, I loop 
StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
 30 times with 
[https://github.com/apache/kafka/commit/2239004907b29e00811fee9ded5a790172701a03].
 All pass.


> Flaky Test 
> StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10151
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10151
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>              Labels: flaky-test, integration-test
>         Attachments: 
> StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi.stdout.rtf
>
>
> I've started seeing this fail in the past week or so. Checked out the logs 
> and there's nothing obviously wrong (ie no ERROR or exception) so it might 
> just be flaky? 
>  
> java.lang.AssertionError: Condition not met within timeout 60000. Could not 
> get expected result in time. at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388) at 
> org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.verifyCountWithTimestamp(StoreUpgradeIntegrationTest.java:367)
>  at 
> org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:183)
>  at 
> org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest.shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(StoreUpgradeIntegrationTest.java:109)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to