[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508386#comment-17508386 ]
John Roesler commented on KAFKA-13714:
--------------------------------------

Another local repro:
{code:java}
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED
    java.lang.AssertionError: Result:StateQueryResult{partitionResults={
      0=SucceededQueryResult{
        result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946,
        executionInfo=[
          Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns,
          Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 1165952ns,
          Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns,
          Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns
        ],
        position=Position{position={input-topic={0=1}}}},
      1=SucceededQueryResult{
        result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc,
        executionInfo=[
          Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns,
          Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 116767ns,
          Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns,
          Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns
        ],
        position=Position{position={input-topic={1=1}}}}},
      globalResult=null}
    Expected: is <[1, 2, 3]>
         but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
        at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
        at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
{code}

> Flaky test IQv2StoreIntegrationTest
> -----------------------------------
>
>                 Key: KAFKA-13714
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13714
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: John Roesler
>            Priority: Blocker
>
> I have observed multiple consistency violations in the
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's
> apparently a major flaw in the feature, we should not release with this bug
> outstanding. Depending on the timetable, we may want to block the release or
> pull the feature until the next release.
>
> The first observation I have is from 23 Feb 2022. So far all observations
> point to the range query in particular, and all observations have been for
> RocksDB stores, including RocksDBStore, RocksDBTimestampedStore, and the
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022:
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen.
> That feature was implemented on 2 Jan 2022:
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError:
> Expected: is <[1, 2, 3]>
>      but: was <[1, 2]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
> {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError:
> Expected: is <[1, 2, 3]>
>      but: was <[1, 3]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
> {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError:
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
> executionInfo=[], position=Position{position={input-topic={0=1}}}},
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
> executionInfo=[], position=Position{position={input-topic={1=1}}}}},
> globalResult=null}
> Expected: is <[1, 2, 3]>
>      but: was <[1, 2]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
> {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL]
> java.lang.AssertionError:
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
> executionInfo=[], position=Position{position={input-topic={0=1}}}},
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
> executionInfo=[], position=Position{position={input-topic={1=1}}}}},
> globalResult=null}
> Expected: is <[0, 1, 2, 3]>
>      but: was <[0, 2, 3]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
>     at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
> {code}
>
> Some observations:
> * Now that I have added the whole query result to the failure message, we can
> see that the results are always past the desired position, even though they
> don't include all the data that should have been present in that position.
> * All the observed failures have happened with caching=true, but that is
> probably a red herring, since range queries skip the cache (cf.
> fe72187cb15bf7dcc16e8630ed379e979c101151)
> * For a while, I thought that it might be a thread visibility problem with
> the iterators, since the missing record was always at the end of the range
> for some partition, but the window range failure is missing record 1, which
> is at the beginning of the range in partition 1.
> I have been able to reproduce the failure locally, but only occasionally. I
> made some hacks to narrow down the space of possibilities:
> [https://github.com/vvcephei/kafka/commit/2a0776e52e378f1c59e98f352e3fa4f79c55842d]
> I didn't have any success running that one test until failure in IDEA; it
> has never failed for me there, even after thousands of attempts. In my
> testing branch, I added a loop to repeat one test configuration a thousand
> times in Gradle, but it still didn't fail reliably.
> I also added a test to specifically check that RocksDB is giving the desired
> serialization both in one thread and across threads, and that test passes for
> me. My next thought is to expand that Tmp test to do the same with the
> RocksDBIterator class, or maybe just with a standalone RocksDBStore to see if
> we can reproduce it.
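
For illustration, the cross-thread check mentioned in the description (write on one thread, range-scan on another, repeat many times) could be sketched roughly as below. This is not the code from the testing branch: the class and method names are hypothetical, and a ConcurrentSkipListMap stands in for the RocksDB-backed store so that the sketch runs without Kafka or RocksDB on the classpath. A real check would presumably swap in a standalone RocksDBStore and its iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical harness; not the code from the linked testing branch.
class RangeVisibilityCheck {

    // Writes keys 1..3 on the writer thread, then range-scans [1, 3] on the reader thread.
    // The in-memory map is only a stand-in (assumption) for a RocksDB-backed store.
    static List<Integer> writeThenScan(ExecutorService writer, ExecutorService reader) {
        ConcurrentSkipListMap<Integer, Integer> store = new ConcurrentSkipListMap<>();
        Runnable write = () -> {
            for (int key = 1; key <= 3; key++) {
                store.put(key, key);
            }
        };
        Callable<List<Integer>> scan =
            () -> new ArrayList<>(store.subMap(1, true, 3, true).values());
        try {
            // Wait for the writes to finish before querying from the other thread.
            writer.submit(write).get();
            return reader.submit(scan).get();
        } catch (Exception e) {
            throw new IllegalStateException("trial did not complete", e);
        }
    }

    // Repeats the write-then-scan trial and counts how often a record is missing.
    static int countFailures(int trials) {
        ExecutorService writer = Executors.newSingleThreadExecutor();
        ExecutorService reader = Executors.newSingleThreadExecutor();
        int failures = 0;
        try {
            for (int i = 0; i < trials; i++) {
                List<Integer> seen = writeThenScan(writer, reader);
                if (!seen.equals(Arrays.asList(1, 2, 3))) {
                    failures++;
                    System.out.println("trial " + i + " saw " + seen);
                }
            }
        } finally {
            writer.shutdown();
            reader.shutdown();
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("failures: " + countFailures(1000));
    }
}
```

With the in-memory stand-in, no failures are expected, because Future.get() establishes a happens-before edge between the writer and the reader. The sketch only shows the shape of the loop; any failure seen after substituting the real store would point at the store or iterator rather than at the test harness.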