[
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508386#comment-17508386
]
John Roesler commented on KAFKA-13714:
--------------------------------------
Another local repro:
{code:java}
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest >
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED
java.lang.AssertionError: Result:StateQueryResult{partitionResults={
0=SucceededQueryResult{
result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946,
executionInfo=[
Handled in class
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns,
Handled in class
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
via WrappedStateStore in 1165952ns,
Handled in class
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns,
Handled in class
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with
serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns
],
position=Position{position={input-topic={0=1}}}},
1=SucceededQueryResult{
result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc,
executionInfo=[
Handled in class
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns,
Handled in class
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
via WrappedStateStore in 116767ns,
Handled in class
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns,
Handled in class
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with
serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns
],
position=Position{position={input-topic={1=1}}}}},
globalResult=null}
Expected: is <[1, 2, 3]>
but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
at
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
at
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
{code}
> Flaky test IQv2StoreIntegrationTest
> -----------------------------------
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.2.0
> Reporter: John Roesler
> Priority: Blocker
>
> I have observed multiple consistency violations in the
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's
> apparently a major flaw in the feature, we should not release with this bug
> outstanding. Depending on the time-table, we may want to block the release or
> pull the feature until the next release.
>
> The first observation I have is from 23 Feb 2022. So far all observations
> point to the range query in particular, and all observations have been for
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022:
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen.
> That feature was implemented on 2 Jan 2022:
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError:
> Expected: is <[1, 2, 3]>
> but: was <[1, 2]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
> {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError:
> Expected: is <[1, 2, 3]>
> but: was <[1, 3]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
> {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError:
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
> executionInfo=[], position=Position{position={input-topic={0=1}}}},
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
> executionInfo=[], position=Position{position={input-topic={1=1}}}}},
> globalResult=null}
> Expected: is <[1, 2, 3]>
> but: was <[1, 2]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
> {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL]
> java.lang.AssertionError:
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
> executionInfo=[], position=Position{position={input-topic={0=1}}}},
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
> executionInfo=[], position=Position{position={input-topic={1=1}}}}},
> globalResult=null}
> Expected: is <[0, 1, 2, 3]>
> but: was <[0, 2, 3]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
> at
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
> {code}
>
> Some observations:
> * After I added the whole query result to the failure message, we can see
> that the results are always past the desired position, even though they don't
> include all the data that should have been present in that position.
> * All the observed failures have happened with caching=true, but that it
> probably a red herring, since range queries skip the cache (cf
> fe72187cb15bf7dcc16e8630ed379e979c101151)
> * For a while, I thought that it might be a thread visibility problem with
> the iterators, since the missing record was always at the end of the range
> for some partition, but the window range failure is missing record 1, which
> is at the beginning of the range in partition 1.
> I have been able to reproduce the failure locally, but only occasionally. I
> made some hacks to narrow down the space of possibilities:
> [https://github.com/vvcephei/kafka/commit/2a0776e52e378f1c59e98f352e3fa4f79c55842d]
> I didn't have success running that one test until failure in IDEA. It has
> never failed for me in IDEA, even after thousands of attempts. In my testing
> branch, I added a loop to repeat one test configuration a thousand times in
> Gradle, but it still didn't fail reliably.
> I also added a test to specifically check that RocksDB is giving the desired
> serialization both in one thread and across threads, and that test passes for
> me. My next thought is to expand that Tmp test to do the same with the
> RocksDBIterator class, or maybe just with a standalone RocksDBStore to see if
> we can reproduce it.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)