mjsax commented on code in PR #21749:
URL: https://github.com/apache/kafka/pull/21749#discussion_r2936066627


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -428,8 +443,12 @@ public void shouldNotEnableCachingWithEmitFinal(final 
EmitStrategy.StrategyType
             final StateStore store = 
driver.getAllStateStores().get("aggregated");
             final WrappedStateStore changeLogging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
             assertThat(store, instanceOf(MeteredSessionStore.class));
-            assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStore.class));
-            assertThat(changeLogging.wrapped(), 
instanceOf(SessionToHeadersStoreAdapter.class));
+            if (withHeaders) {
+                assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStoreWithHeaders.class));
+            } else {
+                assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStore.class));
+                assertThat(changeLogging.wrapped(), 
instanceOf(SessionToHeadersStoreAdapter.class));

Review Comment:
   Why do we need to check the wrapped store type? And if we need to, why don't 
we do it for the headers-case?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -78,10 +82,21 @@ public class SessionWindowedKStreamImplTest {
 
     private boolean emitFinal;
 
-    public void setup(final EmitStrategy.StrategyType inputType) {
+    static Stream<Arguments> emitStrategyAndHeaders() {
+        return Stream.of(
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false),
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true),
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false),
+            Arguments.of(EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true)
+        );
+    }
+    public void setup(final EmitStrategy.StrategyType inputType, final boolean 
withHeaders) {

Review Comment:
   nit: inser empty line.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -312,35 +327,35 @@ public void shouldMaterializeAggregated(final 
EmitStrategy.StrategyType inputTyp
     @ParameterizedTest
     @EnumSource(EmitStrategy.StrategyType.class)
     public void shouldThrowNullPointerOnAggregateIfInitializerIsNull(final 
EmitStrategy.StrategyType inputType) {
-        setup(inputType);
+        setup(inputType, false);

Review Comment:
   Makes sense to hard-code `false` but this make we wonder why we don't 
hardcode `inputType`, too? Of course not directly related to this PR, but might 
be nice side improvement we could piggy-pack on this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to