mjsax commented on code in PR #21749:
URL: https://github.com/apache/kafka/pull/21749#discussion_r2936066627
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -428,8 +443,12 @@ public void shouldNotEnableCachingWithEmitFinal(final
EmitStrategy.StrategyType
final StateStore store =
driver.getAllStateStores().get("aggregated");
final WrappedStateStore changeLogging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredSessionStore.class));
- assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStore.class));
- assertThat(changeLogging.wrapped(),
instanceOf(SessionToHeadersStoreAdapter.class));
+ if (withHeaders) {
+ assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStoreWithHeaders.class));
+ } else {
+ assertThat(changeLogging,
instanceOf(ChangeLoggingSessionBytesStore.class));
+ assertThat(changeLogging.wrapped(),
instanceOf(SessionToHeadersStoreAdapter.class));
Review Comment:
Why do we need to check the wrapped store type? And if we need to, why don't
we do it for the headers-case?
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -78,10 +82,21 @@ public class SessionWindowedKStreamImplTest {
private boolean emitFinal;
- public void setup(final EmitStrategy.StrategyType inputType) {
+ static Stream<Arguments> emitStrategyAndHeaders() {
+ return Stream.of(
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false),
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true),
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false),
+ Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true)
+ );
+ }
+ public void setup(final EmitStrategy.StrategyType inputType, final boolean
withHeaders) {
Review Comment:
nit: inser empty line.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -312,35 +327,35 @@ public void shouldMaterializeAggregated(final
EmitStrategy.StrategyType inputTyp
@ParameterizedTest
@EnumSource(EmitStrategy.StrategyType.class)
public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final
EmitStrategy.StrategyType inputType) {
- setup(inputType);
+ setup(inputType, false);
Review Comment:
Makes sense to hard-code `false` but this make we wonder why we don't
hardcode `inputType`, too? Of course not directly related to this PR, but might
be nice side improvement we could piggy-pack on this PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]