[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13714.
----------------------------------
    Fix Version/s: 3.2.0
         Assignee: John Roesler
       Resolution: Fixed

> Flaky test IQv2StoreIntegrationTest
> -----------------------------------
>
>                 Key: KAFKA-13714
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13714
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 3.2.0
>
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>      but: was <[1, 2]>
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>      but: was <[1, 3]>
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
>        {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
>  executionInfo=[], position=Position{position={input-topic={0=1}}}}, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
>  executionInfo=[], position=Position{position={input-topic={1=1}}}}}, 
> globalResult=null}
> Expected: is <[1, 2, 3]>
>      but: was <[1, 2]>
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
>       at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
>  {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 
>     java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
>  executionInfo=[], position=Position{position={input-topic={0=1}}}}, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
>  executionInfo=[], position=Position{position={input-topic={1=1}}}}}, 
> globalResult=null}
>     Expected: is <[0, 1, 2, 3]> 
>          but: was <[0, 2, 3]>
>         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>         at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
>         at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
>         at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
>  {code}
>  
> Some observations:
>  * After I added the whole query result to the failure message, we can see 
> that the results are always past the desired position, even though they don't 
> include all the data that should have been present in that position.
>  * All the observed failures have happened with caching=true, but that it 
> probably a red herring, since range queries skip the cache (cf 
> fe72187cb15bf7dcc16e8630ed379e979c101151)
>  * For a while, I thought that it might be a thread visibility problem with 
> the iterators, since the missing record was always at the end of the range 
> for some partition, but the window range failure is missing record 1, which 
> is at the beginning of the range in partition 1.
> I have been able to reproduce the failure locally, but only occasionally. I 
> made some hacks to narrow down the space of possibilities: 
> [https://github.com/vvcephei/kafka/commit/2a0776e52e378f1c59e98f352e3fa4f79c55842d]
> I didn't have success running that one test until failure in IDEA. It has 
> never failed for me in IDEA, even after thousands of attempts. In my testing 
> branch, I added a loop to repeat one test configuration a thousand times in 
> Gradle, but it still didn't fail reliably.
> I also added a test to specifically check that RocksDB is giving the desired 
> serialization both in one thread and across threads, and that test passes for 
> me. My next thought is to expand that Tmp test to do the same with the 
> RocksDBIterator class, or maybe just with a standalone RocksDBStore to see if 
> we can reproduce it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to