mjsax commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465097558



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                                                                
     final boolean stateCreated,
                                                                                
     final StoreBuilder<?> storeBuilder,
                                                                                
     final Windows<W> windows,
+                                                                               
     final SlidingWindows slidingWindows,
                                                                                
     final SessionWindows sessionWindows,
                                                                                
     final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == 
null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), 
initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows 
== null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, 
storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows 
== null) {
+            kStreamAggregate = new 
KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), 
initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger 
!= null) {

Review comment:
       Well, yes and no. We can follow two strategies: (1) we rely on the user 
to only set all `null` (for the non-windowed aggregation case) or either one of 
`windows`, `slidingWindow`, or `sessionWindow+sessionMerger` and we don't do 
any verification if the method is called correctly or not. However, for this 
case, we don't need to do any redundant not-null check and we could just write:
   ```
   if (windows != null) {
     ...
   } else if (slidingWindow != null) {
     ...
   } else if (sessionWindow != null) { // we blindly assume that 
`sessionMerger` is not-null, too, for this case
    ...
   } else { // all `null`: non-winowed aggregation
     ...
   }
   ```
   
   Or, (2) we do not "trust" the caller and do a proper check that the provided 
arguments make sense. And the existing code already has such a safe guard and 
does checks that not multiple windows are passed in and throws an 
`IllegalArgumentException` if the caller makes a mistake. I personally prefer 
to have a safe guard (especially on the non-hot code path) as it may prevent 
bugs. However, the current check is not complete, as it does not verify that 
`sessionMerger` must be not-null when `sessionWindows` is not-null; this may 
lead to a potentially cryptic `NullPointerException` later that is harder to 
understand. If we do the check and throw a proper 
`IllegalArgumentException("sessionMerger cannot be null for sessionWindows")` 
and `IllegalArgumentException("Unexpected sessionMerger parameter: should be 
null because sessionWindows is null");` help to identify the issue quickly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to