[GitHub] [flink] leonardBang commented on a change in pull request #15495: [FLINK-21305][table-planner-blink] Fix Cumulative and Hopping window should accumulate late events belonging to the cleaned s

2021-04-13 Thread GitBox


leonardBang commented on a change in pull request #15495:
URL: https://github.com/apache/flink/pull/15495#discussion_r612124869



##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
##
@@ -123,6 +124,19 @@ public void merge(@Nullable Long mergeResult, 
Iterable toBeMerged) throws
 }
 }
 
+protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
+mergeTargetGetter.mergeTarget = null;
+sliceSharedAssigner.mergeSlices(sliceToMerge, mergeTargetGetter);

Review comment:
   I got the point, but looks this doesn't very clean. Could we  rename 
`SliceMergeTargetGetter`  to `SliceMergeTargetHelper`  because it didn't expose 
`getter` method, and add `setMergeTarget` `getMergeTarget`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] leonardBang commented on a change in pull request #15495: [FLINK-21305][table-planner-blink] Fix Cumulative and Hopping window should accumulate late events belonging to the cleaned s

2021-04-12 Thread GitBox


leonardBang commented on a change in pull request #15495:
URL: https://github.com/apache/flink/pull/15495#discussion_r612122508



##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
##
@@ -46,6 +46,7 @@
 
 private final SliceSharedAssigner sliceSharedAssigner;
 private final WindowIsEmptySupplier emptySupplier;
+private final SliceMergeTargetGetter mergeTargetGetter = new 
SliceMergeTargetGetter();

Review comment:
   move the initialization to class constructor?

##
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
##
@@ -186,20 +189,20 @@ public void testEventTimeHoppingWindows() throws 
Exception {
 ASSERTER.assertOutputEqualsSorted(
 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-// late element, should be dropped
+// late element for [1K, 4K), but should be accumulated into [2K, 5K), 
[3K, 6K)
 testHarness.processElement(insertRecord("key2", 1, 3500L));
 
 testHarness.processWatermark(new Watermark(4999));
-expectedOutput.add(insertRecord("key2", 2L, 2L, localMills(2000L), 
localMills(5000L)));
+expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(2000L), 
localMills(5000L)));
 expectedOutput.add(new Watermark(4999));
 ASSERTER.assertOutputEqualsSorted(
 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-// late element, should be dropped
-testHarness.processElement(insertRecord("key1", 1, 4999L));
+// totally late element, should be dropped

Review comment:
   nit: we can use `late than the slice` and `late then the window` 

##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/SliceSharedWindowAggProcessor.java
##
@@ -123,6 +124,19 @@ public void merge(@Nullable Long mergeResult, 
Iterable toBeMerged) throws
 }
 }
 
+protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
+mergeTargetGetter.mergeTarget = null;
+sliceSharedAssigner.mergeSlices(sliceToMerge, mergeTargetGetter);

Review comment:
   I got the point, but looks this doesn't very clean. Could we  rename 
`SliceMergeTargetGetter`  to `SliceMergeTargetHelper`  because it didn't expose 
`getter` method, and add `SetMergeTarget` `GetMergeTarget`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org