mjsax commented on code in PR #21643:
URL: https://github.com/apache/kafka/pull/21643#discussion_r2900557091
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -209,22 +246,116 @@ public void shouldThrowUsingIQv2ForInMemoryStores() {
.withCachingDisabled()
.build();
- final KeyQuery<String, ValueTimestampHeaders<String>> query =
- KeyQuery.withKey("test-key");
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ try {
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(HeadersBytesStore.class, wrapped,
+ "Expected wrapper to implement HeadersBytesStore for
InMemoryKeyValueStore");
+
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final QueryResult<byte[]> result = wrapped.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ assertTrue(result.isSuccess(), "Expected query to succeed on
InMemoryKeyValueStore");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForQueryOnInMemoryStore() {
Review Comment:
Do we need this one? In the end, we didn't change in-memory store, so the
store itself should be tested elsewhere already? And we have
`shouldCollectExecutionInfoForQueryOnHeadersStore` below, that test the new
headers related code already?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -209,22 +246,116 @@ public void shouldThrowUsingIQv2ForInMemoryStores() {
.withCachingDisabled()
.build();
- final KeyQuery<String, ValueTimestampHeaders<String>> query =
- KeyQuery.withKey("test-key");
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ try {
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(HeadersBytesStore.class, wrapped,
+ "Expected wrapper to implement HeadersBytesStore for
InMemoryKeyValueStore");
+
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final QueryResult<byte[]> result = wrapped.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ assertTrue(result.isSuccess(), "Expected query to succeed on
InMemoryKeyValueStore");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ } finally {
Review Comment:
Do we need to add some data to the store, to see if we can actually get it
back, and deserialized correctly?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -192,9 +208,30 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() {
}
@Test
- public void shouldThrowUsingIQv2ForInMemoryStores() {
+ public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
Review Comment:
When we added ts-store back in the days, we needed to allow to pass in
plain-stores via `Materialized` for backward compatibility.
Given that we support to pass in a plain-stores into the DSL atm (via
`Materialized` -- we don't have a config for it, as we don't want to encourage
people to do this), it is strictly speaking a backward incompatibly if we do
drop support for it with AK 4.3 release... I believe it's very rare (or maybe
even not used at all), to pass in a plain KV store into the DSL, so risk is
low, but still wondering if we can do this?
We might need to extend the scope for this, to cover this case. Thoughts?
(Or course this is not blocking this PR...)
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -326,16 +557,93 @@ public void shouldThrowOnGetPositionForInMemoryStores() {
.withCachingDisabled()
.build();
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- store::getPosition
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+ new Bytes("a".getBytes()),
+ new Bytes("z".getBytes())
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> result =
wrapped.query(query, positionBound, config);
+
+ // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE
+ assertFalse(result.isSuccess(), "Expected query to fail with
unknown query type");
+ assertEquals(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ result.getFailureReason(),
+ "Expected UNKNOWN_QUERY_TYPE failure reason"
+ );
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForQueryOnHeadersStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("metricScope");
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
);
- assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable
execution info
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final QueryResult<byte[]> result = wrapped.query(query,
positionBound, config);
+
+ // Verify: Execution info was collected
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected
execution info to be collected");
+ assertTrue(
+ result.getExecutionInfo().get(0).contains("Handled in"),
+ "Expected execution info to contain handling information"
+ );
+ assertTrue(
+
result.getExecutionInfo().get(0).contains(RocksDBTimestampedStoreWithHeaders.class.getName()),
+ "Expected execution info to mention the class name"
+ );
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
+ public void shouldHandleKeyQueryOnAdaptedTimestampedStore() {
Review Comment:
Do we need to add data into the store for this test?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -352,19 +660,42 @@ public void
shouldThrowOnGetPositionForHeadersStoreAdapter() {
.withCachingDisabled()
.build();
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- store::getPosition
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
);
-
- assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
+ store.init(context, store);
+
+ try {
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped,
+ "Expected TimestampedToHeadersStoreAdapter for legacy
timestamped store");
+
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ final QueryResult<byte[]> result = wrapped.query(query,
positionBound, config);
+
+ // Verify: Adapter delegates to RocksDBTimestampedStore which
supports IQv2 through RocksDBStore
+ // The underlying store should handle the query successfully (even
if key doesn't exist)
+ assertTrue(result.isSuccess() || result.getFailureReason() !=
FailureReason.UNKNOWN_QUERY_TYPE,
Review Comment:
Why do we need `|| result.getFailureReason() !=
FailureReason.UNKNOWN_QUERY_TYPE` ? -- There shouldn't be any error?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]